feat: native matrix URI pills for {user}/{room} + major rendering & codebase refactor

This change introduces native `matrix:` URI-based rendering for `{user}` and `{room}` placeholders,
replacing previous plaintext and matrix.to-based links. Users and rooms are now rendered as clickable
pills in supporting clients, with a clean display using display names and room names (no @/# prefixes).

Reporting, moderation, and auto-redaction messages have been updated to use the same rendering logic.
Inspect and event links now also use native `matrix:` URIs for direct in-client navigation.

Internally, URI generation and rendering logic have been unified via central helper functions,
ensuring consistent handling of user IDs, room IDs, aliases, and event IDs.

This commit also includes a broader refactor of the codebase:
- decomposed complex flows (e.g. join handling) into smaller helpers
- moved mutable class-level state to instance-level
- reduced duplicate API calls and redundant logic
- improved overall structure and maintainability

Test coverage has been extended for URI helpers and rendering logic to prevent regressions.

No breaking changes to existing template parameters like `{user_link}` or `{room_link}`.
This commit is contained in:
2026-04-11 20:21:33 +02:00
parent 933865d80c
commit edd3eee178
40 changed files with 474 additions and 382 deletions
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
+6 -4
View File
@@ -69,10 +69,10 @@ invitees: []
# set to {} if you don't care about greetings # set to {} if you don't care about greetings
greetings: greetings:
generic: | generic: |
Welcome {user_id}! Please be sure to read the topic for helpful links and information. Welcome {user}! Please be sure to read the topic for helpful links and information.
Use <a href="https://google.com">Google</a> for all other queries ;) Use <a href="https://google.com">Google</a> for all other queries ;)
encrypted: | encrypted: |
welcome {user_id}, this is an encrypted room, so you may not be able to see messages previously sent here. don't be welcome {user}, this is an encrypted room, so you may not be able to see messages previously sent here. don't be
alarmed. alarmed.
# which of the above greetings should be used in which rooms? use the exact name of each greeting # which of the above greetings should be used in which rooms? use the exact name of each greeting
@@ -100,7 +100,7 @@ notification_room:
# - {room_link}: clickable matrix.to-compatible link to the room # - {room_link}: clickable matrix.to-compatible link to the room
# - {room_id}: raw room ID # - {room_id}: raw room ID
join_notification_message: | join_notification_message: |
{user_link} ({user_id}) has joined {room_link}. {user} has joined {room}.
# whether to censor files/messages # whether to censor files/messages
# can be boolean (true/false) for all-or-nothing behavior, # can be boolean (true/false) for all-or-nothing behavior,
@@ -190,8 +190,10 @@ verification_message: |
Please send a message to this chat with the content: "{phrase}" Please send a message to this chat with the content: "{phrase}"
# prefixes used for the clickable {user} and {room} placeholders in rendered HTML notices.
# set either value to "" for a cleaner look without a visible prefix.
# Base URL for Matrix permalink generation. # Base URL for Matrix permalink generation.
# This is used for placeholders such as {user_link} and {room_link}. # This is used for placeholders such as {user_link} and {room_link}.
# Set this to your own matrix.to-compatible instance if you do not want to use https://matrix.to. # Set this to your own matrix.to-compatible instance if you do not want to use https://matrix.to.
matrix_to_base_url: 'https://matrix.to' matrix_to_base_url: "https://matrix.to"
Executable → Regular
View File
Executable → Regular
+347 -370
View File
@@ -1,15 +1,20 @@
# kickbot - a maubot plugin to track user activity and remove inactive users from rooms/spaces. # kickbot - a maubot plugin to track user activity and remove inactive users from rooms/spaces.
from typing import Awaitable, Type, Optional, Tuple, Dict from typing import Awaitable, Type, Optional, Tuple, Dict, Any
import json import json
import time import time
import re import re
import fnmatch import fnmatch
import asyncio import asyncio
from html import escape from html import escape
from urllib.parse import quote
import random import random
import asyncpg.exceptions import asyncpg.exceptions
from datetime import datetime from datetime import datetime
from dataclasses import dataclass
DEFAULT_USER_PILL_PREFIX = ""
DEFAULT_ROOM_PILL_PREFIX = ""
from mautrix.client import ( from mautrix.client import (
Client, Client,
@@ -66,6 +71,15 @@ from .helpers import (
) )
@dataclass(frozen=True)
class RenderContext:
user_id: str
user_display: str
room_id: Optional[str] = None
room_text: Optional[str] = None
event_id: Optional[str] = None
class Config(BaseProxyConfig): class Config(BaseProxyConfig):
def do_update(self, helper: ConfigUpdateHelper) -> None: def do_update(self, helper: ConfigUpdateHelper) -> None:
helper.copy("sleep") helper.copy("sleep")
@@ -107,11 +121,6 @@ class CommunityBot(Plugin):
return str(self.config.get("matrix_to_base_url", "https://matrix.to")).rstrip("/") return str(self.config.get("matrix_to_base_url", "https://matrix.to")).rstrip("/")
_redaction_tasks: asyncio.Task = None
_verification_states: Dict[str, Dict] = {}
_report_counts: Dict[str, set] = {}
def _matrix_to_url(self, target: str) -> str: def _matrix_to_url(self, target: str) -> str:
base_url = self._get_matrix_to_base_url() base_url = self._get_matrix_to_base_url()
return f"{base_url}/#/{target}" return f"{base_url}/#/{target}"
@@ -120,6 +129,22 @@ class CommunityBot(Plugin):
def _render_html_link(self, target: str, label: str) -> str: def _render_html_link(self, target: str, label: str) -> str:
return f"<a href='{escape(self._matrix_to_url(target), quote=True)}'>{escape(label)}</a>" return f"<a href='{escape(self._matrix_to_url(target), quote=True)}'>{escape(label)}</a>"
def _encode_matrix_uri_part(self, value: str, sigils: str = "@!#$") -> str:
"""Encode an MXID-like identifier for safe use inside a matrix: URI path."""
return quote(str(value).lstrip(sigils), safe=":.=_+-")
def _matrix_uri_user(self, user_id: str) -> str:
return f"matrix:u/{self._encode_matrix_uri_part(user_id, '@')}?action=chat"
def _matrix_uri_room(self, room_id: str) -> str:
return f"matrix:roomid/{self._encode_matrix_uri_part(room_id, '!')}"
def _matrix_uri_event(self, room_id: str, event_id: str) -> str:
encoded_room_id = self._encode_matrix_uri_part(room_id, "!")
encoded_event_id = self._encode_matrix_uri_part(event_id, "$")
return f"matrix:roomid/{encoded_room_id}/e/{encoded_event_id}"
async def _get_user_display_name(self, room_id: RoomID, user_id: str) -> str: async def _get_user_display_name(self, room_id: RoomID, user_id: str) -> str:
try: try:
member_state = await self.client.get_state_event( member_state = await self.client.get_state_event(
@@ -130,14 +155,278 @@ class CommunityBot(Plugin):
displayname = getattr(member_state, "displayname", None) displayname = getattr(member_state, "displayname", None)
if displayname: if displayname:
return str(displayname) return str(displayname)
except Exception: except Exception as e:
pass self.log.debug(f"Failed to fetch display name for {user_id} in {room_id}: {e}")
try: try:
return self.client.parse_user_id(user_id)[0] return self.client.parse_user_id(user_id)[0]
except Exception: except Exception:
return user_id return user_id
def _matrix_user_uri(self, user_id: str) -> str:
"""Build a Matrix URI for a user."""
return self._matrix_uri_user(user_id)
def _matrix_room_uri(self, room_id: str, room_alias: str | None = None) -> str:
"""Build a Matrix URI for a room, preferring canonical alias when available."""
if room_alias:
return f"matrix:r/{self._encode_matrix_uri_part(room_alias, '#')}"
return self._matrix_uri_room(room_id)
def _matrix_event_uri(self, room_id: str, event_id: str) -> str:
"""Build a Matrix URI for an event inside a room."""
return self._matrix_uri_event(room_id, event_id)
async def _get_room_display_text(self, room_id: RoomID) -> str:
"""Return a human-friendly room name, falling back to the room ID."""
try:
room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
room_name = getattr(room_name_event, "name", None)
if room_name:
return str(room_name)
except Exception as e:
self.log.debug(f"Failed to fetch room name for {room_id}: {e}")
return str(room_id)
async def _build_render_context(self, room_id: RoomID, user_id: str) -> RenderContext:
"""Fetch the values needed for template rendering once per join event."""
user_display = await self._get_user_display_name(room_id, user_id)
room_text = await self._get_room_display_text(room_id)
return RenderContext(
user_id=str(user_id),
user_display=user_display,
room_id=str(room_id),
room_text=room_text,
)
async def _send_rendered_notice(
self,
target_room_id: RoomID,
template: str,
context: RenderContext,
) -> None:
"""Render a template once and send plaintext + HTML variants."""
plain_text, html_message = self._render_message_template(
template,
context.user_id,
context.user_display,
context.room_id,
context.room_text,
)
await self.client.send_notice(target_room_id, plain_text, html=html_message)
async def _handle_join_notifications(self, evt: StateEvent) -> None:
"""Send configured greetings and join notifications for a new member."""
room_id = str(evt.room_id)
if room_id not in self.config["greeting_rooms"]:
return
greeting_name = self.config["greeting_rooms"][room_id]
context = await self._build_render_context(evt.room_id, evt.sender)
if greeting_name != "none":
greeting_map = self.config["greetings"]
await self._sleep_if_configured(self.config["welcome_sleep"])
await self._send_rendered_notice(evt.room_id, greeting_map[greeting_name], context)
if self.config["notification_room"]:
await self._send_rendered_notice(
self.config["notification_room"],
self.config["join_notification_message"],
context,
)
def _is_human_verification_enabled_for_room(self, room_id: RoomID) -> bool:
configured = self.config["check_if_human"]
if isinstance(configured, bool):
return configured
if isinstance(configured, list):
return room_id in configured
return False
async def _user_requires_human_verification(
self, user_id: UserID, room_id: RoomID
) -> Optional[int]:
"""Return the required message power level if verification should proceed."""
if await self.user_has_unlimited_power(user_id, room_id):
self.log.debug(
f"User {user_id} has unlimited power in {room_id}, skipping verification"
)
return None
power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS)
user_level = power_levels.get_user_level(user_id)
required_level = power_levels.events.get(
str(EventType.ROOM_MESSAGE), power_levels.events_default
)
self.log.debug(
f"User {user_id} has power level {user_level}, required level is {required_level}"
)
if user_level >= required_level:
self.log.debug(
f"User {user_id} already has sufficient power level ({user_level} >= {required_level})"
)
return None
return required_level
async def _create_verification_dm(
self, user_id: UserID, roomname: str
) -> Optional[RoomID]:
"""Create a DM room for human verification with bounded retries."""
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
try:
dm_room = await self.client.create_room(
preset=RoomCreatePreset.PRIVATE,
invitees=[user_id],
is_direct=True,
initial_state=[
{
"type": str(EventType.ROOM_NAME),
"content": {"name": f"[{roomname}] join verification"},
}
],
)
self.log.info(f"Created DM room {dm_room} for {user_id}")
return dm_room
except Exception as e:
if attempt < max_retries - 1:
self.log.warning(
f"Failed to create DM room (attempt {attempt + 1}/{max_retries}): {e}"
)
await asyncio.sleep(retry_delay)
else:
self.log.error(
f"Failed to initiate verification process after {max_retries} attempts: {e}"
)
return None
async def _maybe_start_human_verification(
self, evt: StateEvent, room_label: str
) -> None:
"""Run the human verification flow for a newly joined member when configured."""
if not (self.config["check_if_human"] and self.config["verification_phrases"]):
return
verification_enabled = self._is_human_verification_enabled_for_room(evt.room_id)
self.log.debug(
f"Verification enabled for room {evt.room_id}: {verification_enabled}"
)
if not verification_enabled:
return
try:
required_level = await self._user_requires_human_verification(
evt.sender, evt.room_id
)
except Exception as e:
self.log.error(f"Failed to check user power level: {e}")
return
if required_level is None:
return
dm_room = await self._create_verification_dm(evt.sender, room_label)
if not dm_room:
return
verification_phrase = random.choice(self.config["verification_phrases"])
verification_state = {
"user": evt.sender,
"target_room": evt.room_id,
"phrase": verification_phrase,
"attempts": self.config["verification_attempts"],
"required_level": required_level,
}
await self.store_verification_state(dm_room, verification_state)
greeting = self.config["verification_message"].format(
room=room_label, phrase=verification_phrase
)
await self.client.send_notice(dm_room, html=greeting)
self.log.info(
f"Started verification process for {evt.sender} in room {evt.room_id} for room {room_label}"
)
async def _resolve_room_identifier(self, room: str) -> str:
"""Resolve either a room alias or a room ID into a room ID."""
if room.startswith("#"):
resolved = await self.client.resolve_room_alias(room)
room_id = resolved["room_id"]
self.log.info(f"Resolved alias '{room}' to room ID: {room_id}")
return room_id
self.log.info(f"Using direct room ID: {room}")
return room
async def _get_room_metadata(self, room_id: str) -> Dict[str, Optional[str]]:
"""Fetch the room name and topic once for room replacement flows."""
metadata: Dict[str, Optional[str]] = {"room_name": None, "room_topic": None}
try:
room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
metadata["room_name"] = getattr(room_name_event, "name", None)
except Exception as e:
self.log.warning(f"Failed to get room name: {e}")
try:
room_topic_event = await self.client.get_state_event(room_id, EventType.ROOM_TOPIC)
metadata["room_topic"] = getattr(room_topic_event, "topic", None)
except Exception as e:
self.log.warning(f"Failed to get room topic: {e}")
return metadata
async def _detect_space_type(self, room_id: str) -> bool:
"""Return True when the target room is a space."""
try:
state_events = await self.client.get_state(room_id)
for state_event in state_events:
if str(state_event.type) != "m.room.create":
continue
space_type = state_event.content.get("type")
is_space = space_type == "m.space"
self.log.info(f"Detected room type {space_type!r} for {room_id}")
return is_space
except Exception as e:
self.log.error(f"Failed to detect room type for {room_id}: {e}")
return False
def _format_user_pill(
self,
user_id: str,
user_display: Optional[str] = None,
) -> Tuple[str, str]:
"""Return plaintext and HTML variants for the {user} placeholder."""
safe_user_display = user_display or user_id
prefix = DEFAULT_USER_PILL_PREFIX
label = f"{prefix}{safe_user_display}"
href = self._matrix_uri_user(str(user_id))
return label, f"<a href='{href}'>{escape(label)}</a>"
def _format_room_pill(
self,
room_id: Optional[str] = None,
room_text: Optional[str] = None,
) -> Tuple[str, str]:
"""Return plaintext and HTML variants for the {room} placeholder."""
safe_room_id = str(room_id or "")
safe_room_text = room_text or safe_room_id
prefix = DEFAULT_ROOM_PILL_PREFIX
label = f"{prefix}{safe_room_text}"
if safe_room_id:
href = self._matrix_uri_room(safe_room_id)
html = f"<a href='{href}'>{escape(label)}</a>"
else:
html = escape(label)
return label, html
def _render_message_template( def _render_message_template(
self, self,
template: str, template: str,
@@ -151,21 +440,23 @@ class CommunityBot(Plugin):
safe_room_id = room_id or "" safe_room_id = room_id or ""
safe_room_text = room_text or safe_room_id safe_room_text = room_text or safe_room_id
room_url = self._matrix_to_url(safe_room_id) if safe_room_id else "" room_url = self._matrix_to_url(safe_room_id) if safe_room_id else ""
user_plain, user_html = self._format_user_pill(user_id, safe_user_display)
room_plain, room_html = self._format_room_pill(safe_room_id, safe_room_text)
plain_text = template.format( plain_text = template.format(
user=safe_user_display, user=user_plain,
user_id=user_id, user_id=user_id,
user_link=user_url, user_link=user_url,
room=safe_room_text, room=room_plain,
room_link=room_url, room_link=room_url,
room_id=safe_room_id, room_id=safe_room_id,
) )
html_message = template.format( html_message = template.format(
user=escape(safe_user_display), user=user_html,
user_id=escape(user_id), user_id=escape(user_id),
user_link=f"<a href='{user_url}'>{escape(safe_user_display)}</a>", user_link=f"<a href='{user_url}'>{escape(safe_user_display)}</a>",
room=escape(safe_room_text), room=room_html,
room_link=( room_link=(
f"<a href='{room_url}'>{escape(safe_room_text)}</a>" f"<a href='{room_url}'>{escape(safe_room_text)}</a>"
if room_url if room_url
@@ -181,14 +472,14 @@ class CommunityBot(Plugin):
self.config.load_and_update() self.config.load_and_update()
self.config_manager = config_manager.ConfigManager(self.config) self.config_manager = config_manager.ConfigManager(self.config)
self.client.add_dispatcher(MembershipEventDispatcher) self.client.add_dispatcher(MembershipEventDispatcher)
# Start background redaction task self._redaction_task: Optional[asyncio.Task] = None
self._redaction_tasks = asyncio.create_task(self._redaction_loop()) self._report_counts: Dict[str, set[str]] = {}
# Clean up stale verification states self._redaction_task = asyncio.create_task(self._redaction_loop())
await self.cleanup_stale_verification_states() await self.cleanup_stale_verification_states()
async def stop(self) -> None: async def stop(self) -> None:
if self._redaction_tasks: if self._redaction_task:
self._redaction_tasks.cancel() self._redaction_task.cancel()
await super().stop() await super().stop()
async def _sleep_if_configured(self, delay: float) -> None: async def _sleep_if_configured(self, delay: float) -> None:
@@ -1055,212 +1346,35 @@ class CommunityBot(Plugin):
async def newjoin(self, evt: StateEvent) -> None: async def newjoin(self, evt: StateEvent) -> None:
if evt.source & SyncStream.STATE: if evt.source & SyncStream.STATE:
return return
else:
# we only care about join events in rooms in the space
# this avoids trying to verify users in other rooms the bot might be in,
# such as public banlist policy rooms
space_rooms = await self.get_space_roomlist()
if evt.room_id not in space_rooms:
return
try: space_rooms = await self.get_space_roomlist()
on_banlist = await self.check_if_banned(evt.sender) if evt.room_id not in space_rooms:
except Exception as e: return
self.log.error(f"Failed to check if {evt.sender} is banned: {e}")
on_banlist = False
if on_banlist:
await self.ban_this_user(evt.sender)
return
# passive sync of tracking db
if evt.room_id == self.config["parent_room"]:
await self.do_sync()
# greeting activities
room_id = str(evt.room_id)
self.log.debug(f"New join in room {room_id} by {evt.sender}")
self.log.debug(f"Greeting rooms config: {self.config['greeting_rooms']}")
self.log.debug(f"Check if human config: {self.config['check_if_human']}")
self.log.debug(
f"Verification phrases config: {self.config['verification_phrases']}"
)
if room_id in self.config["greeting_rooms"]: try:
if on_banlist: on_banlist = await self.check_if_banned(evt.sender)
return except Exception as e:
greeting_map = self.config["greetings"] self.log.error(f"Failed to check if {evt.sender} is banned: {e}")
greeting_name = self.config["greeting_rooms"][room_id] on_banlist = False
user_display = await self._get_user_display_name(evt.room_id, evt.sender)
try:
roomnamestate = await self.client.get_state_event(
evt.room_id, "m.room.name"
)
room_text = getattr(roomnamestate, "name", str(evt.room_id))
except Exception:
room_text = str(evt.room_id)
if greeting_name != "none": if on_banlist:
greeting_text, greeting_html = self._render_message_template( await self.ban_this_user(evt.sender)
greeting_map[greeting_name], return
evt.sender,
user_display,
evt.room_id,
room_text,
)
await self._sleep_if_configured(self.config["welcome_sleep"])
await self.client.send_notice(
evt.room_id,
greeting_text,
html=greeting_html,
)
else:
pass
if self.config["notification_room"]: if evt.room_id == self.config["parent_room"]:
try: await self.do_sync()
roomnamestate = await self.client.get_state_event(
evt.room_id, "m.room.name"
)
room_text = getattr(roomnamestate, "name", str(evt.room_id)) self.log.debug(f"New join in room {evt.room_id} by {evt.sender}")
except Exception: await self._handle_join_notifications(evt)
room_text = str(evt.room_id)
user_display = await self._get_user_display_name(evt.room_id, evt.sender) if not (self.config["check_if_human"] and self.config["verification_phrases"]):
notification_text, notification_html = self._render_message_template( return
self.config["join_notification_message"],
evt.sender,
user_display,
evt.room_id,
room_text,
)
await self.client.send_notice(
self.config["notification_room"],
notification_text,
html=notification_html,
)
# Human verification logic try:
if self.config["check_if_human"] and self.config["verification_phrases"]: room_label = await common_utils.get_room_name(self.client, evt.room_id, self.log)
try: await self._maybe_start_human_verification(evt, room_label)
# Check if verification is enabled for this room except Exception as e:
verification_enabled = False self.log.error(f"Failed to start verification process: {e}")
if isinstance(self.config["check_if_human"], bool):
verification_enabled = self.config["check_if_human"]
elif isinstance(self.config["check_if_human"], list):
verification_enabled = (
evt.room_id in self.config["check_if_human"]
)
self.log.debug(
f"Verification enabled for room {room_id}: {verification_enabled}"
)
if not verification_enabled:
return
# Get room name for greeting
roomname = "this room"
roomname = await common_utils.get_room_name(
self.client, evt.room_id, self.log
)
# Check if user already has sufficient power level or unlimited power
try:
# First check if user has unlimited power (creator in modern room versions)
if await self.user_has_unlimited_power(evt.sender, evt.room_id):
self.log.debug(
f"User {evt.sender} has unlimited power in {evt.room_id}, skipping verification"
)
return
power_levels = await self.client.get_state_event(
evt.room_id, EventType.ROOM_POWER_LEVELS
)
user_level = power_levels.get_user_level(evt.sender)
events_default = power_levels.events_default
events = power_levels.events
# Get the required power level for sending messages
required_level = events.get(
str(EventType.ROOM_MESSAGE), events_default
)
self.log.debug(
f"User {evt.sender} has power level {user_level}, required level is {required_level}"
)
# If user already has sufficient power level, skip verification
if user_level >= required_level:
self.log.debug(
f"User {evt.sender} already has sufficient power level ({user_level} >= {required_level})"
)
return
except Exception as e:
self.log.error(f"Failed to check user power level: {e}")
return
# Create DM room with name
max_retries = 3
retry_delay = 1 # seconds
last_error = None
for attempt in range(max_retries):
try:
dm_room = await self.client.create_room(
preset=RoomCreatePreset.PRIVATE,
invitees=[evt.sender],
is_direct=True,
initial_state=[
{
"type": str(EventType.ROOM_NAME),
"content": {
"name": f"[{roomname}] join verification"
},
}
],
)
self.log.info(f"Created DM room {dm_room} for {evt.sender}")
break
except Exception as e:
last_error = e
if (
attempt < max_retries - 1
): # Don't sleep on the last attempt
self.log.warning(
f"Failed to create DM room (attempt {attempt + 1}/{max_retries}): {e}"
)
await asyncio.sleep(retry_delay)
else:
self.log.error(
f"Failed to initiate verification process after {max_retries} attempts: {e}"
)
return
# Select random verification phrase
verification_phrase = random.choice(
self.config["verification_phrases"]
)
# Store verification state
verification_state = {
"user": evt.sender,
"target_room": evt.room_id,
"phrase": verification_phrase,
"attempts": self.config["verification_attempts"],
"required_level": required_level,
}
await self.store_verification_state(dm_room, verification_state)
# Send greeting
greeting = self.config["verification_message"].format(
room=roomname, phrase=verification_phrase
)
await self.client.send_notice(dm_room, html=greeting)
self.log.info(
f"Started verification process for {evt.sender} in room {room_id} for room {roomname}"
)
except Exception as e:
self.log.error(f"Failed to start verification process: {e}")
@event.on(EventType.ROOM_MESSAGE) @event.on(EventType.ROOM_MESSAGE)
async def handle_verification(self, evt: MessageEvent) -> None: async def handle_verification(self, evt: MessageEvent) -> None:
@@ -1447,11 +1561,10 @@ class CommunityBot(Plugin):
room_text = str(evt.room_id) room_text = str(evt.room_id)
reporter_display = await self._get_user_display_name(evt.room_id, evt.sender) reporter_display = await self._get_user_display_name(evt.room_id, evt.sender)
room_url = self._matrix_to_url(evt.room_id) room_plain, room_link_html = self._format_room_pill(str(evt.room_id), room_text)
message_url = self._matrix_to_url(f"{evt.room_id}/{target_event_id}") reporter_plain, reporter_link_html = self._format_user_pill(str(evt.sender), reporter_display)
room_url = room_plain
room_link_html = self._render_html_link(evt.room_id, room_text) message_url = self._matrix_uri_event(str(evt.room_id), str(target_event_id))
reporter_link_html = self._render_html_link(evt.sender, reporter_display)
message_link_html = f"<a href='{escape(message_url, quote=True)}'>Original Event Link</a>" message_link_html = f"<a href='{escape(message_url, quote=True)}'>Original Event Link</a>"
if self.config.get("auto_redact_majority", False): if self.config.get("auto_redact_majority", False):
@@ -1468,13 +1581,13 @@ class CommunityBot(Plugin):
) )
notification_text = ( notification_text = (
"Message Auto-Redacted 🗑️\n" "🗑️ Message Auto-Redacted\n"
f"Room: {room_url}\n" f"Room: {room_url}\n"
f"Reason: Community majority vote reached ({current_reports} out of {human_count} members).\n" f"Reason: Community majority vote reached ({current_reports} out of {human_count} members).\n"
f"Context: {message_url}" f"Context: {message_url}"
) )
notification_html = ( notification_html = (
f"<b>Message Auto-Redacted</b> 🗑️<br>" f"🗑️ <b>Message Auto-Redacted</b><br>"
f"<b>Room:</b> {room_link_html}<br>" f"<b>Room:</b> {room_link_html}<br>"
f"<b>Reason:</b> Community majority vote reached ({current_reports} out of {human_count} members).<br>" f"<b>Reason:</b> Community majority vote reached ({current_reports} out of {human_count} members).<br>"
f"<b>Context:</b> {message_link_html}" f"<b>Context:</b> {message_link_html}"
@@ -1492,14 +1605,14 @@ class CommunityBot(Plugin):
if current_reports == 1: if current_reports == 1:
notification_text = ( notification_text = (
"Message Reported 🚨\n" "🚨 Message Reported\n"
f"First Reporter: {reporter_display} ({evt.sender})\n" f"First Reporter: {reporter_plain}\n"
f"Room: {room_url}\n" f"Room: {room_url}\n"
f"Action: {message_url}" f"Action: {message_url}"
) )
notification_html = ( notification_html = (
f"<b>Message Reported</b> 🚨<br>" f"🚨 <b>Message Reported</b><br>"
f"<b>First Reporter:</b> {reporter_link_html} ({escape(evt.sender)})<br>" f"<b>First Reporter:</b> {reporter_link_html}<br>"
f"<b>Room:</b> {room_link_html}<br>" f"<b>Room:</b> {room_link_html}<br>"
f"<b>Action:</b> <a href='{escape(message_url, quote=True)}'>Click here to inspect and moderate</a>" f"<b>Action:</b> <a href='{escape(message_url, quote=True)}'>Click here to inspect and moderate</a>"
) )
@@ -2159,176 +2272,40 @@ class CommunityBot(Plugin):
@decorators.require_parent_room @decorators.require_parent_room
@decorators.require_permission(min_level=100) @decorators.require_permission(min_level=100)
async def room_replace(self, evt: MessageEvent, room: str) -> None: async def room_replace(self, evt: MessageEvent, room: str) -> None:
self.log.info(f"=== REPLACEROOM COMMAND STARTED ===")
self.log.info(f"Command arguments: room='{room}', evt.room_id='{evt.room_id}'")
await evt.mark_read() await evt.mark_read()
if not room: if not room:
room = evt.room_id room = evt.room_id
# first we need to get relevant room state of the room we want to replace
# this includes the room name, alias, and join rules
if room.startswith("#"):
room_id = await self.client.resolve_room_alias(room)
room_id = room_id["room_id"]
self.log.info(f"Resolved alias '{room}' to room ID: {room_id}")
else:
room_id = room
self.log.info(f"Using direct room ID: {room_id}")
# Check bot permissions in the old room room_id = await self._resolve_room_identifier(room)
self.log.info(f"=== CHECKING BOT PERMISSIONS ===")
has_perms, error_msg, _ = await self.check_bot_permissions( has_perms, error_msg, _ = await self.check_bot_permissions(
room_id, evt, ["state", "tombstone", "power_levels"] room_id, evt, ["state", "tombstone", "power_levels"]
) )
self.log.info(
f"Bot permissions check result: has_perms={has_perms}, error_msg='{error_msg}'"
)
if not has_perms: if not has_perms:
await evt.respond(f"Cannot replace room: {error_msg}") await evt.respond(f"Cannot replace room: {error_msg}")
self.log.info("Bot permissions check failed, returning")
return return
# Get the room name from the state event metadata = await self._get_room_metadata(room_id)
room_name = None room_name = metadata["room_name"]
try: room_topic = metadata["room_topic"]
room_name_event = await self.client.get_state_event( is_space = await self._detect_space_type(room_id)
room_id, EventType.ROOM_NAME
)
room_name = room_name_event.name
self.log.info(f"Retrieved room name: '{room_name}'")
except Exception as e:
self.log.warning(f"Failed to get room name: {e}")
# room_name remains None
# get the room topic from the state event
room_topic = None
try:
room_topic_event = await self.client.get_state_event(
room_id, EventType.ROOM_TOPIC
)
room_topic = room_topic_event.topic
except Exception as e:
self.log.warning(f"Failed to get room topic: {e}")
# room_topic remains None
# Check if the room being replaced is a space
is_space = False
self.log.info(f"=== ABOUT TO START SPACE DETECTION ===")
self.log.info(f"=== SPACE DETECTION DEBUG START ===")
self.log.info(f"Room ID being checked: {room_id}")
self.log.info(f"EventType module: {EventType}")
self.log.info(
f"EventType.ROOM_CREATE exists: {hasattr(EventType, 'ROOM_CREATE')}"
)
if hasattr(EventType, "ROOM_CREATE"):
self.log.info(
f"EventType.ROOM_CREATE value: {getattr(EventType, 'ROOM_CREATE')}"
)
else:
self.log.warning("EventType.ROOM_CREATE does not exist!")
try:
# Get the room creation event to check if it's a space
state_events = await self.client.get_state(room_id)
self.log.info(
f"Retrieved {len(state_events)} state events for space detection"
)
# Log all event types for debugging
event_types = [event.type for event in state_events]
self.log.info(f"Event types found: {event_types}")
# Debug EventType.ROOM_CREATE constant
self.log.info(f"EventType.ROOM_CREATE value: {EventType.ROOM_CREATE}")
self.log.info(f"EventType.ROOM_CREATE type: {type(EventType.ROOM_CREATE)}")
# Also try string comparison as fallback
room_create_string = "m.room.create"
self.log.info(f"String comparison value: {room_create_string}")
# Try to find the room creation event using multiple methods
room_create_event = None
for i, event in enumerate(state_events):
self.log.info(
f"Event {i}: type={event.type} (type: {type(event.type)})"
)
# Try multiple comparison methods
if (
hasattr(EventType, "ROOM_CREATE")
and event.type == EventType.ROOM_CREATE
):
self.log.info(f"✓ Matched EventType.ROOM_CREATE")
room_create_event = event
break
elif str(event.type) == room_create_string:
self.log.info(f"✓ Matched string comparison 'm.room.create'")
room_create_event = event
break
elif event.type == "m.room.create":
self.log.info(f"✓ Matched direct string comparison")
room_create_event = event
break
else:
self.log.info(f"✗ No match for event {i}")
# Now process the room creation event if found
if room_create_event:
space_type = room_create_event.content.get("type")
self.log.info(f"Found ROOM_CREATE event with type: {space_type}")
self.log.info(f"Full ROOM_CREATE content: {room_create_event.content}")
is_space = space_type == "m.space"
self.log.info(f"Space detection result: {is_space}")
else:
self.log.warning("No ROOM_CREATE event found using any method")
if is_space:
self.log.info(
f"✓ FINAL RESULT: Room {room_id} IS a space - will create new space"
)
else:
self.log.info(
f"✗ FINAL RESULT: Room {room_id} is NOT a space - will create regular room"
)
except Exception as e:
self.log.error(f"❌ ERROR during space detection: {e}")
import traceback
self.log.error(f"Traceback: {traceback.format_exc()}")
# Assume it's not a space if we can't determine
is_space = False
self.log.info(f"=== SPACE DETECTION DEBUG END - is_space={is_space} ===")
# Get list of aliases to transfer while removing them from the old room
aliases_to_transfer = await self.remove_room_aliases(room_id, evt) aliases_to_transfer = await self.remove_room_aliases(room_id, evt)
# Check if community slug is configured
if self.config["use_community_slug"] and not self.config["community_slug"]: if self.config["use_community_slug"] and not self.config["community_slug"]:
await evt.respond( await evt.respond(
"No community slug configured. Please run initialize command first." "No community slug configured. Please run initialize command first."
) )
return return
# Inform user about what type of room is being replaced
if not room_name: if not room_name:
room_name = f"Room {room_id[:8]}..." # Fallback name room_name = f"Room {room_id[:8]}..."
self.log.warning(f"Using fallback room name: {room_name}") self.log.warning(f"Using fallback room name: {room_name}")
self.log.info(
f"Final decision - is_space: {is_space}, room_name: '{room_name}'"
)
self.log.info(f"About to send user message - is_space: {is_space}")
if is_space: if is_space:
await evt.respond(f"Replacing space '{room_name}' with a new space...") await evt.respond(f"Replacing space '{room_name}' with a new space...")
self.log.info(f"✓ Sent 'Replacing space' message to user")
else: else:
await evt.respond(f"Replacing room '{room_name}' with a new room...") await evt.respond(f"Replacing room '{room_name}' with a new room...")
self.log.info(f"✗ Sent 'Replacing room' message to user")
# Validate that the new room alias is available # Validate that the new room alias is available
is_valid, conflicting_aliases = await self.validate_room_aliases( is_valid, conflicting_aliases = await self.validate_room_aliases(
Executable → Regular
View File
Executable → Regular
View File
View File
View File
View File
View File
Executable → Regular
View File
View File
View File
View File
View File
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
+4 -4
View File
@@ -126,10 +126,10 @@ plugin_config:
# set to {} if you don't care about greetings # set to {} if you don't care about greetings
greetings: greetings:
generic: | generic: |
Welcome {user_link}! Please be sure to read the topic for helpful links and information. Welcome {user}! Please be sure to read the topic for helpful links and information.
Use <a href="https://google.com">Google</a> for all other queries ;) Use <a href="https://google.com">Google</a> for all other queries ;)
encrypted: | encrypted: |
welcome {user_link}, this is an encrypted room, so you may not be able to see messages previously sent here. don't be welcome {user}, this is an encrypted room, so you may not be able to see messages previously sent here. don't be
alarmed. alarmed.
# which of the above greetings should be used in which rooms? use the exact name of each greeting # which of the above greetings should be used in which rooms? use the exact name of each greeting
@@ -150,9 +150,9 @@ plugin_config:
# message to send to the notification room when someone joins one of the above rooms: # message to send to the notification room when someone joins one of the above rooms:
join_notification_message: | join_notification_message: |
{user_link} has joined {room_link} ({user_id}). {user} has joined {room}.
# Base URL for Matrix permalink generation. # Base URL for Matrix permalink generation.
# This is used for placeholders such as {user_link} and {room_link}. # This is used for placeholders such as {user_link} and {room_link}.
# Set this to your own matrix.to-compatible instance if you do not want to use https://matrix.to. # Set this to your own matrix.to-compatible instance if you do not want to use https://matrix.to.
matrix_to_base_url: "https://matrix.to" matrix_to_base_url: "https://matrix.to"
Executable → Regular
+1 -1
View File
@@ -1,6 +1,6 @@
maubot: 0.1.0 maubot: 0.1.0
id: org.jobmachine.communitybot id: org.jobmachine.communitybot
version: 0.5.0 version: 0.6.0
license: MIT license: MIT
modules: modules:
- community - community
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
Executable → Regular
View File
+56
View File
@@ -0,0 +1,56 @@
import html
from types import SimpleNamespace
import pytest
from community.bot import CommunityBot, DEFAULT_ROOM_PILL_PREFIX, DEFAULT_USER_PILL_PREFIX
@pytest.fixture()
def bot() -> CommunityBot:
plugin = CommunityBot.__new__(CommunityBot)
plugin.config = {}
return plugin
def test_user_uri_helper_strips_at_and_uses_chat_action(bot: CommunityBot) -> None:
assert bot._matrix_user_uri("@alice:example.org") == "matrix:u/alice:example.org?action=chat"
def test_room_uri_helper_prefers_alias(bot: CommunityBot) -> None:
assert bot._matrix_room_uri("!roomid:example.org", "#general:example.org") == "matrix:r/general:example.org"
def test_room_uri_helper_falls_back_to_room_id_without_bang(bot: CommunityBot) -> None:
assert bot._matrix_room_uri("!roomid:example.org", None) == "matrix:roomid/roomid:example.org"
def test_event_uri_helper_strips_prefixes(bot: CommunityBot) -> None:
assert bot._matrix_event_uri("!roomid:example.org", "$eventid") == "matrix:roomid/roomid:example.org/e/eventid"
def test_format_user_pill_uses_clean_default_prefix(bot: CommunityBot) -> None:
plain, formatted = bot._format_user_pill("@alice:example.org", "Alice")
assert plain == f"{DEFAULT_USER_PILL_PREFIX}Alice"
assert 'href="matrix:u/alice:example.org?action=chat"' in formatted
assert ">Alice<" in formatted
def test_format_room_pill_uses_alias_when_available(bot: CommunityBot) -> None:
plain, formatted = bot._format_room_pill("!roomid:example.org", "General", "#general:example.org")
assert plain == f"{DEFAULT_ROOM_PILL_PREFIX}General"
assert formatted == '<a href="matrix:r/general:example.org">General</a>'
def test_format_room_pill_falls_back_to_room_id(bot: CommunityBot) -> None:
plain, formatted = bot._format_room_pill("!roomid:example.org", "General", None)
assert plain == f"{DEFAULT_ROOM_PILL_PREFIX}General"
assert formatted == '<a href="matrix:roomid/roomid:example.org">General</a>'
def test_format_user_pill_escapes_displayname(bot: CommunityBot) -> None:
plain, formatted = bot._format_user_pill("@alice:example.org", '<Admin & Ops>')
assert plain == f"{DEFAULT_USER_PILL_PREFIX}<Admin & Ops>"
# Keep this broad enough to avoid coupling to quote style.
assert "matrix:u/alice:example.org?action=chat" in formatted
assert html.escape('<Admin & Ops>') in formatted
Executable → Regular
View File
+10
View File
@@ -0,0 +1,10 @@
from community.bot import CommunityBot
def test_matrix_uri_wrappers_delegate_to_canonical_helpers() -> None:
bot = CommunityBot.__new__(CommunityBot)
bot.config = {}
assert bot._matrix_user_uri("@alice:example.org") == "matrix:u/alice:example.org?action=chat"
assert bot._matrix_room_uri("!roomid:example.org") == "matrix:roomid/roomid:example.org"
assert bot._matrix_room_uri("!roomid:example.org", "#general:example.org") == "matrix:r/general:example.org"
assert bot._matrix_event_uri("!roomid:example.org", "$eventid") == "matrix:roomid/roomid:example.org/e/eventid"
Executable → Regular
View File
Executable → Regular
View File
View File
Executable → Regular
+50 -3
View File
@@ -12,7 +12,11 @@ from community.bot import CommunityBot
def bot(): def bot():
bot = CommunityBot.__new__(CommunityBot) bot = CommunityBot.__new__(CommunityBot)
bot.client = Mock() bot.client = Mock()
bot.config = {"matrix_to_base_url": "https://matrix.to"} bot.config = {
"matrix_to_base_url": "https://matrix.to",
"user_pill_prefix": "@",
"room_pill_prefix": "#",
}
return bot return bot
@@ -51,7 +55,50 @@ def test_render_message_template_supports_user_id_and_user_link(bot):
"General", "General",
) )
assert plain == "Alice / @alice:example.org / https://matrix.to/#/@alice:example.org" assert plain == "@Alice / @alice:example.org / https://matrix.to/#/@alice:example.org"
assert "Alice / @alice:example.org / " in html assert "@Alice / @alice:example.org / " in html
assert '<a href=' in html assert '<a href=' in html
assert '>Alice</a>' in html assert '>Alice</a>' in html
def test_render_message_template_uses_configurable_user_and_room_pill_prefixes(bot):
bot.config["user_pill_prefix"] = ""
bot.config["room_pill_prefix"] = ""
plain, html = bot._render_message_template(
"{user} has joined {room}.",
"@alice:example.org",
"Alice",
"!room:example.org",
"General",
)
assert plain == "Alice has joined General."
assert "<a href='matrix:u/alice:example.org?action=chat'>Alice</a>" in html
assert "<a href='matrix:roomid/room:example.org'>General</a>" in html
def test_render_message_template_defaults_to_prefixed_user_and_room_pills(bot):
plain, html = bot._render_message_template(
"{user} has joined {room}.",
"@alice:example.org",
"Alice",
"!room:example.org",
"General",
)
assert plain == "@Alice has joined #General."
assert "<a href='matrix:u/alice:example.org?action=chat'>@Alice</a>" in html
assert "<a href='matrix:roomid/room:example.org'>#General</a>" in html
def test_matrix_uri_helpers_are_consistent():
from community.bot import CommunityBot
bot = CommunityBot.__new__(CommunityBot)
bot.config = {}
assert bot._matrix_user_uri("@alice:example.org") == "matrix:u/alice:example.org?action=chat"
assert bot._matrix_room_uri("!roomid:example.org", "#general:example.org") == "matrix:r/general:example.org"
assert bot._matrix_room_uri("!roomid:example.org", None) == "matrix:roomid/roomid:example.org"
assert bot._matrix_event_uri("!roomid:example.org", "$eventid") == "matrix:roomid/roomid:example.org/e/eventid"
Executable → Regular
View File