From a92759c10010472968d1b97451787fed89701b3a Mon Sep 17 00:00:00 2001 From: William Kray Date: Tue, 9 Sep 2025 21:21:51 -0700 Subject: [PATCH] formatting --- community/bot.py | 1247 +++++++++++++-------- community/helpers/__init__.py | 15 +- community/helpers/base_command_handler.py | 114 +- community/helpers/common_utils.py | 20 +- community/helpers/config_manager.py | 133 ++- community/helpers/database_utils.py | 80 +- community/helpers/decorators.py | 12 +- community/helpers/diagnostic_utils.py | 201 ++-- community/helpers/message_utils.py | 26 +- community/helpers/report_utils.py | 66 +- community/helpers/response_builder.py | 132 ++- community/helpers/room_creation_utils.py | 174 +-- community/helpers/room_utils.py | 65 +- community/helpers/user_utils.py | 62 +- 14 files changed, 1380 insertions(+), 967 deletions(-) diff --git a/community/bot.py b/community/bot.py index 90bfd53..97fdc3c 100644 --- a/community/bot.py +++ b/community/bot.py @@ -35,7 +35,7 @@ from mautrix.types import ( SpaceParentStateEventContent, JoinRulesStateEventContent, JoinRule, - RoomCreatePreset + RoomCreatePreset, ) from mautrix.errors import MNotFound from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper @@ -48,7 +48,20 @@ BAN_STATE_EVENT = EventType.find("m.policy.rule.user", EventType.Class.STATE) from .db import upgrade_table # Helper modules -from .helpers import message_utils, room_utils, user_utils, database_utils, report_utils, decorators, common_utils, room_creation_utils, config_manager, response_builder, diagnostic_utils, base_command_handler +from .helpers import ( + message_utils, + room_utils, + user_utils, + database_utils, + report_utils, + decorators, + common_utils, + room_creation_utils, + config_manager, + response_builder, + diagnostic_utils, + base_command_handler, +) class Config(BaseProxyConfig): @@ -102,7 +115,9 @@ class CommunityBot(Plugin): self._redaction_tasks.cancel() await super().stop() - async def user_permitted(self, user_id: UserID, min_level: int = 50, room_id: str = None) -> bool: + async def user_permitted( + self, user_id: UserID, min_level: int = 50, room_id: str = None + ) -> bool: """Check if a user has sufficient power level in a room. Args: @@ -114,15 +129,20 @@ class CommunityBot(Plugin): bool: True if user has sufficient power level """ return await user_utils.user_permitted( - self.client, user_id, self.config["parent_room"], min_level, room_id, self.log + self.client, + user_id, + self.config["parent_room"], + min_level, + room_id, + self.log, ) def generate_community_slug(self, community_name: str) -> str: """Generate a community slug from the community name. - + Args: community_name: The full community name - + Returns: str: A slug made from the first letter of each word, lowercase """ @@ -130,31 +150,37 @@ class CommunityBot(Plugin): async def validate_room_alias(self, alias_localpart: str, server: str) -> bool: """Check if a room alias already exists. - + Args: alias_localpart: The localpart of the alias (without # and :server) server: The server domain - + Returns: bool: True if alias is available, False if it already exists """ - return await room_utils.validate_room_alias(self.client, alias_localpart, server) + return await room_utils.validate_room_alias( + self.client, alias_localpart, server + ) - async def validate_room_aliases(self, room_names: list[str], evt: MessageEvent = None) -> tuple[bool, list[str]]: + async def validate_room_aliases( + self, room_names: list[str], evt: MessageEvent = None + ) -> tuple[bool, list[str]]: """Validate that all room aliases are available. - + Args: room_names: List of room names to validate evt: Optional MessageEvent for progress updates - + Returns: tuple: (is_valid, list_of_conflicting_aliases) """ if not self.config.get("community_slug", ""): if evt: - await evt.respond("Error: No community slug configured. Please run initialize command first.") + await evt.respond( + "Error: No community slug configured. Please run initialize command first." + ) return False, [] - + server = self.client.parse_user_id(self.client.mxid)[1] return await room_utils.validate_room_aliases( self.client, room_names, self.config.get("community_slug", ""), server @@ -162,20 +188,27 @@ class CommunityBot(Plugin): async def get_moderators_and_above(self) -> list[str]: """Get list of users with moderator or higher permissions from the parent space. - + Returns: list: List of user IDs with power level >= 50 (moderator or above) """ - return await room_utils.get_moderators_and_above(self.client, self.config.get("parent_room", "")) + return await room_utils.get_moderators_and_above( + self.client, self.config.get("parent_room", "") + ) - async def create_space(self, space_name: str, evt: MessageEvent = None, power_level_override: Optional[PowerLevelStateEventContent] = None) -> tuple[str, str]: + async def create_space( + self, + space_name: str, + evt: MessageEvent = None, + power_level_override: Optional[PowerLevelStateEventContent] = None, + ) -> tuple[str, str]: """Create a new space without community slug suffix. - + Args: space_name: The name for the new space evt: Optional MessageEvent for progress updates power_level_override: Optional power levels to use - + Returns: tuple: (space_id, space_alias) if successful, None if failed """ @@ -204,20 +237,27 @@ class CommunityBot(Plugin): creation_content = { "type": "m.space", "m.federate": True, - "m.room.history_visibility": "joined" + "m.room.history_visibility": "joined", } - + # For modern room versions (12+), remove the bot from power levels # as creators have unlimited power by default and cannot appear in power levels - if self.is_modern_room_version(self.config.get("room_version", "1")) and power_level_override: - self.log.info(f"Modern room version {self.config.get('room_version', '1')} detected - removing bot from power levels") + if ( + self.is_modern_room_version(self.config.get("room_version", "1")) + and power_level_override + ): + self.log.info( + f"Modern room version {self.config.get('room_version', '1')} detected - removing bot from power levels" + ) if power_level_override.users: # Remove bot from users list but keep other important settings power_level_override.users.pop(self.client.mxid, None) - + # Create the space with space-specific content # Note: room_version is set via the room_version parameter, not creation_content - self.log.info(f"Creating space with room_version={self.config.get('room_version', '1')}") + self.log.info( + f"Creating space with room_version={self.config.get('room_version', '1')}" + ) self.log.info(f"Creation content: {creation_content}") self.log.info(f"Calling client.create_room with parameters:") self.log.info(f" - alias_localpart: {sanitized_name}") @@ -226,23 +266,29 @@ class CommunityBot(Plugin): self.log.info(f" - power_level_override: {power_level_override}") self.log.info(f" - creation_content: {creation_content}") self.log.info(f" - room_version: {self.config.get('room_version', '1')}") - + space_id = await self.client.create_room( alias_localpart=sanitized_name, name=space_name, invitees=invitees, power_level_override=power_level_override, creation_content=creation_content, - room_version=self.config.get("room_version", "1") + room_version=self.config.get("room_version", "1"), ) # Verify the space version and type were set correctly try: - actual_version, actual_creators = await self.get_room_version_and_creators(space_id) - self.log.info(f"Space {space_id} created with version {actual_version} (requested: {self.config.get('room_version', '1')})") + actual_version, actual_creators = ( + await self.get_room_version_and_creators(space_id) + ) + self.log.info( + f"Space {space_id} created with version {actual_version} (requested: {self.config.get('room_version', '1')})" + ) if actual_version != self.config.get("room_version", "1"): - self.log.warning(f"Space version mismatch: requested {self.config.get('room_version', '1')}, got {actual_version}") - + self.log.warning( + f"Space version mismatch: requested {self.config.get('room_version', '1')}, got {actual_version}" + ) + # Verify the space type was set state_events = await self.client.get_state(space_id) space_type_set = False @@ -250,26 +296,30 @@ class CommunityBot(Plugin): if event.type == EventType.ROOM_CREATE: space_type = event.content.get("type") self.log.info(f"Space creation event type: {space_type}") - space_type_set = (space_type == "m.space") + space_type_set = space_type == "m.space" break - + if not space_type_set: self.log.error(f"Space type was not set correctly in {space_id}") # Try to set the space type after creation as a fallback try: - self.log.info(f"Attempting to set space type after creation for {space_id}") + self.log.info( + f"Attempting to set space type after creation for {space_id}" + ) await self.client.send_state_event( space_id, EventType.ROOM_CREATE, {"type": "m.space"}, - state_key="" + state_key="", + ) + self.log.info( + f"Successfully set space type after creation for {space_id}" ) - self.log.info(f"Successfully set space type after creation for {space_id}") except Exception as e2: self.log.error(f"Failed to set space type after creation: {e2}") else: self.log.info(f"Space type verified as 'm.space' in {space_id}") - + except Exception as e: self.log.warning(f"Could not verify space creation: {e}") @@ -277,7 +327,7 @@ class CommunityBot(Plugin): await evt.respond( f"#{sanitized_name}:{server} has been created.", edits=mymsg, - allow_html=True + allow_html=True, ) self.log.info(f"Space creation completed successfully: {space_id}") @@ -363,19 +413,19 @@ class CommunityBot(Plugin): async def get_space_roomlist(self) -> list[str]: space = self.config["parent_room"] rooms = [] - + # Check if parent room is configured if not space: self.log.warning("No parent room configured, cannot get space roomlist") return rooms - + try: self.log.debug(f"DEBUG getting roomlist from {space} space") state = await self.client.get_state(space) for evt in state: if evt.type == EventType.SPACE_CHILD: - # only look for rooms that include a via path, otherwise they - # are not really in the space! + # only look for rooms that include a via path, otherwise they + # are not really in the space! if evt.content and evt.content.via: rooms.append(evt.state_key) except Exception as e: @@ -403,17 +453,19 @@ class CommunityBot(Plugin): ) kick_inactive_results = await self.database.fetch(kick_q, kick_days_ago) ignored_results = await self.database.fetch(ignored_q) - + database_results = { "warn_inactive": warn_inactive_results, "kick_inactive": kick_inactive_results, - "ignored": ignored_results + "ignored": ignored_results, } - + return report_utils.generate_activity_report(database_results) def flag_message(self, msg): - return message_utils.flag_message(msg, self.config["censor_wordlist"], self.config["censor_files"]) + return message_utils.flag_message( + msg, self.config["censor_wordlist"], self.config["censor_files"] + ) def flag_instaban(self, msg): return message_utils.flag_instaban(msg, self.config["censor_wordlist_instaban"]) @@ -422,13 +474,19 @@ class CommunityBot(Plugin): return message_utils.censor_room(msg, self.config["censor"]) async def check_if_banned(self, userid): - return await user_utils.check_if_banned(self.client, userid, self.config["banlists"], self.log) + return await user_utils.check_if_banned( + self.client, userid, self.config["banlists"], self.log + ) async def get_messages_to_redact(self, room_id, mxid): - return await database_utils.get_messages_to_redact(self.client, room_id, mxid, self.log) + return await database_utils.get_messages_to_redact( + self.client, room_id, mxid, self.log + ) async def redact_messages(self, room_id): - return await database_utils.redact_messages(self.client, self.database, room_id, self.config["sleep"], self.log) + return await database_utils.redact_messages( + self.client, self.database, room_id, self.config["sleep"], self.log + ) async def check_bot_permissions( self, @@ -650,22 +708,33 @@ class CommunityBot(Plugin): roomlist = await self.get_space_roomlist() # don't forget to kick from the space itself roomlist.append(self.config["parent_room"]) - + return await user_utils.ban_user_from_rooms( - self.client, user, roomlist, reason, all_rooms, - self.config["redact_on_ban"], self.get_messages_to_redact, - self.database, self.config["sleep"], self.log + self.client, + user, + roomlist, + reason, + all_rooms, + self.config["redact_on_ban"], + self.get_messages_to_redact, + self.database, + self.config["sleep"], + self.log, ) async def get_banlist_roomids(self): - return await user_utils.get_banlist_roomids(self.client, self.config["banlists"], self.log) + return await user_utils.get_banlist_roomids( + self.client, self.config["banlists"], self.log + ) - async def get_room_version_and_creators(self, room_id: str) -> tuple[str, list[str]]: + async def get_room_version_and_creators( + self, room_id: str + ) -> tuple[str, list[str]]: """Get the room version and creators for a room. - + Args: room_id: The room ID to check - + Returns: tuple: (room_version, list_of_creators) """ @@ -673,10 +742,10 @@ class CommunityBot(Plugin): def is_modern_room_version(self, room_version: str) -> bool: """Check if a room version is 12 or newer (modern room versions). - + Args: room_version: The room version string to check - + Returns: bool: True if room version is 12 or newer """ @@ -684,11 +753,11 @@ class CommunityBot(Plugin): async def user_has_unlimited_power(self, user_id: UserID, room_id: str) -> bool: """Check if a user has unlimited power in a room (creator in modern room versions). - + Args: user_id: The user ID to check room_id: The room ID to check in - + Returns: bool: True if user has unlimited power """ @@ -750,7 +819,9 @@ class CommunityBot(Plugin): if room_id == self.config["parent_room"]: continue - roomname = await common_utils.get_room_name(self.client, room_id, self.log) + roomname = await common_utils.get_room_name( + self.client, room_id, self.log + ) # Get current power levels try: @@ -808,41 +879,63 @@ class CommunityBot(Plugin): async def handle_leave_events(self, evt: StateEvent) -> None: """Common logic for handling membership changes (leave/kick/ban).""" if evt.source & SyncStream.STATE: - self.log.debug(f"Sync stream leave event for {evt.state_key} in {evt.room_id} detected") + self.log.debug( + f"Sync stream leave event for {evt.state_key} in {evt.room_id} detected" + ) return else: - # check if the room the person left is protected by check_if_human - # kick and ban events are sent by other people, so we need to use the state_key - # when referring to the user who left - user_id = evt.state_key - self.log.debug(f"membership change event for {user_id} in {evt.room_id} detected") - if ( - isinstance(self.config["check_if_human"], bool) and self.config["check_if_human"] - ) or ( - isinstance(self.config["check_if_human"], list) and evt.room_id in self.config["check_if_human"] - ): - self.log.debug(f"Checking if {user_id} is a verified user in {evt.room_id}") - - # Check if user has unlimited power (creator in modern room versions) - if await self.user_has_unlimited_power(user_id, evt.room_id): - self.log.debug(f"User {user_id} has unlimited power in {evt.room_id}, skipping power level cleanup") - return - - pl_state = await self.client.get_state_event(evt.room_id, EventType.ROOM_POWER_LEVELS) + # check if the room the person left is protected by check_if_human + # kick and ban events are sent by other people, so we need to use the state_key + # when referring to the user who left + user_id = evt.state_key + self.log.debug( + f"membership change event for {user_id} in {evt.room_id} detected" + ) + if ( + isinstance(self.config["check_if_human"], bool) + and self.config["check_if_human"] + ) or ( + isinstance(self.config["check_if_human"], list) + and evt.room_id in self.config["check_if_human"] + ): + self.log.debug( + f"Checking if {user_id} is a verified user in {evt.room_id}" + ) + + # Check if user has unlimited power (creator in modern room versions) + if await self.user_has_unlimited_power(user_id, evt.room_id): + self.log.debug( + f"User {user_id} has unlimited power in {evt.room_id}, skipping power level cleanup" + ) + return + + pl_state = await self.client.get_state_event( + evt.room_id, EventType.ROOM_POWER_LEVELS + ) + try: + user_level = pl_state.get_user_level(user_id) + except Exception as e: + self.log.error( + f"Failed to get user level for {user_id} in {evt.room_id}: {e}" + ) + return + default_level = pl_state.users_default + self.log.debug( + f"User {user_id} has power level {user_level}, default level is {default_level}" + ) + if user_level == (default_level + 1): # indicates verified user + self.log.debug( + f"Removing {user_id} from power levels state event in {evt.room_id}" + ) + pl_state.users.pop(user_id) try: - user_level = pl_state.get_user_level(user_id) + await self.client.send_state_event( + evt.room_id, EventType.ROOM_POWER_LEVELS, pl_state + ) except Exception as e: - self.log.error(f"Failed to get user level for {user_id} in {evt.room_id}: {e}") - return - default_level = pl_state.users_default - self.log.debug(f"User {user_id} has power level {user_level}, default level is {default_level}") - if user_level == ( default_level + 1 ): # indicates verified user - self.log.debug(f"Removing {user_id} from power levels state event in {evt.room_id}") - pl_state.users.pop(user_id) - try: - await self.client.send_state_event(evt.room_id, EventType.ROOM_POWER_LEVELS, pl_state) - except Exception as e: - self.log.error(f"Failed to update power levels state event in {evt.room_id}: {e}") + self.log.error( + f"Failed to update power levels state event in {evt.room_id}: {e}" + ) @event.on(InternalEventType.LEAVE) async def handle_leave(self, evt: StateEvent) -> None: @@ -870,7 +963,7 @@ class CommunityBot(Plugin): space_rooms = await self.get_space_roomlist() if evt.room_id not in space_rooms: return - + try: on_banlist = await self.check_if_banned(evt.sender) except Exception as e: @@ -887,8 +980,10 @@ class CommunityBot(Plugin): self.log.debug(f"New join in room {room_id} by {evt.sender}") self.log.debug(f"Greeting rooms config: {self.config['greeting_rooms']}") self.log.debug(f"Check if human config: {self.config['check_if_human']}") - self.log.debug(f"Verification phrases config: {self.config['verification_phrases']}") - + self.log.debug( + f"Verification phrases config: {self.config['verification_phrases']}" + ) + if room_id in self.config["greeting_rooms"]: if on_banlist: return @@ -924,39 +1019,53 @@ class CommunityBot(Plugin): if isinstance(self.config["check_if_human"], bool): verification_enabled = self.config["check_if_human"] elif isinstance(self.config["check_if_human"], list): - verification_enabled = evt.room_id in self.config["check_if_human"] - - self.log.debug(f"Verification enabled for room {room_id}: {verification_enabled}") - + verification_enabled = ( + evt.room_id in self.config["check_if_human"] + ) + + self.log.debug( + f"Verification enabled for room {room_id}: {verification_enabled}" + ) + if not verification_enabled: return # Get room name for greeting roomname = "this room" - roomname = await common_utils.get_room_name(self.client, evt.room_id, self.log) + roomname = await common_utils.get_room_name( + self.client, evt.room_id, self.log + ) # Check if user already has sufficient power level or unlimited power try: # First check if user has unlimited power (creator in modern room versions) if await self.user_has_unlimited_power(evt.sender, evt.room_id): - self.log.debug(f"User {evt.sender} has unlimited power in {evt.room_id}, skipping verification") + self.log.debug( + f"User {evt.sender} has unlimited power in {evt.room_id}, skipping verification" + ) return - + power_levels = await self.client.get_state_event( evt.room_id, EventType.ROOM_POWER_LEVELS ) user_level = power_levels.get_user_level(evt.sender) events_default = power_levels.events_default events = power_levels.events - + # Get the required power level for sending messages - required_level = events.get(str(EventType.ROOM_MESSAGE), events_default) - - self.log.debug(f"User {evt.sender} has power level {user_level}, required level is {required_level}") - + required_level = events.get( + str(EventType.ROOM_MESSAGE), events_default + ) + + self.log.debug( + f"User {evt.sender} has power level {user_level}, required level is {required_level}" + ) + # If user already has sufficient power level, skip verification if user_level >= required_level: - self.log.debug(f"User {evt.sender} already has sufficient power level ({user_level} >= {required_level})") + self.log.debug( + f"User {evt.sender} already has sufficient power level ({user_level} >= {required_level})" + ) return except Exception as e: self.log.error(f"Failed to check user power level: {e}") @@ -966,7 +1075,7 @@ class CommunityBot(Plugin): max_retries = 3 retry_delay = 1 # seconds last_error = None - + for attempt in range(max_retries): try: dm_room = await self.client.create_room( @@ -976,41 +1085,52 @@ class CommunityBot(Plugin): initial_state=[ { "type": str(EventType.ROOM_NAME), - "content": {"name": f"[{roomname}] join verification"} + "content": { + "name": f"[{roomname}] join verification" + }, } - ] + ], ) self.log.info(f"Created DM room {dm_room} for {evt.sender}") break except Exception as e: last_error = e - if attempt < max_retries - 1: # Don't sleep on the last attempt - self.log.warning(f"Failed to create DM room (attempt {attempt + 1}/{max_retries}): {e}") + if ( + attempt < max_retries - 1 + ): # Don't sleep on the last attempt + self.log.warning( + f"Failed to create DM room (attempt {attempt + 1}/{max_retries}): {e}" + ) await asyncio.sleep(retry_delay) else: - self.log.error(f"Failed to initiate verification process after {max_retries} attempts: {e}") + self.log.error( + f"Failed to initiate verification process after {max_retries} attempts: {e}" + ) return # Select random verification phrase - verification_phrase = random.choice(self.config["verification_phrases"]) - + verification_phrase = random.choice( + self.config["verification_phrases"] + ) + # Store verification state verification_state = { "user": evt.sender, "target_room": evt.room_id, "phrase": verification_phrase, "attempts": self.config["verification_attempts"], - "required_level": required_level + "required_level": required_level, } await self.store_verification_state(dm_room, verification_state) # Send greeting greeting = self.config["verification_message"].format( - room=roomname, - phrase=verification_phrase + room=roomname, phrase=verification_phrase ) await self.client.send_notice(dm_room, html=greeting) - self.log.info(f"Started verification process for {evt.sender} in room {room_id} for room {roomname}") + self.log.info( + f"Started verification process for {evt.sender} in room {room_id} for room {roomname}" + ) except Exception as e: self.log.error(f"Failed to start verification process: {e}") @@ -1026,20 +1146,23 @@ class CommunityBot(Plugin): # self.log.debug(f"No verification state stored for {evt.room_id}") return - #self.log.debug(f"Checking verification for {evt.sender} in {evt.room_id}") + # self.log.debug(f"Checking verification for {evt.sender} in {evt.room_id}") user_phrase = evt.content.body.strip().lower() expected_phrase = state["phrase"].lower() # Remove punctuation and compare - user_phrase = re.sub(r'[^\w\s]', '', user_phrase) - expected_phrase = re.sub(r'[^\w\s]', '', expected_phrase) + user_phrase = re.sub(r"[^\w\s]", "", user_phrase) + expected_phrase = re.sub(r"[^\w\s]", "", expected_phrase) if user_phrase == expected_phrase: try: # confirm user is still in target room members = await self.client.get_joined_members(state["target_room"]) if state["user"] not in members: - await self.client.send_notice(evt.room_id, "Looks like you've left the target room. Rejoin to try again.") + await self.client.send_notice( + evt.room_id, + "Looks like you've left the target room. Rejoin to try again.", + ) else: # Update power levels in target room power_levels = await self.client.get_state_event( @@ -1049,16 +1172,19 @@ class CommunityBot(Plugin): await self.client.send_state_event( state["target_room"], EventType.ROOM_POWER_LEVELS, power_levels ) - await self.client.send_notice(evt.room_id, "Success! My work here is done. You can leave this room now.") + await self.client.send_notice( + evt.room_id, + "Success! My work here is done. You can leave this room now.", + ) except Exception as e: await self.client.send_notice( - evt.room_id, - f"Something went wrong: {str(e)}. Please report this to the room moderators." + evt.room_id, + f"Something went wrong: {str(e)}. Please report this to the room moderators.", ) if self.config["notification_room"]: await self.client.send_notice( self.config["notification_room"], - f"User verification failed for {evt.sender} in room {evt.room_id}, you may need to manually verify them." + f"User verification failed for {evt.sender} in room {evt.room_id}, you may need to manually verify them.", ) finally: await self.client.leave_room(evt.room_id) @@ -1067,26 +1193,28 @@ class CommunityBot(Plugin): state["attempts"] -= 1 if state["attempts"] <= 0: await self.client.send_notice( - evt.room_id, - "You have run out of attempts. Please contact a room moderator for assistance." + evt.room_id, + "You have run out of attempts. Please contact a room moderator for assistance.", ) if self.config["notification_room"]: await self.client.send_notice( self.config["notification_room"], - f"User verification failed for {evt.sender} in room {evt.room_id}, you may need to manually verify them." + f"User verification failed for {evt.sender} in room {evt.room_id}, you may need to manually verify them.", ) await self.client.leave_room(evt.room_id) await self.delete_verification_state(evt.room_id) else: await self.store_verification_state(evt.room_id, state) await self.client.send_notice( - evt.room_id, - f"Phrase does not match, you have {state['attempts']} tries remaining." + evt.room_id, + f"Phrase does not match, you have {state['attempts']} tries remaining.", ) async def upsert_user_timestamp(self, mxid: str, timestamp: int) -> None: """Database-agnostic upsert for user timestamp updates.""" - await database_utils.upsert_user_timestamp(self.database, mxid, timestamp, self.log) + await database_utils.upsert_user_timestamp( + self.database, mxid, timestamp, self.log + ) @event.on(EventType.ROOM_MESSAGE) async def update_message_timestamp(self, evt: MessageEvent) -> None: @@ -1166,18 +1294,16 @@ class CommunityBot(Plugin): return False return True - @community.subcommand( - "user", help="manage users in the community" - ) + @community.subcommand("user", help="manage users in the community") @decorators.require_parent_room @decorators.require_permission() async def user(self, evt: MessageEvent) -> None: """Main user command - shows usage by default""" - await evt.reply("Use !community user to manage users. Available subcommands: bancheck, ban, unban, kick, ignore, unignore, redact") + await evt.reply( + "Use !community user to manage users. Available subcommands: bancheck, ban, unban, kick, ignore, unignore, redact" + ) - @user.subcommand( - "bancheck", help="check subscribed banlists for a user's mxid" - ) + @user.subcommand("bancheck", help="check subscribed banlists for a user's mxid") @command.argument("mxid", "full matrix ID", required=True) async def user_bancheck(self, evt: MessageEvent, mxid: UserID) -> None: if not await self.check_parent_room(evt): @@ -1228,9 +1354,7 @@ class CommunityBot(Plugin): for room in roomlist: try: roomname = None - roomnamestate = await self.client.get_state_event( - room, "m.room.name" - ) + roomnamestate = await self.client.get_state_event(room, "m.room.name") if roomnamestate: roomname = roomnamestate.name else: @@ -1293,7 +1417,8 @@ class CommunityBot(Plugin): await evt.react("✅") @user.subcommand( - "redact", help="redact messages from a specific user (optionally in a specific room)" + "redact", + help="redact messages from a specific user (optionally in a specific room)", ) @command.argument("mxid", "full matrix ID", required=True) @command.argument("room", "room ID", required=False) @@ -1373,9 +1498,7 @@ class CommunityBot(Plugin): allow_html=True, ) - @report.subcommand( - "all", help="generate a full report of all user activity status" - ) + @report.subcommand("all", help="generate a full report of all user activity status") @decorators.require_parent_room @decorators.require_permission() async def report_all(self, evt: MessageEvent) -> None: @@ -1418,7 +1541,8 @@ class CommunityBot(Plugin): ) @report.subcommand( - "purgable", help="generate a list of users that would be kicked with the purge command" + "purgable", + help="generate a list of users that would be kicked with the purge command", ) @decorators.require_parent_room @decorators.require_permission() @@ -1455,7 +1579,6 @@ class CommunityBot(Plugin): allow_html=True, ) - @community.subcommand("purge", help="kick users for excessive inactivity") @decorators.require_parent_room @decorators.require_permission() @@ -1481,9 +1604,7 @@ class CommunityBot(Plugin): ) roomname = roomnamestate["name"] - await self.client.get_state_event( - room, EventType.ROOM_MEMBER, user - ) + await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) await self.client.kick_user(room, user, reason="inactivity") if roomname: purge_list[user].append(roomname) @@ -1527,9 +1648,7 @@ class CommunityBot(Plugin): for room in roomlist: try: roomname = None - roomnamestate = await self.client.get_state_event( - room, "m.room.name" - ) + roomnamestate = await self.client.get_state_event(room, "m.room.name") roomname = roomnamestate["name"] await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) @@ -1555,24 +1674,36 @@ class CommunityBot(Plugin): # sync our database after we've made changes to room memberships await self.do_sync() - - async def create_room(self, roomname: str, evt: MessageEvent = None, power_level_override: Optional[PowerLevelStateEventContent] = None, creation_content: Optional[dict] = None, invitees: Optional[list[str]] = None) -> tuple[str, str] | None: + async def create_room( + self, + roomname: str, + evt: MessageEvent = None, + power_level_override: Optional[PowerLevelStateEventContent] = None, + creation_content: Optional[dict] = None, + invitees: Optional[list[str]] = None, + ) -> tuple[str, str] | None: """Create a new room and add it to the parent space. - + Args: roomname: The name for the new room evt: Optional MessageEvent for progress updates. If provided, will send status messages. power_level_override: Optional power levels to use. If not provided, will try to get from parent room. creation_content: Optional creation content to use when creating the room. invitees: Optional list of users to invite. If not provided, uses config invitees. - + Returns: tuple: (room_id, room_alias) if successful, None if failed """ mymsg = None try: # Validate and process room creation parameters - sanitized_name, force_encryption, force_unencryption, error_msg, cleaned_roomname = await room_creation_utils.validate_room_creation_params( + ( + sanitized_name, + force_encryption, + force_unencryption, + error_msg, + cleaned_roomname, + ) = await room_creation_utils.validate_room_creation_params( roomname, self.config, evt ) if error_msg: @@ -1582,10 +1713,12 @@ class CommunityBot(Plugin): return None # Prepare room creation data - alias_localpart, server, room_invitees, parent_room = await room_creation_utils.prepare_room_creation_data( - sanitized_name, self.config, self.client, invitees + alias_localpart, server, room_invitees, parent_room = ( + await room_creation_utils.prepare_room_creation_data( + sanitized_name, self.config, self.client, invitees + ) ) - + # Validate that the alias is available is_available = await self.validate_room_alias(alias_localpart, server) if not is_available: @@ -1604,14 +1737,19 @@ class CommunityBot(Plugin): except Exception as e: self.log.error(f"Failed to prepare power levels: {e}") raise - + # Adjust power levels for modern rooms power_levels = room_creation_utils.adjust_power_levels_for_modern_rooms( power_levels, self.config["room_version"] ) - - if self.is_modern_room_version(self.config["room_version"]) and power_levels: - self.log.info(f"Modern room version {self.config['room_version']} detected - removing bot from power levels") + + if ( + self.is_modern_room_version(self.config["room_version"]) + and power_levels + ): + self.log.info( + f"Modern room version {self.config['room_version']} detected - removing bot from power levels" + ) if power_levels.users: power_levels.users.pop(self.client.mxid, None) @@ -1622,16 +1760,25 @@ class CommunityBot(Plugin): # Prepare initial state events initial_state = room_creation_utils.prepare_initial_state( - self.config, parent_room, server, force_encryption, force_unencryption, creation_content + self.config, + parent_room, + server, + force_encryption, + force_unencryption, + creation_content, ) - + # Create the room - self.log.info(f"Creating room with room_version={self.config['room_version']}") + self.log.info( + f"Creating room with room_version={self.config['room_version']}" + ) if power_levels: - self.log.info(f"Power level override users: {list(power_levels.users.keys()) if power_levels.users else 'None'}") + self.log.info( + f"Power level override users: {list(power_levels.users.keys()) if power_levels.users else 'None'}" + ) else: self.log.info("No power level override") - + try: room_id = await self.client.create_room( alias_localpart=alias_localpart, @@ -1640,7 +1787,7 @@ class CommunityBot(Plugin): initial_state=initial_state, power_level_override=power_levels, creation_content=creation_content, - room_version=self.config["room_version"] + room_version=self.config["room_version"], ) self.log.info(f"Room created successfully: {room_id}") except Exception as e: @@ -1661,7 +1808,7 @@ class CommunityBot(Plugin): await evt.respond( f"#{alias_localpart}:{server} has been created and added to the space.", edits=mymsg, - allow_html=True + allow_html=True, ) return room_id, f"#{alias_localpart}:{server}" @@ -1675,14 +1822,14 @@ class CommunityBot(Plugin): await evt.respond(error_msg) return None - @community.subcommand( - "room", help="manage rooms in the community" - ) + @community.subcommand("room", help="manage rooms in the community") @decorators.require_parent_room @decorators.require_permission() async def room(self, evt: MessageEvent) -> None: """Main room command - shows usage by default""" - await evt.reply("Use !community room to manage rooms. Available subcommands: create, archive, replace, guests, id, version, setpower") + await evt.reply( + "Use !community room to manage rooms. Available subcommands: create, archive, replace, guests, id, version, setpower" + ) @room.subcommand( "create", @@ -1703,13 +1850,19 @@ class CommunityBot(Plugin): # Check if community slug is configured if not self.config["community_slug"]: - await evt.reply("No community slug configured. Please run initialize command first.") + await evt.reply( + "No community slug configured. Please run initialize command first." + ) return # Validate the room alias before creating - is_valid, conflicting_aliases = await self.validate_room_aliases([roomname], evt) + is_valid, conflicting_aliases = await self.validate_room_aliases( + [roomname], evt + ) if not is_valid: - await evt.reply(f"Cannot create room: {conflicting_aliases[0]} already exists.") + await evt.reply( + f"Cannot create room: {conflicting_aliases[0]} already exists." + ) return result = await self.create_room(roomname, evt) @@ -1756,7 +1909,7 @@ class CommunityBot(Plugin): async def room_replace(self, evt: MessageEvent, room: str) -> None: self.log.info(f"=== REPLACEROOM COMMAND STARTED ===") self.log.info(f"Command arguments: room='{room}', evt.room_id='{evt.room_id}'") - + await evt.mark_read() if not room: @@ -1776,7 +1929,9 @@ class CommunityBot(Plugin): has_perms, error_msg, _ = await self.check_bot_permissions( room_id, evt, ["state", "tombstone", "power_levels"] ) - self.log.info(f"Bot permissions check result: has_perms={has_perms}, error_msg='{error_msg}'") + self.log.info( + f"Bot permissions check result: has_perms={has_perms}, error_msg='{error_msg}'" + ) if not has_perms: await evt.respond(f"Cannot replace room: {error_msg}") self.log.info("Bot permissions check failed, returning") @@ -1811,37 +1966,48 @@ class CommunityBot(Plugin): self.log.info(f"=== SPACE DETECTION DEBUG START ===") self.log.info(f"Room ID being checked: {room_id}") self.log.info(f"EventType module: {EventType}") - self.log.info(f"EventType.ROOM_CREATE exists: {hasattr(EventType, 'ROOM_CREATE')}") - if hasattr(EventType, 'ROOM_CREATE'): - self.log.info(f"EventType.ROOM_CREATE value: {getattr(EventType, 'ROOM_CREATE')}") + self.log.info( + f"EventType.ROOM_CREATE exists: {hasattr(EventType, 'ROOM_CREATE')}" + ) + if hasattr(EventType, "ROOM_CREATE"): + self.log.info( + f"EventType.ROOM_CREATE value: {getattr(EventType, 'ROOM_CREATE')}" + ) else: self.log.warning("EventType.ROOM_CREATE does not exist!") - + try: # Get the room creation event to check if it's a space state_events = await self.client.get_state(room_id) - self.log.info(f"Retrieved {len(state_events)} state events for space detection") - + self.log.info( + f"Retrieved {len(state_events)} state events for space detection" + ) + # Log all event types for debugging event_types = [event.type for event in state_events] self.log.info(f"Event types found: {event_types}") - + # Debug EventType.ROOM_CREATE constant self.log.info(f"EventType.ROOM_CREATE value: {EventType.ROOM_CREATE}") self.log.info(f"EventType.ROOM_CREATE type: {type(EventType.ROOM_CREATE)}") - + # Also try string comparison as fallback room_create_string = "m.room.create" self.log.info(f"String comparison value: {room_create_string}") - + # Try to find the room creation event using multiple methods room_create_event = None - + for i, event in enumerate(state_events): - self.log.info(f"Event {i}: type={event.type} (type: {type(event.type)})") - + self.log.info( + f"Event {i}: type={event.type} (type: {type(event.type)})" + ) + # Try multiple comparison methods - if hasattr(EventType, 'ROOM_CREATE') and event.type == EventType.ROOM_CREATE: + if ( + hasattr(EventType, "ROOM_CREATE") + and event.type == EventType.ROOM_CREATE + ): self.log.info(f"✓ Matched EventType.ROOM_CREATE") room_create_event = event break @@ -1855,29 +2021,34 @@ class CommunityBot(Plugin): break else: self.log.info(f"✗ No match for event {i}") - + # Now process the room creation event if found if room_create_event: space_type = room_create_event.content.get("type") self.log.info(f"Found ROOM_CREATE event with type: {space_type}") self.log.info(f"Full ROOM_CREATE content: {room_create_event.content}") - is_space = (space_type == "m.space") + is_space = space_type == "m.space" self.log.info(f"Space detection result: {is_space}") else: self.log.warning("No ROOM_CREATE event found using any method") - + if is_space: - self.log.info(f"✓ FINAL RESULT: Room {room_id} IS a space - will create new space") + self.log.info( + f"✓ FINAL RESULT: Room {room_id} IS a space - will create new space" + ) else: - self.log.info(f"✗ FINAL RESULT: Room {room_id} is NOT a space - will create regular room") - + self.log.info( + f"✗ FINAL RESULT: Room {room_id} is NOT a space - will create regular room" + ) + except Exception as e: self.log.error(f"❌ ERROR during space detection: {e}") import traceback + self.log.error(f"Traceback: {traceback.format_exc()}") # Assume it's not a space if we can't determine is_space = False - + self.log.info(f"=== SPACE DETECTION DEBUG END - is_space={is_space} ===") # Get list of aliases to transfer while removing them from the old room @@ -1885,17 +2056,21 @@ class CommunityBot(Plugin): # Check if community slug is configured if not self.config["community_slug"]: - await evt.respond("No community slug configured. Please run initialize command first.") + await evt.respond( + "No community slug configured. Please run initialize command first." + ) return # Inform user about what type of room is being replaced if not room_name: room_name = f"Room {room_id[:8]}..." # Fallback name self.log.warning(f"Using fallback room name: {room_name}") - - self.log.info(f"Final decision - is_space: {is_space}, room_name: '{room_name}'") + + self.log.info( + f"Final decision - is_space: {is_space}, room_name: '{room_name}'" + ) self.log.info(f"About to send user message - is_space: {is_space}") - + if is_space: await evt.respond(f"Replacing space '{room_name}' with a new space...") self.log.info(f"✓ Sent 'Replacing space' message to user") @@ -1904,9 +2079,13 @@ class CommunityBot(Plugin): self.log.info(f"✗ Sent 'Replacing room' message to user") # Validate that the new room alias is available - is_valid, conflicting_aliases = await self.validate_room_aliases([room_name], evt) + is_valid, conflicting_aliases = await self.validate_room_aliases( + [room_name], evt + ) if not is_valid: - await evt.respond(f"Cannot replace room: {conflicting_aliases[0]} already exists.") + await evt.respond( + f"Cannot replace room: {conflicting_aliases[0]} already exists." + ) return # Now we can start the process of replacing the room @@ -1918,12 +2097,16 @@ class CommunityBot(Plugin): # For spaces, we need to pass power_level_override to ensure proper creation # Get power levels from the old space to use as a template try: - old_power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS) - self.log.info(f"Using user power levels from old space for new space creation") - + old_power_levels = await self.client.get_state_event( + room_id, EventType.ROOM_POWER_LEVELS + ) + self.log.info( + f"Using user power levels from old space for new space creation" + ) + # Create new power levels with server defaults, not copying all permissions from old space power_levels = PowerLevelStateEventContent() - + # Copy only user power levels from old space, not the entire permission set if old_power_levels.users: user_power_levels = old_power_levels.users.copy() @@ -1934,15 +2117,17 @@ class CommunityBot(Plugin): power_levels.users = { self.client.mxid: 1000, # Bot gets highest power } - + # Set explicit config values power_levels.invite = self.config["invite_power_level"] - + # For other permissions, let the server use its defaults instead of copying from old space # This prevents issues like only admins being able to post messages - self.log.info(f"Using user power levels from old space but server defaults for other permissions") + self.log.info( + f"Using user power levels from old space but server defaults for other permissions" + ) power_level_override = power_levels - + # remove the bot's explicit power level for modern room versions # since creators have unlimited power in modern rooms if self.is_modern_room_version(self.config["room_version"]): @@ -1950,22 +2135,32 @@ class CommunityBot(Plugin): power_level_override.users.pop(self.client.mxid, None) self.log.info(f"Removed bot since they are creator") except Exception as e: - self.log.warning(f"Could not get power levels from old space, using defaults: {e}") + self.log.warning( + f"Could not get power levels from old space, using defaults: {e}" + ) power_level_override = None - - self.log.info(f"Calling create_space with room_name='{room_name}', power_level_override={power_level_override is not None}") - new_room_id, new_room_alias = await self.create_space(room_name, evt, power_level_override) - self.log.info(f"create_space returned: room_id={new_room_id}, alias={new_room_alias}") + + self.log.info( + f"Calling create_space with room_name='{room_name}', power_level_override={power_level_override is not None}" + ) + new_room_id, new_room_alias = await self.create_space( + room_name, evt, power_level_override + ) + self.log.info( + f"create_space returned: room_id={new_room_id}, alias={new_room_alias}" + ) else: # Create a regular room self.log.info(f"Calling create_room with room_name='{room_name}'") new_room_id, new_room_alias = await self.create_room(room_name, evt) - self.log.info(f"create_room returned: room_id={new_room_id}, alias={new_room_alias}") - + self.log.info( + f"create_room returned: room_id={new_room_id}, alias={new_room_alias}" + ) + if not new_room_id: await evt.respond("Failed to create new room") return - + # Ensure the new space is NOT added to the old space as a child room if is_space: try: @@ -1976,16 +2171,24 @@ class CommunityBot(Plugin): for event in state_events: if event.type == EventType.SPACE_PARENT: old_space_parent_events.append(event.state_key) - + if old_space_parent_events: - self.log.info(f"Old space has {len(old_space_parent_events)} parent space references - ensuring new space is not added as child") - await evt.respond(f"Note: Old space has {len(old_space_parent_events)} parent space references - new space will be independent") - + self.log.info( + f"Old space has {len(old_space_parent_events)} parent space references - ensuring new space is not added as child" + ) + await evt.respond( + f"Note: Old space has {len(old_space_parent_events)} parent space references - new space will be independent" + ) + # Also check if the old space is a child of the community parent space # and ensure the new space doesn't automatically inherit that relationship if room_id == self.config.get("parent_room"): - self.log.info("Old space is the community parent space - new space will be independent") - await evt.respond("Note: Old space is the community parent space - new space will be independent and may need manual configuration") + self.log.info( + "Old space is the community parent space - new space will be independent" + ) + await evt.respond( + "Note: Old space is the community parent space - new space will be independent and may need manual configuration" + ) except Exception as e: self.log.warning(f"Could not check old space parent references: {e}") @@ -2001,8 +2204,10 @@ class CommunityBot(Plugin): # Transfer the aliases to the new room/space if aliases_to_transfer: - await evt.respond(f"Transferring {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}...") - + await evt.respond( + f"Transferring {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}..." + ) + for alias in aliases_to_transfer: localpart = alias.split(":")[0][1:] # Remove # and get localpart server = alias.split(":")[1] @@ -2023,8 +2228,10 @@ class CommunityBot(Plugin): self.log.error( f"Failed to transfer modified alias {modified_alias}: {e2}" ) - - await evt.respond(f"Successfully transferred {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}") + + await evt.respond( + f"Successfully transferred {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}" + ) else: await evt.respond("No aliases to transfer") @@ -2041,9 +2248,13 @@ class CommunityBot(Plugin): self.log.info( f"Successfully copied {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'} {new_room_id}" ) - await evt.respond(f"Copied avatar to new {'space' if is_space else 'room'}") + await evt.respond( + f"Copied avatar to new {'space' if is_space else 'room'}" + ) except Exception as e: - self.log.error(f"Failed to copy {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'}: {e}") + self.log.error( + f"Failed to copy {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'}: {e}" + ) # await evt.respond(f"Failed to copy {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'}: {e}") # Set the room topic in the new room/space @@ -2052,10 +2263,16 @@ class CommunityBot(Plugin): await self.client.send_state_event( new_room_id, EventType.ROOM_TOPIC, {"topic": room_topic} ) - self.log.info(f"Successfully copied {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'} {new_room_id}") - await evt.respond(f"Copied topic to new {'space' if is_space else 'room'}") + self.log.info( + f"Successfully copied {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'} {new_room_id}" + ) + await evt.respond( + f"Copied topic to new {'space' if is_space else 'room'}" + ) except Exception as e: - self.log.error(f"Failed to copy {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'}: {e}") + self.log.error( + f"Failed to copy {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'}: {e}" + ) # await evt.respond(f"Failed to copy {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'}: {e}") else: await evt.respond("No topic to copy") @@ -2068,7 +2285,9 @@ class CommunityBot(Plugin): f"Failed to archive old {'space' if is_space else 'room'}, but new {'space' if is_space else 'room'} has been created" ) else: - await evt.respond(f"Successfully archived old {'space' if is_space else 'room'}") + await evt.respond( + f"Successfully archived old {'space' if is_space else 'room'}" + ) # If we're replacing a space, we need to handle child room relationships if is_space: @@ -2079,11 +2298,15 @@ class CommunityBot(Plugin): for event in state_events: if event.type == EventType.SPACE_CHILD: old_child_rooms.append(event.state_key) - + if old_child_rooms: - self.log.info(f"Found {len(old_child_rooms)} child rooms in old space") - await evt.respond(f"Migrating {len(old_child_rooms)} child rooms from old space to new space...") - + self.log.info( + f"Found {len(old_child_rooms)} child rooms in old space" + ) + await evt.respond( + f"Migrating {len(old_child_rooms)} child rooms from old space to new space..." + ) + # Update child rooms to point to the new space for child_room_id in old_child_rooms: try: @@ -2092,40 +2315,42 @@ class CommunityBot(Plugin): child_room_id, EventType.SPACE_PARENT, {}, # Empty content removes the state - state_key=room_id + state_key=room_id, ) # Add new space parent reference server = self.client.parse_user_id(self.client.mxid)[1] await self.client.send_state_event( child_room_id, EventType.SPACE_PARENT, - { - "via": [server], - "canonical": True - }, - state_key=new_room_id + {"via": [server], "canonical": True}, + state_key=new_room_id, ) # Update space child reference await self.client.send_state_event( new_room_id, EventType.SPACE_CHILD, - { - "via": [server], - "suggested": False - }, - state_key=child_room_id + {"via": [server], "suggested": False}, + state_key=child_room_id, + ) + self.log.info( + f"Updated child room {child_room_id} to point to new space" ) - self.log.info(f"Updated child room {child_room_id} to point to new space") await asyncio.sleep(self.config["sleep"]) except Exception as e: - self.log.error(f"Failed to update child room {child_room_id}: {e}") - - await evt.respond(f"Successfully migrated {len(old_child_rooms)} child rooms to new space") + self.log.error( + f"Failed to update child room {child_room_id}: {e}" + ) + + await evt.respond( + f"Successfully migrated {len(old_child_rooms)} child rooms to new space" + ) else: await evt.respond("No child rooms found in old space") except Exception as e: self.log.error(f"Failed to handle child room relationships: {e}") - await evt.respond(f"Warning: Failed to handle child room relationships: {e}") + await evt.respond( + f"Warning: Failed to handle child room relationships: {e}" + ) # update instances of the old room id in any config values that use it config_keys = [ @@ -2134,9 +2359,9 @@ class CommunityBot(Plugin): "censor", "check_if_human", "banlists", - "greeting_rooms" + "greeting_rooms", ] - + for key in config_keys: value = self.config[key] if isinstance(value, str): @@ -2145,7 +2370,9 @@ class CommunityBot(Plugin): elif isinstance(value, list): # Handle lists that might contain room IDs if room_id in value: - self.config[key] = [new_room_id if x == room_id else x for x in value] + self.config[key] = [ + new_room_id if x == room_id else x for x in value + ] elif isinstance(value, dict): # Handle dictionaries that might use room IDs as keys if room_id in value: @@ -2157,7 +2384,7 @@ class CommunityBot(Plugin): # Save the updated config self.config.save() - + # Final success message if is_space: await evt.respond( @@ -2213,9 +2440,7 @@ class CommunityBot(Plugin): except Exception as e: await evt.respond(f"something went wrong: {e}") - @room.subcommand( - "id", help="return the matrix room ID of this, or a given, room" - ) + @room.subcommand("id", help="return the matrix room ID of this, or a given, room") @command.argument("room", required=False) @decorators.require_parent_room @decorators.require_permission() @@ -2258,44 +2483,43 @@ class CommunityBot(Plugin): room_id = room else: room_id = evt.room_id - + try: room_version, creators = await self.get_room_version_and_creators(room_id) - + # Get room name if available room_name = room_id try: - room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME) + room_name_event = await self.client.get_state_event( + room_id, EventType.ROOM_NAME + ) room_name = room_name_event.name except: pass - + response = f"Room: {room_name}
" response += f"Room ID: {room_id}
" response += f"Room Version: {room_version}
" - + if creators: response += f"Creators: {', '.join(creators)}
" if self.is_modern_room_version(room_version): response += f"
ℹ️ Note: This room uses version {room_version}, which means creators have unlimited power and cannot be restricted by power levels." else: response += "Creators: None found
" - + await evt.reply(response, allow_html=True) except Exception as e: await evt.respond(f"something went wrong: {e}") @room.subcommand( - "setpower", help="sync user power levels from parent room to all child rooms. this will override existing user power levels in child rooms!" + "setpower", + help="sync user power levels from parent room to all child rooms. this will override existing user power levels in child rooms!", ) @command.argument("target_room", required=False) @decorators.require_parent_room @decorators.require_permission(min_level=100) - async def room_setpower( - self, - evt: MessageEvent, - target_room: str = None - ) -> None: + async def room_setpower(self, evt: MessageEvent, target_room: str = None) -> None: await evt.mark_read() if target_room: @@ -2305,11 +2529,10 @@ class CommunityBot(Plugin): roomlist = await self.get_space_roomlist() target_msg = "space rooms" - msg = await evt.respond( f"Syncing power levels from parent room to {target_msg}..." ) - + success_list = [] skipped_list = [] error_list = [] @@ -2319,13 +2542,17 @@ class CommunityBot(Plugin): parent_power_levels = await self.client.get_state_event( self.config["parent_room"], EventType.ROOM_POWER_LEVELS ) - parent_version, parent_creators = await self.get_room_version_and_creators(self.config["parent_room"]) - + parent_version, parent_creators = await self.get_room_version_and_creators( + self.config["parent_room"] + ) + self.log.info(f"Parent room version: {parent_version}") self.log.info(f"Parent room creators: {parent_creators}") self.log.info(f"Bot MXID: {self.client.mxid}") - self.log.info(f"Bot is creator in parent: {self.client.mxid in parent_creators}") - + self.log.info( + f"Bot is creator in parent: {self.client.mxid in parent_creators}" + ) + user_power_levels = parent_power_levels.users.copy() # Handle bot's power level based on room versions and actual creator status @@ -2334,15 +2561,21 @@ class CommunityBot(Plugin): if self.client.mxid in parent_creators: # Bot is a creator, remove from power levels to prevent errors user_power_levels.pop(self.client.mxid, None) - self.log.info(f"Parent room is modern (v{parent_version}), bot is creator and has unlimited power") + self.log.info( + f"Parent room is modern (v{parent_version}), bot is creator and has unlimited power" + ) else: # Bot is not a creator, set appropriate power level user_power_levels[self.client.mxid] = 1000 - self.log.info(f"Parent room is modern (v{parent_version}), bot is not creator, power level set to 1000") + self.log.info( + f"Parent room is modern (v{parent_version}), bot is not creator, power level set to 1000" + ) else: # In legacy parent rooms, ensure bot has highest power level user_power_levels[self.client.mxid] = 1000 - self.log.info(f"Parent room is legacy (v{parent_version}), bot power level set to 1000") + self.log.info( + f"Parent room is legacy (v{parent_version}), bot power level set to 1000" + ) for room in roomlist: try: @@ -2357,14 +2590,19 @@ class CommunityBot(Plugin): # Skip rooms that are protected by verification, unless its the only target room, # in which case we have explicitly asked to set power levels in that room - if ( - len(roomlist) > 1 and + if len(roomlist) > 1 and ( ( - (isinstance(self.config["check_if_human"], bool) and self.config["check_if_human"]) or - (isinstance(self.config["check_if_human"], list) and room in self.config["check_if_human"]) + isinstance(self.config["check_if_human"], bool) + and self.config["check_if_human"] + ) + or ( + isinstance(self.config["check_if_human"], list) + and room in self.config["check_if_human"] ) ): - self.log.info(f"Skipping {roomname or room} as it requires human verification. You can explicitly run this command for this room to override.") + self.log.info( + f"Skipping {roomname or room} as it requires human verification. You can explicitly run this command for this room to override." + ) skipped_list.append(roomname or room) continue @@ -2372,52 +2610,70 @@ class CommunityBot(Plugin): room_power_levels = await self.client.get_state_event( room, EventType.ROOM_POWER_LEVELS ) - room_version, room_creators = await self.get_room_version_and_creators(room) - - self.log.info(f"Processing room {roomname or room} (v{room_version}) - Parent is v{parent_version}") + room_version, room_creators = ( + await self.get_room_version_and_creators(room) + ) + + self.log.info( + f"Processing room {roomname or room} (v{room_version}) - Parent is v{parent_version}" + ) # Handle power level mapping based on room version differences if self.is_modern_room_version(room_version): # Target room is modern (v12+) - creators have unlimited power - self.log.info(f"Target room {roomname or room} is modern - preserving creator power levels") - + self.log.info( + f"Target room {roomname or room} is modern - preserving creator power levels" + ) + # Filter out any users who are creators in the target room filtered_user_power_levels = {} for user, level in user_power_levels.items(): if user not in room_creators: filtered_user_power_levels[user] = level else: - self.log.info(f"Skipping power level for creator {user} in modern room {roomname or room}") - + self.log.info( + f"Skipping power level for creator {user} in modern room {roomname or room}" + ) + # Preserve existing power levels for special cases (like verification rooms) # Only update non-creator users to avoid conflicts existing_users = set(room_power_levels.users.keys()) creators_set = set(room_creators) special_users = existing_users - creators_set - + # Keep existing power levels for special users unless explicitly overridden for user in special_users: if user not in filtered_user_power_levels: - filtered_user_power_levels[user] = room_power_levels.users[user] - self.log.info(f"Preserving existing power level for special user {user} in {roomname or room}") - + filtered_user_power_levels[user] = ( + room_power_levels.users[user] + ) + self.log.info( + f"Preserving existing power level for special user {user} in {roomname or room}" + ) + # Handle bot power level in modern target room if self.client.mxid in room_creators: # Bot is creator in target room - don't set power level - self.log.info(f"Bot is creator in modern target room {roomname or room} - no power level set") + self.log.info( + f"Bot is creator in modern target room {roomname or room} - no power level set" + ) else: # Bot is not creator in target room - set appropriate power level filtered_user_power_levels[self.client.mxid] = 1000 - self.log.info(f"Bot is not creator in modern target room {roomname or room} - power level set to 1000") - + self.log.info( + f"Bot is not creator in modern target room {roomname or room} - power level set to 1000" + ) + # Merge filtered power levels with existing room power levels room_power_levels.users.update(filtered_user_power_levels) - + elif self.is_modern_room_version(parent_version): # Target room is legacy but parent is modern # Map parent room "creators" to "admins" in legacy room - self.log.info(f"Target room {roomname or room} is legacy, parent is modern - mapping creators to admins") - + self.log.info( + f"Target room {roomname or room} is legacy, parent is modern - mapping creators to admins" + ) + # For legacy rooms, we can set all power levels including the bot # But map parent room creators to appropriate admin levels mapped_power_levels = {} @@ -2425,26 +2681,34 @@ class CommunityBot(Plugin): if user in parent_creators and user != self.client.mxid: # Map parent creators to admin level (100) in legacy rooms mapped_power_levels[user] = 100 - self.log.info(f"Mapping parent creator {user} to admin level 100 in legacy room {roomname or room}") + self.log.info( + f"Mapping parent creator {user} to admin level 100 in legacy room {roomname or room}" + ) else: mapped_power_levels[user] = level - + # Handle bot power level based on whether it's a creator in the parent if self.client.mxid in parent_creators: # Bot is a creator in parent, but this is a legacy room # Set bot to highest power level since creators don't have unlimited power in legacy rooms mapped_power_levels[self.client.mxid] = 1000 - self.log.info(f"Bot is creator in parent but target is legacy room - setting power level to 1000") + self.log.info( + f"Bot is creator in parent but target is legacy room - setting power level to 1000" + ) else: # Bot is not a creator in parent, set to highest power level mapped_power_levels[self.client.mxid] = 1000 - self.log.info(f"Bot is not creator in parent, setting power level to 1000 in legacy target room") - + self.log.info( + f"Bot is not creator in parent, setting power level to 1000 in legacy target room" + ) + room_power_levels.users = mapped_power_levels - + else: # Both rooms are legacy - direct power level transfer - self.log.info(f"Both rooms are legacy - direct power level transfer") + self.log.info( + f"Both rooms are legacy - direct power level transfer" + ) room_power_levels.users = user_power_levels # Send the updated power levels to this room @@ -2464,22 +2728,26 @@ class CommunityBot(Plugin): results += f"Parent room version: {parent_version}
" results += f"Parent room creators: {', '.join(parent_creators) if parent_creators else 'None'}
" results += f"Bot creator status: {'✅ Creator' if self.client.mxid in parent_creators else '❌ Not creator'} in parent room

" - + # Add explanation of power level mapping strategy if self.is_modern_room_version(parent_version): results += f"Mapping Strategy: Parent room is modern (v{parent_version}), creators have unlimited power.
" if self.client.mxid in parent_creators: results += "• Bot is creator in parent room (unlimited power)
" else: - results += "• Bot is not creator in parent room (power level 1000)
" + results += ( + "• Bot is not creator in parent room (power level 1000)
" + ) results += "• Parent creators mapped to admin level (100) in legacy child rooms
" results += "• Modern child rooms preserve their creator power levels

" else: results += f"Mapping Strategy: Parent room is legacy (v{parent_version}), using traditional power level system.
" - results += "• Bot power level set to 1000 for administrative control
" + results += ( + "• Bot power level set to 1000 for administrative control
" + ) results += "• Direct power level transfer to legacy child rooms
" results += "• Modern child rooms preserve their creator power levels

" - + if success_list: results += f"Successfully updated rooms:
{', '.join(success_list)}

" if skipped_list: @@ -2534,7 +2802,7 @@ class CommunityBot(Plugin): # Check if member has unlimited power if await self.user_has_unlimited_power(member, evt.room_id): continue # Skip creators with unlimited power - + current_level = power_levels.get_user_level(member) if current_level < required_level: power_levels.users[member] = required_level @@ -2546,7 +2814,7 @@ class CommunityBot(Plugin): await evt.respond( f"Room migration complete. Current members can send messages, new joiners will require verification.", - edits=msg + edits=msg, ) except Exception as e: @@ -2562,17 +2830,22 @@ class CommunityBot(Plugin): (dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, \ required_power_level) VALUES ($1, $2, $3, $4, $5, $6)""" - await self.database.execute(insert_query, dm_room_id, - state["user"], - state["target_room"], - state["phrase"], - state["attempts"], - state["required_level"] - ) + await self.database.execute( + insert_query, + dm_room_id, + state["user"], + state["target_room"], + state["phrase"], + state["attempts"], + state["required_level"], + ) self.log.debug(f"Inserted new verification state for {dm_room_id}") except Exception as e: # If insert fails (likely due to existing record), try update - if "UNIQUE constraint failed" in str(e) or "duplicate key" in str(e).lower(): + if ( + "UNIQUE constraint failed" in str(e) + or "duplicate key" in str(e).lower() + ): self.log.debug(f"Record exists for {dm_room_id}, updating instead") update_query = """UPDATE verification_states SET verification_phrase = $4, \ @@ -2581,13 +2854,15 @@ class CommunityBot(Plugin): user_id = $2, \ target_room_id = $3 \ WHERE dm_room_id = $1""" - await self.database.execute(update_query, dm_room_id, - state["user"], - state["target_room"], - state["phrase"], - state["attempts"], - state["required_level"] - ) + await self.database.execute( + update_query, + dm_room_id, + state["user"], + state["target_room"], + state["phrase"], + state["attempts"], + state["required_level"], + ) self.log.debug(f"Updated verification state for {dm_room_id}") else: # Re-raise if it's not a constraint violation @@ -2596,8 +2871,7 @@ class CommunityBot(Plugin): async def get_verification_state(self, dm_room_id: str) -> Optional[dict]: """Retrieve verification state from the database.""" row = await self.database.fetchrow( - "SELECT * FROM verification_states WHERE dm_room_id = $1", - dm_room_id + "SELECT * FROM verification_states WHERE dm_room_id = $1", dm_room_id ) if not row: return None @@ -2606,26 +2880,27 @@ class CommunityBot(Plugin): "target_room": row["target_room_id"], "phrase": row["verification_phrase"], "attempts": row["attempts_remaining"], - "required_level": row["required_power_level"] + "required_level": row["required_power_level"], } async def delete_verification_state(self, dm_room_id: str) -> None: """Delete verification state from the database.""" await self.database.execute( - "DELETE FROM verification_states WHERE dm_room_id = $1", - dm_room_id + "DELETE FROM verification_states WHERE dm_room_id = $1", dm_room_id ) async def cleanup_stale_verification_states(self) -> None: """Clean up verification states that are no longer valid.""" # Get all verification states states = await self.database.fetch("SELECT * FROM verification_states") - + for state in states: try: # Check if DM room still exists and bot is still in it try: - await self.client.get_state_event(state["dm_room_id"], EventType.ROOM_MEMBER, self.client.mxid) + await self.client.get_state_event( + state["dm_room_id"], EventType.ROOM_MEMBER, self.client.mxid + ) except Exception: # Bot is not in the DM room anymore, state is stale await self.delete_verification_state(state["dm_room_id"]) @@ -2633,7 +2908,9 @@ class CommunityBot(Plugin): # Check if user is still in the target room try: - await self.client.get_state_event(state["target_room_id"], EventType.ROOM_MEMBER, state["user_id"]) + await self.client.get_state_event( + state["target_room_id"], EventType.ROOM_MEMBER, state["user_id"] + ) except Exception: # User is not in the target room anymore, state is stale await self.delete_verification_state(state["dm_room_id"]) @@ -2645,12 +2922,12 @@ class CommunityBot(Plugin): continue except Exception as e: - self.log.error(f"Error checking verification state {state['dm_room_id']}: {e}") + self.log.error( + f"Error checking verification state {state['dm_room_id']}: {e}" + ) # If we can't check the state, assume it's stale await self.delete_verification_state(state["dm_room_id"]) - - @classmethod def get_db_upgrade_table(cls) -> None: return upgrade_table @@ -2661,20 +2938,26 @@ class CommunityBot(Plugin): @community.subcommand( "initialize", - help="initialize a new community space with the given name. this command can only be used if no parent room is configured." + help="initialize a new community space with the given name. this command can only be used if no parent room is configured.", ) @command.argument("community_name", pass_raw=True, required=True) - async def initialize_community(self, evt: MessageEvent, community_name: str) -> None: + async def initialize_community( + self, evt: MessageEvent, community_name: str + ) -> None: await evt.mark_read() # Check if parent room is already configured if self.config["parent_room"]: - await evt.reply("Cannot initialize: a parent room is already configured. Please remove the parent_room configuration first.") + await evt.reply( + "Cannot initialize: a parent room is already configured. Please remove the parent_room configuration first." + ) return # Validate community name if not community_name or community_name.isspace(): - await evt.reply("Please provide a community name. Usage: !community initialize ") + await evt.reply( + "Please provide a community name. Usage: !community initialize " + ) return msg = await evt.respond("Initializing new community space...") @@ -2685,17 +2968,22 @@ class CommunityBot(Plugin): community_slug = self.generate_community_slug(community_name) self.config["community_slug"] = community_slug self.log.info(f"Generated community slug: {community_slug}") - + # Define child rooms that will be created during initialization (excluding the space itself) child_rooms_to_create = [ f"{community_name} Moderators", # Moderators room - f"{community_name} Waiting Room" # Waiting room + f"{community_name} Waiting Room", # Waiting room ] - + # Validate child room aliases before creating any rooms - is_valid, conflicting_aliases = await self.validate_room_aliases(child_rooms_to_create, evt) + is_valid, conflicting_aliases = await self.validate_room_aliases( + child_rooms_to_create, evt + ) if not is_valid: - error_msg = f"Cannot initialize community: The following room aliases already exist:\n" + "\n".join(conflicting_aliases) + error_msg = ( + f"Cannot initialize community: The following room aliases already exist:\n" + + "\n".join(conflicting_aliases) + ) await evt.respond(error_msg, edits=msg) return @@ -2711,31 +2999,27 @@ class CommunityBot(Plugin): # Set up power levels for the space power_levels = PowerLevelStateEventContent() - + # Set up power levels for users # For modern room versions (12+), the bot (creator) has unlimited power by default # but we still need to set power levels for other users if self.is_modern_room_version(self.config.get("room_version", "1")): # For modern rooms, don't set bot power level (it has unlimited power) # but still set power levels for other users - power_levels.users = { - evt.sender: 100 # Initiator gets admin power - } + power_levels.users = {evt.sender: 100} # Initiator gets admin power else: # For legacy rooms, set both bot and initiator power levels power_levels.users = { self.client.mxid: 1000, # Bot gets highest power - evt.sender: 100 # Initiator gets admin power + evt.sender: 100, # Initiator gets admin power } - + # Set invite power level from config power_levels.invite = self.config.get("invite_power_level", 50) # Create the space with appropriate metadata and power levels space_id, space_alias = await self.create_space( - community_name, - evt, - power_level_override=power_levels + community_name, evt, power_level_override=power_levels ) if not space_id: @@ -2752,8 +3036,10 @@ class CommunityBot(Plugin): # Verify the space exists and has correct power levels try: - space_power_levels = await self.client.get_state_event(space_id, EventType.ROOM_POWER_LEVELS) - + space_power_levels = await self.client.get_state_event( + space_id, EventType.ROOM_POWER_LEVELS + ) + # For modern room versions, creators have unlimited power and don't appear in power levels if self.is_modern_room_version(self.config.get("room_version", "1")): # Just verify the space exists and has power levels @@ -2774,7 +3060,7 @@ class CommunityBot(Plugin): # Create moderators room # Include the initiator as a moderator, plus any other moderators from the space moderators = [evt.sender] # Always include the initiator - + # Also get any other moderators from the space try: space_moderators = await self.get_moderators_and_above() @@ -2785,28 +3071,30 @@ class CommunityBot(Plugin): moderators.append(user) except Exception as e: self.log.warning(f"Could not get additional moderators from space: {e}") - - self.log.info(f"Moderators room will be created with initial members: {moderators}") - + + self.log.info( + f"Moderators room will be created with initial members: {moderators}" + ) + room_result = await self.create_room( f"{community_name} Moderators", evt, - invitees=moderators # Use moderators list instead of config invitees + invitees=moderators, # Use moderators list instead of config invitees ) - + if not room_result: error_msg = "Failed to create moderators room" self.log.error(error_msg) await evt.respond(error_msg, edits=msg) return - + mod_room_id, mod_room_alias = room_result # Set moderators room to invite-only await self.client.send_state_event( mod_room_id, EventType.ROOM_JOIN_RULES, - JoinRulesStateEventContent(join_rule=JoinRule.INVITE) + JoinRulesStateEventContent(join_rule=JoinRule.INVITE), ) # Create waiting room (force unencrypted for public access) @@ -2815,8 +3103,8 @@ class CommunityBot(Plugin): evt, creation_content={ "m.federate": True, - "m.room.history_visibility": "joined" - } + "m.room.history_visibility": "joined", + }, ) if not waiting_room_result: @@ -2824,14 +3112,14 @@ class CommunityBot(Plugin): self.log.error(error_msg) await evt.respond(error_msg, edits=msg) return - + waiting_room_id, waiting_room_alias = waiting_room_result # Set waiting room to be joinable by anyone await self.client.send_state_event( waiting_room_id, EventType.ROOM_JOIN_RULES, - JoinRulesStateEventContent(join_rule=JoinRule.PUBLIC) + JoinRulesStateEventContent(join_rule=JoinRule.PUBLIC), ) # Update censor configuration based on current value @@ -2839,7 +3127,10 @@ class CommunityBot(Plugin): if current_censor is False: # If censor is false, set it to a list with just the waiting room self.config["censor"] = [waiting_room_id] - elif isinstance(current_censor, list) and waiting_room_id not in current_censor: + elif ( + isinstance(current_censor, list) + and waiting_room_id not in current_censor + ): # If censor is already a list and waiting room isn't in it, append it current_censor.append(waiting_room_id) self.config["censor"] = current_censor @@ -2861,7 +3152,7 @@ class CommunityBot(Plugin): f"Moderators Room: {mod_room_alias}
" f"Waiting Room: {waiting_room_alias}{warning_msg}", edits=msg, - allow_html=True + allow_html=True, ) except Exception as e: @@ -2871,7 +3162,7 @@ class CommunityBot(Plugin): @community.subcommand( "doctor", - help="review bot permissions across the space and all rooms to identify potential issues" + help="review bot permissions across the space and all rooms to identify potential issues", ) @command.argument("room", required=False) async def doctor_check(self, evt: MessageEvent, room: str = None) -> None: @@ -2889,21 +3180,20 @@ class CommunityBot(Plugin): msg = await evt.respond("Running diagnostic check...") try: - report = { - "space": {}, - "rooms": {}, - "issues": [], - "warnings": [] - } + report = {"space": {}, "rooms": {}, "issues": [], "warnings": []} # Check parent space permissions report["space"] = await diagnostic_utils.check_space_permissions( self.client, self.config["parent_room"], self.log ) if "error" in report["space"]: - report["issues"].append(f"Failed to check parent space permissions: {report['space']['error']}") + report["issues"].append( + f"Failed to check parent space permissions: {report['space']['error']}" + ) elif report["space"].get("bot_power_level", 0) < 100: - report["issues"].append(f"Bot lacks administrative privileges in parent space (level: {report['space']['bot_power_level']})") + report["issues"].append( + f"Bot lacks administrative privileges in parent space (level: {report['space']['bot_power_level']})" + ) # Check all rooms in the space space_rooms = await self.get_space_roomlist() @@ -2912,25 +3202,33 @@ class CommunityBot(Plugin): self.client, room_id, self.log ) report["rooms"][room_id] = room_data - + # Add issues for problematic rooms if "error" in room_data: if room_data["error"] == "Bot not in room": - report["issues"].append(f"Bot is not a member of room '{room_id}' that is part of the space") + report["issues"].append( + f"Bot is not a member of room '{room_id}' that is part of the space" + ) else: - report["issues"].append(f"Failed to check room {room_id}: {room_data['error']}") + report["issues"].append( + f"Failed to check room {room_id}: {room_data['error']}" + ) elif not room_data.get("has_admin", False): - report["issues"].append(f"Bot lacks administrative privileges in room '{room_data.get('room_name', room_id)}' ({room_id}) - level: {room_data.get('bot_power_level', 0)}") + report["issues"].append( + f"Bot lacks administrative privileges in room '{room_data.get('room_name', room_id)}' ({room_id}) - level: {room_data.get('bot_power_level', 0)}" + ) # Generate response using helper functions response = "

🔍 Bot Permission Diagnostic Summary



" # Space summary - only show if there are issues - space_has_issues = ("error" in report["space"] or - report["space"].get("bot_power_level", 0) < 100 or - report["space"].get("users_higher") or - report["space"].get("users_equal")) - + space_has_issues = ( + "error" in report["space"] + or report["space"].get("bot_power_level", 0) < 100 + or report["space"].get("users_higher") + or report["space"].get("users_equal") + ) + if space_has_issues: response += diagnostic_utils.generate_space_summary(report["space"]) @@ -2941,7 +3239,9 @@ class CommunityBot(Plugin): response += room_summary # Summary statistics - response += diagnostic_utils.generate_summary_stats(report["space"], room_stats) + response += diagnostic_utils.generate_summary_stats( + report["space"], room_stats + ) # Issues and warnings response += diagnostic_utils.generate_issues_and_warnings( @@ -2949,7 +3249,12 @@ class CommunityBot(Plugin): ) # All clear message if no issues - if not report["issues"] and not report["warnings"] and not space_has_issues and not room_summary: + if ( + not report["issues"] + and not report["warnings"] + and not space_has_issues + and not room_summary + ): response += diagnostic_utils.generate_all_clear_message() # Try to send the response, and if it's too large, break it up @@ -2957,20 +3262,33 @@ class CommunityBot(Plugin): await evt.respond(response, edits=msg, allow_html=True) except Exception as e: error_str = str(e).lower() - if any(phrase in error_str for phrase in ["event too large", "413", "payload too large", "message too long"]): - self.log.info(f"Doctor report too large ({len(response)} chars), breaking into multiple messages") - + if any( + phrase in error_str + for phrase in [ + "event too large", + "413", + "payload too large", + "message too long", + ] + ): + self.log.info( + f"Doctor report too large ({len(response)} chars), breaking into multiple messages" + ) + # Break up the response into smaller chunks chunks = self._split_doctor_report(response) self.log.info(f"Split report into {len(chunks)} chunks") - + # Send the first chunk as an edit to the original message if chunks: await evt.respond(chunks[0], edits=msg, allow_html=True) - + # Send remaining chunks as new messages for i, chunk in enumerate(chunks[1:], 2): - await evt.respond(f"

🔍 Bot Permission Diagnostic Report (Part {i}/{len(chunks)})

\n{chunk}", allow_html=True) + await evt.respond( + f"

🔍 Bot Permission Diagnostic Report (Part {i}/{len(chunks)})

\n{chunk}", + allow_html=True, + ) await asyncio.sleep(0.5) # Small delay between messages else: # Re-raise if it's not a size issue @@ -2981,25 +3299,27 @@ class CommunityBot(Plugin): self.log.error(error_msg) await evt.respond(error_msg, edits=msg) - def _split_doctor_report(self, report_text: str, max_chunk_size: int = 4000) -> list[str]: + def _split_doctor_report( + self, report_text: str, max_chunk_size: int = 4000 + ) -> list[str]: """Split a large doctor report into smaller chunks. - + Args: report_text: The full report text to split max_chunk_size: Maximum size of each chunk in characters - + Returns: list: List of text chunks """ return report_utils.split_doctor_report(report_text, max_chunk_size) - + def _split_by_sections(self, text: str, max_size: int) -> list[str]: """Split text by section headers to maintain logical grouping. - + Args: text: Text to split max_size: Maximum size per chunk - + Returns: list: List of text chunks """ @@ -3007,13 +3327,13 @@ class CommunityBot(Plugin): async def _doctor_room_detail(self, evt: MessageEvent, room: str) -> None: """Generate detailed diagnostic report for a specific room. - + Args: evt: The message event room: Room ID or alias to analyze """ msg = await evt.respond(f"Analyzing room {room}...") - + try: # Resolve room ID if alias provided room_id = None @@ -3022,31 +3342,40 @@ class CommunityBot(Plugin): room_info = await self.client.resolve_room_alias(room) room_id = room_info["room_id"] except Exception as e: - await evt.respond(f"Could not resolve room alias {room}: {e}", edits=msg) + await evt.respond( + f"Could not resolve room alias {room}: {e}", edits=msg + ) return elif room.startswith("!"): room_id = room else: - await evt.respond(f"Invalid room format. Use room ID (!roomid:server) or alias (#alias:server)", edits=msg) + await evt.respond( + f"Invalid room format. Use room ID (!roomid:server) or alias (#alias:server)", + edits=msg, + ) return # Check if room is in the space space_rooms = await self.get_space_roomlist() if room_id not in space_rooms: - await evt.respond(f"Room {room} is not part of the configured space.", edits=msg) + await evt.respond( + f"Room {room} is not part of the configured space.", edits=msg + ) return # Get room name room_name = room_id try: - room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME) + room_name_event = await self.client.get_state_event( + room_id, EventType.ROOM_NAME + ) room_name = room_name_event.name except: pass response = f"

🔍 Detailed Analysis: {room_name}


" response += f"Room ID: {room_id}
" - + # Get room version and creators room_version, creators = await self.get_room_version_and_creators(room_id) response += f"Room Version: {room_version}
" @@ -3056,8 +3385,12 @@ class CommunityBot(Plugin): # Check if bot is in the room try: - await self.client.get_state_event(room_id, EventType.ROOM_MEMBER, self.client.mxid) - response += "✅ Bot membership: Bot is a member of this room

" + await self.client.get_state_event( + room_id, EventType.ROOM_MEMBER, self.client.mxid + ) + response += ( + "✅ Bot membership: Bot is a member of this room

" + ) except Exception: response += "❌ Bot membership: Bot is not a member of this room

" await evt.respond(response, edits=msg, allow_html=True) @@ -3065,19 +3398,25 @@ class CommunityBot(Plugin): # Get power levels try: - power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS) + power_levels = await self.client.get_state_event( + room_id, EventType.ROOM_POWER_LEVELS + ) bot_level = power_levels.get_user_level(self.client.mxid) - + # Check if bot has unlimited power (creator in modern room versions) - bot_has_unlimited_power = await self.user_has_unlimited_power(self.client.mxid, room_id) - + bot_has_unlimited_power = await self.user_has_unlimited_power( + self.client.mxid, room_id + ) + response += f"

📊 Power Level Analysis


" response += f"• Bot power level: {bot_level}
" if bot_has_unlimited_power: response += f"• Administrative privileges: ✅ Unlimited Power (Creator)
" else: response += f"• Administrative privileges: {'✅ Yes' if bot_level >= 100 else '❌ No'}
" - response += f"• Default user level: {power_levels.users_default}
" + response += ( + f"• Default user level: {power_levels.users_default}
" + ) response += f"• Invite level: {power_levels.invite}
" response += f"• Kick level: {power_levels.kick}
" response += f"• Ban level: {power_levels.ban}
" @@ -3086,7 +3425,7 @@ class CommunityBot(Plugin): # Check for users with equal or higher power level users_higher = [] users_equal = [] - + for user, level in power_levels.users.items(): if user != self.client.mxid and level >= bot_level: if level == bot_level: @@ -3111,8 +3450,10 @@ class CommunityBot(Plugin): response += "
" if not users_higher and not users_equal: - response += "✅ No power level conflicts detected

" - + response += ( + "✅ No power level conflicts detected

" + ) + # Add note about creators in modern room versions if self.is_modern_room_version(room_version): response += f"

ℹ️ Modern Room Version Note


" @@ -3120,22 +3461,28 @@ class CommunityBot(Plugin): # Check specific permissions response += f"

🔐 Permission Analysis


" - + # Get required levels for various actions events_default = power_levels.events_default events = power_levels.events - + permissions = [ - ("Send messages", events.get(str(EventType.ROOM_MESSAGE), events_default)), + ( + "Send messages", + events.get(str(EventType.ROOM_MESSAGE), events_default), + ), ("Send state events", power_levels.state_default), - ("Change power levels", events.get(str(EventType.ROOM_POWER_LEVELS), events_default)), + ( + "Change power levels", + events.get(str(EventType.ROOM_POWER_LEVELS), events_default), + ), ("Send tombstone", events.get("m.room.tombstone", events_default)), ("Invite users", power_levels.invite), ("Kick users", power_levels.kick), ("Ban users", power_levels.ban), - ("Redact messages", power_levels.redact) + ("Redact messages", power_levels.redact), ] - + for perm_name, required_level in permissions: has_perm = bot_level >= required_level or bot_has_unlimited_power status = "✅" if has_perm else "❌" @@ -3147,25 +3494,33 @@ class CommunityBot(Plugin): # Check room state try: response += f"

🏠 Room State


" - + # Check join rules try: - join_rules = await self.client.get_state_event(room_id, EventType.ROOM_JOIN_RULES) + join_rules = await self.client.get_state_event( + room_id, EventType.ROOM_JOIN_RULES + ) response += f"• Join rule: {join_rules.join_rule}
" except: response += "• Join rule: Could not determine
" # Check encryption try: - encryption = await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION) + encryption = await self.client.get_state_event( + room_id, EventType.ROOM_ENCRYPTION + ) response += f"• Encryption: ✅ Enabled ({encryption.algorithm})
" except: response += "• Encryption: ❌ Not enabled
" # Check space parent try: - space_parent = await self.client.get_state_event(room_id, EventType.SPACE_PARENT) - response += f"• Space parent: ✅ {space_parent.state_key}
" + space_parent = await self.client.get_state_event( + room_id, EventType.SPACE_PARENT + ) + response += ( + f"• Space parent: ✅ {space_parent.state_key}
" + ) except: response += "• Space parent: ❌ Not set
" diff --git a/community/helpers/__init__.py b/community/helpers/__init__.py index fba4c9a..f59d13e 100644 --- a/community/helpers/__init__.py +++ b/community/helpers/__init__.py @@ -1,2 +1,15 @@ # Helper modules for community bot -from . import message_utils, room_utils, user_utils, database_utils, report_utils, decorators, common_utils, room_creation_utils, config_manager, response_builder, diagnostic_utils, base_command_handler +from . import ( + message_utils, + room_utils, + user_utils, + database_utils, + report_utils, + decorators, + common_utils, + room_creation_utils, + config_manager, + response_builder, + diagnostic_utils, + base_command_handler, +) diff --git a/community/helpers/base_command_handler.py b/community/helpers/base_command_handler.py index 32ee22c..dc274da 100644 --- a/community/helpers/base_command_handler.py +++ b/community/helpers/base_command_handler.py @@ -8,10 +8,10 @@ from .decorators import require_permission, require_parent_room, handle_errors class BaseCommandHandler(ABC): """Base class for command handlers with common patterns.""" - + def __init__(self, bot): """Initialize with bot instance. - + Args: bot: CommunityBot instance """ @@ -21,92 +21,96 @@ class BaseCommandHandler(ABC): self.config_manager = bot.config_manager self.log = bot.log self.database = bot.database - + @abstractmethod async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute the command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ pass - - async def check_permissions(self, evt: MessageEvent, min_level: int = 50, room_id: str = None) -> bool: + + async def check_permissions( + self, evt: MessageEvent, min_level: int = 50, room_id: str = None + ) -> bool: """Check if user has required permissions. - + Args: evt: Message event min_level: Minimum required power level room_id: Room ID to check permissions in - + Returns: bool: True if user has permissions """ return await self.bot.user_permitted(evt.sender, min_level, room_id) - + async def check_parent_room(self, evt: MessageEvent) -> bool: """Check if parent room is configured. - + Args: evt: Message event - + Returns: bool: True if parent room is configured """ return await self.bot.check_parent_room(evt) - + async def reply_error(self, evt: MessageEvent, message: str) -> None: """Reply with an error message. - + Args: evt: Message event message: Error message """ await evt.reply(message) - + async def reply_success(self, evt: MessageEvent, message: str) -> None: """Reply with a success message. - + Args: evt: Message event message: Success message """ await evt.reply(message) - - async def respond_html(self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None) -> None: + + async def respond_html( + self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None + ) -> None: """Respond with HTML content. - + Args: evt: Message event message: HTML message edits: Optional message to edit """ await evt.respond(message, allow_html=True, edits=edits) - + def is_tracking_enabled(self) -> bool: """Check if user tracking is enabled. - + Returns: bool: True if tracking is enabled """ return self.config_manager.is_tracking_enabled() - + def is_verification_enabled(self) -> bool: """Check if verification is enabled. - + Returns: bool: True if verification is enabled """ return self.config_manager.is_verification_enabled() - + def get_parent_room(self) -> Optional[str]: """Get the parent room ID. - + Returns: str: Parent room ID or None """ @@ -115,23 +119,23 @@ class BaseCommandHandler(ABC): class TrackingCommandHandler(BaseCommandHandler): """Base handler for commands that require user tracking.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with tracking check.""" if not self.is_tracking_enabled(): await self.reply_error(evt, "user tracking is disabled") return return await self.execute_tracking_command(evt, *args, **kwargs) - + @abstractmethod async def execute_tracking_command(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute the tracking command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ @@ -140,23 +144,23 @@ class TrackingCommandHandler(BaseCommandHandler): class AdminCommandHandler(BaseCommandHandler): """Base handler for admin-only commands.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with admin permission check.""" if not await self.check_permissions(evt, min_level=100): await self.reply_error(evt, "You don't have permission to use this command") return return await self.execute_admin_command(evt, *args, **kwargs) - + @abstractmethod async def execute_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute the admin command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ @@ -165,23 +169,25 @@ class AdminCommandHandler(BaseCommandHandler): class ModeratorCommandHandler(BaseCommandHandler): """Base handler for moderator commands.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with moderator permission check.""" if not await self.check_permissions(evt, min_level=50): await self.reply_error(evt, "You don't have permission to use this command") return return await self.execute_moderator_command(evt, *args, **kwargs) - + @abstractmethod - async def execute_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any: + async def execute_moderator_command( + self, evt: MessageEvent, *args, **kwargs + ) -> Any: """Execute the moderator command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ @@ -190,22 +196,22 @@ class ModeratorCommandHandler(BaseCommandHandler): class SpaceCommandHandler(BaseCommandHandler): """Base handler for commands that require parent space.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with parent space check.""" if not await self.check_parent_room(evt): return return await self.execute_space_command(evt, *args, **kwargs) - + @abstractmethod async def execute_space_command(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute the space command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ @@ -214,7 +220,7 @@ class SpaceCommandHandler(BaseCommandHandler): class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler): """Base handler for commands that require both parent space and moderator permissions.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with both space and moderator checks.""" if not await self.check_parent_room(evt): @@ -223,16 +229,18 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler) await self.reply_error(evt, "You don't have permission to use this command") return return await self.execute_space_moderator_command(evt, *args, **kwargs) - + @abstractmethod - async def execute_space_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any: + async def execute_space_moderator_command( + self, evt: MessageEvent, *args, **kwargs + ) -> Any: """Execute the space moderator command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ @@ -241,7 +249,7 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler) class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler): """Base handler for commands that require both parent space and admin permissions.""" - + async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: """Execute command with both space and admin checks.""" if not await self.check_parent_room(evt): @@ -250,16 +258,18 @@ class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler): await self.reply_error(evt, "You don't have permission to use this command") return return await self.execute_space_admin_command(evt, *args, **kwargs) - + @abstractmethod - async def execute_space_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any: + async def execute_space_admin_command( + self, evt: MessageEvent, *args, **kwargs + ) -> Any: """Execute the space admin command logic. - + Args: evt: Message event *args: Command arguments **kwargs: Additional keyword arguments - + Returns: Command result """ diff --git a/community/helpers/common_utils.py b/community/helpers/common_utils.py index 8eab9b4..eaca635 100644 --- a/community/helpers/common_utils.py +++ b/community/helpers/common_utils.py @@ -6,12 +6,12 @@ from mautrix.types import EventType, MessageEvent async def get_room_name(client, room_id: str, logger) -> Optional[str]: """Get room name from room ID. - + Args: client: Matrix client instance room_id: Room ID to get name for logger: Logger instance for error reporting - + Returns: str: Room name or None if not found/error """ @@ -25,12 +25,12 @@ async def get_room_name(client, room_id: str, logger) -> Optional[str]: async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]: """Get power levels for a room. - + Args: client: Matrix client instance room_id: Room ID to get power levels for logger: Logger instance for error reporting - + Returns: PowerLevelStateEventContent or None if error """ @@ -43,13 +43,13 @@ async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]: async def check_room_membership(client, room_id: str, user_id: str, logger) -> bool: """Check if a user is a member of a room. - + Args: client: Matrix client instance room_id: Room ID to check user_id: User ID to check logger: Logger instance for error reporting - + Returns: bool: True if user is a member, False otherwise """ @@ -62,11 +62,11 @@ async def check_room_membership(client, room_id: str, user_id: str, logger) -> b def format_room_info(room_id: str, room_name: Optional[str] = None) -> str: """Format room information for display. - + Args: room_id: Room ID room_name: Optional room name - + Returns: str: Formatted room info """ @@ -77,12 +77,12 @@ def format_room_info(room_id: str, room_name: Optional[str] = None) -> str: def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any: """Safely get a value from a dictionary with a default. - + Args: dictionary: Dictionary to get value from key: Key to look up default: Default value if key not found - + Returns: Value from dictionary or default """ diff --git a/community/helpers/config_manager.py b/community/helpers/config_manager.py index 26f412a..c72f82a 100644 --- a/community/helpers/config_manager.py +++ b/community/helpers/config_manager.py @@ -5,219 +5,214 @@ from typing import List, Dict, Any, Optional class ConfigManager: """Centralized configuration management for the community bot.""" - + def __init__(self, config: Dict[str, Any]): """Initialize with bot configuration. - + Args: config: Bot configuration dictionary """ self.config = config - + def is_tracking_enabled(self) -> bool: """Check if user tracking is enabled. - + Returns: bool: True if tracking is enabled """ track_users = self.config.get("track_users", []) - + # Handle legacy boolean configuration if isinstance(track_users, bool): return track_users - + # Handle new list configuration return isinstance(track_users, list) and len(track_users) > 0 - + def is_message_tracking_enabled(self) -> bool: """Check if message tracking is enabled. - + Returns: bool: True if message tracking is enabled """ track_users = self.config.get("track_users", []) - + # Handle legacy boolean configuration - if True, enable both messages and reactions if isinstance(track_users, bool): return track_users - + # Handle new list configuration return isinstance(track_users, list) and "messages" in track_users - + def is_reaction_tracking_enabled(self) -> bool: """Check if reaction tracking is enabled. - + Returns: bool: True if reaction tracking is enabled """ track_users = self.config.get("track_users", []) - + # Handle legacy boolean configuration - if True, enable both messages and reactions if isinstance(track_users, bool): return track_users - + # Handle new list configuration return isinstance(track_users, list) and "reactions" in track_users - + def is_verification_enabled(self) -> bool: """Check if verification is enabled. - + Returns: bool: True if verification is enabled """ return self.config.get("verification_enabled", False) - + def is_proactive_banning_enabled(self) -> bool: """Check if proactive banning is enabled. - + Returns: bool: True if proactive banning is enabled """ return self.config.get("proactive_banning", False) - + def is_encryption_enabled(self) -> bool: """Check if encryption is enabled by default. - + Returns: bool: True if encryption is enabled """ return self.config.get("encrypt", False) - + def get_room_version(self) -> str: """Get the configured room version. - + Returns: str: Room version string """ return self.config.get("room_version", "1") - + def get_community_slug(self) -> Optional[str]: """Get the community slug. - + Returns: str: Community slug or None if not configured """ return self.config.get("community_slug") - + def get_parent_room(self) -> Optional[str]: """Get the parent room ID. - + Returns: str: Parent room ID or None if not configured """ return self.config.get("parent_room") - + def get_invitees(self) -> List[str]: """Get the list of users to invite to new rooms. - + Returns: List[str]: List of user IDs to invite """ return self.config.get("invitees", []) - + def get_invite_power_level(self) -> int: """Get the power level required to invite users. - + Returns: int: Power level for inviting users """ return self.config.get("invite_power_level", 50) - + def get_sleep_duration(self) -> float: """Get the sleep duration between operations. - + Returns: float: Sleep duration in seconds """ return self.config.get("sleep", 1.0) - + def get_welcome_sleep_duration(self) -> float: """Get the sleep duration for welcome messages. - + Returns: float: Welcome sleep duration in seconds """ return self.config.get("welcome_sleep", 2.0) - + def get_warn_threshold_days(self) -> int: """Get the warning threshold for inactive users. - + Returns: int: Number of days before warning """ return self.config.get("warn_threshold_days", 30) - + def get_kick_threshold_days(self) -> int: """Get the kick threshold for inactive users. - + Returns: int: Number of days before kicking """ return self.config.get("kick_threshold_days", 60) - + def get_verification_phrase(self) -> str: """Get the verification phrase. - + Returns: str: Verification phrase """ return self.config.get("verification_phrase", "I agree to the rules") - + def get_verification_attempts(self) -> int: """Get the maximum verification attempts. - + Returns: int: Maximum verification attempts """ return self.config.get("verification_attempts", 3) - + def get_verification_timeout(self) -> int: """Get the verification timeout in seconds. - + Returns: int: Verification timeout in seconds """ return self.config.get("verification_timeout", 300) - - + def get_banlist_rooms(self) -> List[str]: """Get the list of banlist rooms. - + Returns: List[str]: List of banlist room IDs or aliases """ return self.config.get("banlist_rooms", []) - + def get_redaction_rooms(self) -> List[str]: """Get the list of rooms for redaction. - + Returns: List[str]: List of room IDs for redaction """ return self.config.get("redaction_rooms", []) - + def validate_required_configs(self) -> List[str]: """Validate that all required configurations are present. - + Returns: List[str]: List of missing required configuration keys """ - required_configs = [ - "parent_room", - "room_version", - "community_slug" - ] - + required_configs = ["parent_room", "room_version", "community_slug"] + missing = [] for config_key in required_configs: if not self.config.get(config_key): missing.append(config_key) - + return missing - + def is_modern_room_version(self) -> bool: """Check if the configured room version is modern (12+). - + Returns: bool: True if room version is 12 or higher """ @@ -226,10 +221,10 @@ class ConfigManager: return version >= 12 except (ValueError, TypeError): return False - + def get_room_creation_settings(self) -> Dict[str, Any]: """Get settings specific to room creation. - + Returns: Dict[str, Any]: Room creation settings """ @@ -239,12 +234,12 @@ class ConfigManager: "invitees": self.get_invitees(), "invite_power_level": self.get_invite_power_level(), "encrypt": self.is_encryption_enabled(), - "parent_room": self.get_parent_room() + "parent_room": self.get_parent_room(), } - + def get_tracking_settings(self) -> Dict[str, Any]: """Get settings specific to user tracking. - + Returns: Dict[str, Any]: Tracking settings """ @@ -253,12 +248,12 @@ class ConfigManager: "track_messages": self.is_message_tracking_enabled(), "track_reactions": self.is_reaction_tracking_enabled(), "warn_threshold_days": self.get_warn_threshold_days(), - "kick_threshold_days": self.get_kick_threshold_days() + "kick_threshold_days": self.get_kick_threshold_days(), } - + def get_verification_settings(self) -> Dict[str, Any]: """Get settings specific to verification. - + Returns: Dict[str, Any]: Verification settings """ @@ -266,5 +261,5 @@ class ConfigManager: "verification_enabled": self.is_verification_enabled(), "verification_phrase": self.get_verification_phrase(), "verification_attempts": self.get_verification_attempts(), - "verification_timeout": self.get_verification_timeout() + "verification_timeout": self.get_verification_timeout(), } diff --git a/community/helpers/database_utils.py b/community/helpers/database_utils.py index c88985f..1369a0e 100644 --- a/community/helpers/database_utils.py +++ b/community/helpers/database_utils.py @@ -8,13 +8,13 @@ from mautrix.types import PaginationDirection async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> List: """Get messages from a user in a room that should be redacted. - + Args: client: Matrix client instance room_id: The room ID to search in mxid: The user ID whose messages to find logger: Logger instance for error reporting - + Returns: list: List of message events to redact """ @@ -40,16 +40,18 @@ async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> Lis return [] -async def redact_messages(client, database, room_id: str, sleep_time: float, logger) -> Dict[str, int]: +async def redact_messages( + client, database, room_id: str, sleep_time: float, logger +) -> Dict[str, int]: """Redact messages queued for redaction in a room. - + Args: client: Matrix client instance database: Database instance room_id: The room ID to redact messages in sleep_time: Sleep time between redactions logger: Logger instance for error reporting - + Returns: dict: Counters for successful and failed redactions """ @@ -59,9 +61,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log ) for event in events: try: - await client.redact( - room_id, event["event_id"], reason="content removed" - ) + await client.redact(room_id, event["event_id"], reason="content removed") counters["success"] += 1 await database.execute( "DELETE FROM redaction_tasks WHERE event_id = $1", event["event_id"] @@ -81,7 +81,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) -> None: """Insert or update user activity timestamp. - + Args: database: Database instance mxid: User Matrix ID @@ -103,16 +103,17 @@ async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) -> logger.error(f"Failed to upsert user timestamp: {e}") -async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_days: int, - logger) -> Dict[str, List[str]]: +async def get_inactive_users( + database, warn_threshold_days: int, kick_threshold_days: int, logger +) -> Dict[str, List[str]]: """Get lists of users who should be warned or kicked for inactivity. - + Args: database: Database instance warn_threshold_days: Days threshold for warning kick_threshold_days: Days threshold for kicking logger: Logger instance for error reporting - + Returns: dict: Contains 'warn' and 'kick' lists of user IDs """ @@ -120,7 +121,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_ current_time = int(time.time()) warn_threshold = current_time - (warn_threshold_days * 24 * 60 * 60) kick_threshold = current_time - (kick_threshold_days * 24 * 60 * 60) - + # Get users to warn warn_results = await database.fetch( """ @@ -132,7 +133,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_ warn_threshold, kick_threshold, ) - + # Get users to kick kick_results = await database.fetch( """ @@ -142,10 +143,10 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_ """, kick_threshold, ) - + return { "warn": [row["mxid"] for row in warn_results], - "kick": [row["mxid"] for row in kick_results] + "kick": [row["mxid"] for row in kick_results], } except Exception as e: logger.error(f"Failed to get inactive users: {e}") @@ -154,7 +155,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_ async def cleanup_stale_verification_states(database, logger) -> None: """Clean up stale verification states older than 24 hours. - + Args: database: Database instance logger: Logger instance for error reporting @@ -172,29 +173,34 @@ async def cleanup_stale_verification_states(database, logger) -> None: async def get_verification_state(database, dm_room_id: str) -> Dict[str, Any]: """Get verification state for a DM room. - + Args: database: Database instance dm_room_id: The DM room ID - + Returns: dict: Verification state data or None if not found """ try: result = await database.fetchrow( - "SELECT * FROM verification_states WHERE dm_room_id = $1", - dm_room_id + "SELECT * FROM verification_states WHERE dm_room_id = $1", dm_room_id ) return dict(result) if result else None except Exception as e: return None -async def create_verification_state(database, dm_room_id: str, user_id: str, - target_room_id: str, verification_phrase: str, - attempts_remaining: int, required_power_level: int) -> None: +async def create_verification_state( + database, + dm_room_id: str, + user_id: str, + target_room_id: str, + verification_phrase: str, + attempts_remaining: int, + required_power_level: int, +) -> None: """Create a new verification state. - + Args: database: Database instance dm_room_id: The DM room ID @@ -211,16 +217,22 @@ async def create_verification_state(database, dm_room_id: str, user_id: str, (dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, required_power_level) VALUES ($1, $2, $3, $4, $5, $6) """, - dm_room_id, user_id, target_room_id, verification_phrase, - attempts_remaining, required_power_level + dm_room_id, + user_id, + target_room_id, + verification_phrase, + attempts_remaining, + required_power_level, ) except Exception as e: pass # Verification state creation is not critical -async def update_verification_attempts(database, dm_room_id: str, attempts_remaining: int) -> None: +async def update_verification_attempts( + database, dm_room_id: str, attempts_remaining: int +) -> None: """Update verification attempts remaining. - + Args: database: Database instance dm_room_id: The DM room ID @@ -229,7 +241,8 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai try: await database.execute( "UPDATE verification_states SET attempts_remaining = $1 WHERE dm_room_id = $2", - attempts_remaining, dm_room_id + attempts_remaining, + dm_room_id, ) except Exception as e: pass # Verification state update is not critical @@ -237,15 +250,14 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai async def delete_verification_state(database, dm_room_id: str) -> None: """Delete a verification state. - + Args: database: Database instance dm_room_id: The DM room ID """ try: await database.execute( - "DELETE FROM verification_states WHERE dm_room_id = $1", - dm_room_id + "DELETE FROM verification_states WHERE dm_room_id = $1", dm_room_id ) except Exception as e: pass # Verification state deletion is not critical diff --git a/community/helpers/decorators.py b/community/helpers/decorators.py index 7baee2b..807a822 100644 --- a/community/helpers/decorators.py +++ b/community/helpers/decorators.py @@ -7,11 +7,12 @@ from mautrix.types import UserID, MessageEvent def require_permission(min_level: int = 50, room_id: Optional[str] = None): """Decorator to require user permission for command execution. - + Args: min_level: Minimum required power level (default 50 for moderator) room_id: Room ID to check permissions in (None for parent room) """ + def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: @@ -19,26 +20,31 @@ def require_permission(min_level: int = 50, room_id: Optional[str] = None): await evt.reply("You don't have permission to use this command") return return await func(self, evt, *args, **kwargs) + return wrapper + return decorator def require_parent_room(func: Callable) -> Callable: """Decorator to require parent room to be configured.""" + @functools.wraps(func) async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: if not await self.check_parent_room(evt): return return await func(self, evt, *args, **kwargs) + return wrapper def handle_errors(error_message: str = "An error occurred"): """Decorator to handle common errors in command execution. - + Args: error_message: Default error message to show to user """ + def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: @@ -47,5 +53,7 @@ def handle_errors(error_message: str = "An error occurred"): except Exception as e: self.log.error(f"Error in {func.__name__}: {e}") await evt.reply(f"{error_message}: {e}") + return wrapper + return decorator diff --git a/community/helpers/diagnostic_utils.py b/community/helpers/diagnostic_utils.py index 2381bed..571a2cf 100644 --- a/community/helpers/diagnostic_utils.py +++ b/community/helpers/diagnostic_utils.py @@ -6,17 +6,15 @@ from mautrix.client import Client async def check_space_permissions( - client: Client, - parent_room: str, - logger + client: Client, parent_room: str, logger ) -> Dict[str, Any]: """Check bot permissions in the parent space. - + Args: client: Matrix client parent_room: Parent room ID logger: Logger instance - + Returns: Dict containing space permission information """ @@ -25,11 +23,14 @@ async def check_space_permissions( parent_room, EventType.ROOM_POWER_LEVELS ) bot_level = space_power_levels.get_user_level(client.mxid) - + # Check if bot has unlimited power (creator in modern room versions) from .room_utils import user_has_unlimited_power - bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, parent_room) - + + bot_has_unlimited_power = await user_has_unlimited_power( + client, client.mxid, parent_room + ) + space_info = { "room_id": parent_room, "bot_power_level": bot_level, @@ -37,48 +38,36 @@ async def check_space_permissions( "bot_has_unlimited_power": bot_has_unlimited_power, "users_higher_or_equal": [], "users_equal": [], - "users_higher": [] + "users_higher": [], } # Check for users with equal or higher power level for user, level in space_power_levels.users.items(): if user != client.mxid and level >= bot_level: if level == bot_level: - space_info["users_equal"].append({ - "user": user, - "level": level - }) + space_info["users_equal"].append({"user": user, "level": level}) else: - space_info["users_higher"].append({ - "user": user, - "level": level - }) - space_info["users_higher_or_equal"].append({ - "user": user, - "level": level - }) + space_info["users_higher"].append({"user": user, "level": level}) + space_info["users_higher_or_equal"].append( + {"user": user, "level": level} + ) return space_info except Exception as e: logger.error(f"Failed to check space permissions: {e}") - return { - "room_id": parent_room, - "error": str(e) - } + return {"room_id": parent_room, "error": str(e)} async def check_room_permissions( - client: Client, - room_id: str, - logger + client: Client, room_id: str, logger ) -> Dict[str, Any]: """Check bot permissions in a specific room. - + Args: client: Matrix client room_id: Room ID to check logger: Logger instance - + Returns: Dict containing room permission information """ @@ -87,33 +76,37 @@ async def check_room_permissions( try: await client.get_state_event(room_id, EventType.ROOM_MEMBER, client.mxid) except: - return { - "room_id": room_id, - "error": "Bot not in room" - } + return {"room_id": room_id, "error": "Bot not in room"} # Get power levels room_power_levels = await client.get_state_event( room_id, EventType.ROOM_POWER_LEVELS ) bot_level = room_power_levels.get_user_level(client.mxid) - + # Get room name if available room_name = room_id try: from .common_utils import get_room_name + room_name = await get_room_name(client, room_id, logger) or room_id except: pass # Get room version and creators from .room_utils import get_room_version_and_creators - room_version, creators = await get_room_version_and_creators(client, room_id, logger) - + + room_version, creators = await get_room_version_and_creators( + client, room_id, logger + ) + # Check if bot has unlimited power (creator in modern room versions) from .room_utils import user_has_unlimited_power - bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, room_id) - + + bot_has_unlimited_power = await user_has_unlimited_power( + client, client.mxid, room_id + ) + room_report = { "room_id": room_id, "room_name": room_name, @@ -124,46 +117,35 @@ async def check_room_permissions( "bot_has_unlimited_power": bot_has_unlimited_power, "users_higher_or_equal": [], "users_equal": [], - "users_higher": [] + "users_higher": [], } # Check for users with equal or higher power level for user, level in room_power_levels.users.items(): if user != client.mxid and level >= bot_level: if level == bot_level: - room_report["users_equal"].append({ - "user": user, - "level": level - }) + room_report["users_equal"].append({"user": user, "level": level}) else: - room_report["users_higher"].append({ - "user": user, - "level": level - }) - room_report["users_higher_or_equal"].append({ - "user": user, - "level": level - }) + room_report["users_higher"].append({"user": user, "level": level}) + room_report["users_higher_or_equal"].append( + {"user": user, "level": level} + ) return room_report except Exception as e: logger.error(f"Failed to check room permissions for {room_id}: {e}") - return { - "room_id": room_id, - "error": str(e) - } + return {"room_id": room_id, "error": str(e)} def analyze_room_data( - room_data: Dict[str, Any], - is_modern_room_version_func + room_data: Dict[str, Any], is_modern_room_version_func ) -> Tuple[str, str, bool, bool, bool]: """Analyze room data to determine status and categorization. - + Args: room_data: Room data dictionary is_modern_room_version_func: Function to check if room version is modern - + Returns: Tuple of (status, category, is_admin, is_modern, has_error) """ @@ -172,61 +154,58 @@ def analyze_room_data( return "not_in_room", "error", False, False, True else: return "error", "error", False, False, True - + # Check if modern room version is_modern = is_modern_room_version_func(room_data.get("room_version", "1")) - + # Check admin status is_admin = room_data.get("has_admin", False) - + if is_admin: return "admin", "admin", True, is_modern, False else: return "no_admin", "problematic", False, is_modern, False -def generate_space_summary( - space_data: Dict[str, Any] -) -> str: +def generate_space_summary(space_data: Dict[str, Any]) -> str: """Generate HTML summary for space permissions. - + Args: space_data: Space permission data - + Returns: str: HTML formatted space summary """ if "error" in space_data: return f"

📋 Parent Space


Error: {space_data['error']}

" - + space_status = "✅" if space_data.get("has_admin", False) else "❌" response = f"

📋 Parent Space


" - + # Show admin status with appropriate details if space_data.get("bot_has_unlimited_power", False): response += f"{space_status} Administrative privileges: Yes (unlimited power - creator)
" else: response += f"{space_status} Administrative privileges: {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})
" - + if space_data.get("users_higher"): response += f"⚠️ Users with higher power: {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_higher']])}
" if space_data.get("users_equal"): response += f"⚠️ Users with equal power: {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_equal']])}
" - + response += "
" return response def generate_room_summary( - rooms_data: Dict[str, Any], - is_modern_room_version_func + rooms_data: Dict[str, Any], is_modern_room_version_func ) -> Tuple[str, Dict[str, int]]: """Generate HTML summary for room permissions. - + Args: rooms_data: Dictionary of room data is_modern_room_version_func: Function to check if room version is modern - + Returns: Tuple of (HTML response, statistics dict) """ @@ -237,14 +216,14 @@ def generate_room_summary( "error_rooms": 0, "not_in_room_count": 0, "modern_rooms": 0, - "legacy_rooms": 0 + "legacy_rooms": 0, } - + for room_id, room_data in rooms_data.items(): status, category, is_admin, is_modern, has_error = analyze_room_data( room_data, is_modern_room_version_func ) - + # Update statistics if has_error: stats["error_rooms"] += 1 @@ -255,26 +234,32 @@ def generate_room_summary( stats["admin_rooms"] += 1 else: stats["non_admin_rooms"] += 1 - + if is_modern: stats["modern_rooms"] += 1 else: stats["legacy_rooms"] += 1 - + # Generate room info for problematic rooms - if category in ["error", "problematic"] or (is_admin and (room_data.get("users_higher") or room_data.get("users_equal"))): + if category in ["error", "problematic"] or ( + is_admin and (room_data.get("users_higher") or room_data.get("users_equal")) + ): if has_error: if room_data["error"] == "Bot not in room": - problematic_rooms.append(f"❌ {room_data.get('room_name', room_id)} ({room_id}): Bot not in room") + problematic_rooms.append( + f"❌ {room_data.get('room_name', room_id)} ({room_id}): Bot not in room" + ) else: - problematic_rooms.append(f"❌ {room_data.get('room_name', room_id)} ({room_id}): Error - {room_data['error']}") + problematic_rooms.append( + f"❌ {room_data.get('room_name', room_id)} ({room_id}): Error - {room_data['error']}" + ) elif is_admin: # Show unlimited power status for modern rooms if room_data.get("bot_has_unlimited_power", False): room_info = f"✅ {room_data['room_name']} ({room_id}): Unlimited Power (Creator) [v{room_data.get('room_version', '1')}]" else: room_info = f"✅ {room_data['room_name']} ({room_id}): Admin: Yes (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]" - + # Add power level conflict info if room_data.get("users_higher") or room_data.get("users_equal"): if room_data.get("bot_has_unlimited_power", False): @@ -283,11 +268,15 @@ def generate_room_summary( if room_data.get("users_higher"): room_info += f" - Higher power users: {len(room_data['users_higher'])}" if room_data.get("users_equal"): - room_info += f" - Equal power users: {len(room_data['users_equal'])}" + room_info += ( + f" - Equal power users: {len(room_data['users_equal'])}" + ) problematic_rooms.append(room_info) else: - problematic_rooms.append(f"❌ {room_data['room_name']} ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]") - + problematic_rooms.append( + f"❌ {room_data['room_name']} ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]" + ) + # Generate HTML response response = "" if problematic_rooms: @@ -296,20 +285,19 @@ def generate_room_summary( for room_info in problematic_rooms: response += f"{room_info}
" response += "
" - + return response, stats def generate_summary_stats( - space_data: Dict[str, Any], - room_stats: Dict[str, int] + space_data: Dict[str, Any], room_stats: Dict[str, int] ) -> str: """Generate summary statistics HTML. - + Args: space_data: Space permission data room_stats: Room statistics - + Returns: str: HTML formatted summary statistics """ @@ -319,35 +307,32 @@ def generate_summary_stats( response += f"• Rooms without admin: {room_stats['non_admin_rooms']}
" response += f"• Modern room versions (12+): {room_stats['modern_rooms']}
" response += f"• Legacy room versions (1-11): {room_stats['legacy_rooms']}
" - + # Add note about unlimited power for modern rooms - if room_stats['modern_rooms'] > 0: + if room_stats["modern_rooms"] > 0: response += f"
ℹ️ Note: In modern room versions (12+), creators have unlimited power and cannot be restricted by power levels.
" - - if room_stats['not_in_room_count'] > 0: + + if room_stats["not_in_room_count"] > 0: response += f"• Rooms bot not in: {room_stats['not_in_room_count']}
" - if room_stats['error_rooms'] > 0: + if room_stats["error_rooms"] > 0: response += f"• Rooms with errors: {room_stats['error_rooms']}
" - + response += "
" return response -def generate_issues_and_warnings( - issues: List[str], - warnings: List[str] -) -> str: +def generate_issues_and_warnings(issues: List[str], warnings: List[str]) -> str: """Generate issues and warnings HTML. - + Args: issues: List of critical issues warnings: List of warnings - + Returns: str: HTML formatted issues and warnings """ response = "" - + if issues: response += f"

🚨 Critical Issues


" for issue in issues: @@ -359,13 +344,13 @@ def generate_issues_and_warnings( for warning in warnings: response += f"• {warning}
" response += "
" - + return response def generate_all_clear_message() -> str: """Generate all clear message HTML. - + Returns: str: HTML formatted all clear message """ diff --git a/community/helpers/message_utils.py b/community/helpers/message_utils.py index e6ae13e..bd6a284 100644 --- a/community/helpers/message_utils.py +++ b/community/helpers/message_utils.py @@ -7,12 +7,12 @@ from mautrix.types import MessageType, MediaMessageEventContent def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool: """Check if a message should be flagged for censorship. - + Args: msg: The message event to check censor_wordlist: List of regex patterns to check against censor_files: Whether to flag file messages - + Returns: bool: True if message should be flagged """ @@ -30,17 +30,17 @@ def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool: except Exception: # Skip invalid regex patterns pass - + return False def flag_instaban(msg, instaban_wordlist: list) -> bool: """Check if a message should trigger an instant ban. - + Args: msg: The message event to check instaban_wordlist: List of regex patterns that trigger instant ban - + Returns: bool: True if message should trigger instant ban """ @@ -51,17 +51,17 @@ def flag_instaban(msg, instaban_wordlist: list) -> bool: except Exception: # Skip invalid regex patterns pass - + return False def censor_room(msg, censor_config) -> bool: """Check if a message should be censored based on room configuration. - + Args: msg: The message event to check censor_config: Censor configuration (bool or list of room IDs) - + Returns: bool: True if message should be censored """ @@ -75,10 +75,10 @@ def censor_room(msg, censor_config) -> bool: def sanitize_room_name(room_name: str) -> str: """Sanitize a room name for use in aliases. - + Args: room_name: The room name to sanitize - + Returns: str: Sanitized room name (alphanumeric only, lowercase) """ @@ -87,12 +87,12 @@ def sanitize_room_name(room_name: str) -> str: def generate_community_slug(community_name: str) -> str: """Generate a community slug from the community name. - + Args: community_name: The full community name - + Returns: str: A slug made from the first letter of each word, lowercase """ words = community_name.strip().split() - return ''.join(word[0].lower() for word in words if word) + return "".join(word[0].lower() for word in words if word) diff --git a/community/helpers/report_utils.py b/community/helpers/report_utils.py index 36d723b..f246d8d 100644 --- a/community/helpers/report_utils.py +++ b/community/helpers/report_utils.py @@ -6,53 +6,53 @@ import time def generate_activity_report(database_results: Dict[str, List[Dict]]) -> Dict[str, Any]: """Generate an activity report from database results. - + Args: database_results: Dictionary containing 'warn_inactive', 'kick_inactive', 'ignored' results - + Returns: dict: Formatted activity report """ report = {} - + # Process warn inactive users (between warn and kick thresholds) warn_inactive_results = database_results.get("warn_inactive", []) report["warn_inactive"] = [row["mxid"] for row in warn_inactive_results] or ["none"] - + # Process kick inactive users (beyond kick threshold) kick_inactive_results = database_results.get("kick_inactive", []) report["kick_inactive"] = [row["mxid"] for row in kick_inactive_results] or ["none"] - + # Process ignored users ignored_results = database_results.get("ignored", []) report["ignored"] = [row["mxid"] for row in ignored_results] or ["none"] - + return report def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[str]: """Split a doctor report into chunks that fit within size limits. - + Args: report_text: The full report text max_chunk_size: Maximum size per chunk - + Returns: list: List of report chunks """ if len(report_text) <= max_chunk_size: return [report_text] - + # Try to split by sections first sections = _split_by_sections(report_text, max_chunk_size) if sections: return sections - + # Fall back to character-based splitting chunks = [] current_chunk = "" - - for line in report_text.split('\n'): + + for line in report_text.split("\n"): if len(current_chunk) + len(line) + 1 > max_chunk_size: if current_chunk: chunks.append(current_chunk.strip()) @@ -63,31 +63,31 @@ def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[st current_chunk = line[max_chunk_size:] else: if current_chunk: - current_chunk += '\n' + line + current_chunk += "\n" + line else: current_chunk = line - + if current_chunk: chunks.append(current_chunk.strip()) - + return chunks def _split_by_sections(text: str, max_size: int) -> List[str]: """Split text by sections (lines starting with specific patterns). - + Args: text: The text to split max_size: Maximum size per section - + Returns: list: List of text sections """ section_headers = ["Active users:", "Inactive users:", "Ignored users:"] sections = [] current_section = "" - - lines = text.split('\n') + + lines = text.split("\n") for line in lines: if any(line.startswith(header) for header in section_headers): if current_section and len(current_section) > max_size: @@ -101,54 +101,54 @@ def _split_by_sections(text: str, max_size: int) -> List[str]: # This section would be too big return [] if current_section: - current_section += '\n' + line + current_section += "\n" + line else: current_section = line - + if current_section: sections.append(current_section.strip()) - + return sections if all(len(s) <= max_size for s in sections) else [] def format_ban_results(ban_event_map: Dict[str, List[str]]) -> str: """Format ban results for display. - + Args: ban_event_map: Dictionary containing ban results - + Returns: str: Formatted ban results """ ban_list = ban_event_map.get("ban_list", {}) error_list = ban_event_map.get("error_list", {}) - + result_parts = [] - + for user, rooms in ban_list.items(): if rooms: result_parts.append(f"Banned {user} from: {', '.join(rooms)}") - + for user, rooms in error_list.items(): if rooms: result_parts.append(f"Failed to ban {user} from: {', '.join(rooms)}") - - return '\n'.join(result_parts) if result_parts else "No ban operations performed" + + return "\n".join(result_parts) if result_parts else "No ban operations performed" def format_sync_results(sync_results: Dict[str, List[str]]) -> str: """Format sync results for display. - + Args: sync_results: Dictionary containing sync results - + Returns: str: Formatted sync results """ added = sync_results.get("added", []) dropped = sync_results.get("dropped", []) - + added_str = "
".join(added) if added else "none" dropped_str = "
".join(dropped) if dropped else "none" - + return f"Added: {added_str}

Dropped: {dropped_str}" diff --git a/community/helpers/response_builder.py b/community/helpers/response_builder.py index 99ccca6..1daca66 100644 --- a/community/helpers/response_builder.py +++ b/community/helpers/response_builder.py @@ -6,16 +6,16 @@ from mautrix.types import MessageEvent class ResponseBuilder: """Builder for consistent response formatting.""" - + @staticmethod def build_html_response(title: str, content: str, allow_html: bool = True) -> str: """Build an HTML formatted response. - + Args: title: Response title content: Response content allow_html: Whether to allow HTML formatting - + Returns: str: Formatted response """ @@ -23,15 +23,15 @@ class ResponseBuilder: return f"

{title}
{content}

" else: return f"{title}\n{content}" - + @staticmethod def build_error_response(error: str, allow_html: bool = True) -> str: """Build an error response. - + Args: error: Error message allow_html: Whether to allow HTML formatting - + Returns: str: Formatted error response """ @@ -39,15 +39,15 @@ class ResponseBuilder: return f"

Error: {error}

" else: return f"Error: {error}" - + @staticmethod def build_success_response(message: str, allow_html: bool = True) -> str: """Build a success response. - + Args: message: Success message allow_html: Whether to allow HTML formatting - + Returns: str: Formatted success response """ @@ -55,186 +55,196 @@ class ResponseBuilder: return f"

Success: {message}

" else: return f"Success: {message}" - + @staticmethod - def build_list_response(title: str, items: List[str], allow_html: bool = True) -> str: + def build_list_response( + title: str, items: List[str], allow_html: bool = True + ) -> str: """Build a list response. - + Args: title: List title items: List items allow_html: Whether to allow HTML formatting - + Returns: str: Formatted list response """ if not items: - return ResponseBuilder.build_html_response(title, "No items found.", allow_html) - + return ResponseBuilder.build_html_response( + title, "No items found.", allow_html + ) + if allow_html: items_html = "
".join(items) return f"

{title}
{items_html}

" else: items_text = "\n".join(f"- {item}" for item in items) return f"{title}\n{items_text}" - + @staticmethod def build_room_link(alias: str, server: str) -> str: """Build a Matrix room link. - + Args: alias: Room alias server: Server name - + Returns: str: HTML room link """ return f"#{alias}:{server}" - + @staticmethod def build_user_link(user_id: str) -> str: """Build a Matrix user link. - + Args: user_id: User ID - + Returns: str: HTML user link """ return f"{user_id}" - + @staticmethod - def build_activity_report_response(report: Dict[str, List[str]], config: Dict[str, Any]) -> str: + def build_activity_report_response( + report: Dict[str, List[str]], config: Dict[str, Any] + ) -> str: """Build an activity report response. - + Args: report: Activity report data config: Bot configuration - + Returns: str: Formatted activity report """ warn_threshold = config.get("warn_threshold_days", 30) kick_threshold = config.get("kick_threshold_days", 60) - + response_parts = [] - + if report.get("warn_inactive"): warn_list = "
".join(report["warn_inactive"]) response_parts.append( f"

Users inactive for between {warn_threshold} and {kick_threshold} days:
" f"{warn_list}

" ) - + if report.get("kick_inactive"): kick_list = "
".join(report["kick_inactive"]) response_parts.append( f"

Users inactive for at least {kick_threshold} days:
" f"{kick_list}

" ) - + if report.get("ignored"): ignored_list = "
".join(report["ignored"]) - response_parts.append( - f"

Ignored users:
{ignored_list}

" - ) - + response_parts.append(f"

Ignored users:
{ignored_list}

") + return "".join(response_parts) - + @staticmethod def build_ban_results_response(results: Dict[str, Any]) -> str: """Build a ban results response. - + Args: results: Ban results data - + Returns: str: Formatted ban results """ ban_list = results.get("ban_list", []) error_list = results.get("error_list", []) - + response_parts = [] - + if ban_list: ban_list_html = "
".join(ban_list) - response_parts.append(f"

Users banned:
{ban_list_html}

") - + response_parts.append( + f"

Users banned:
{ban_list_html}

" + ) + if error_list: error_list_html = "
".join(error_list) - response_parts.append(f"

Errors:
{error_list_html}

") - + response_parts.append( + f"

Errors:
{error_list_html}

" + ) + if not response_parts: response_parts.append("

No users were banned.

") - + return "".join(response_parts) - + @staticmethod def build_sync_results_response(results: Dict[str, List[str]]) -> str: """Build a sync results response. - + Args: results: Sync results data - + Returns: str: Formatted sync results """ added = results.get("added", []) dropped = results.get("dropped", []) - + response_parts = [] - + if added: added_html = "
".join(added) response_parts.append(f"

Added:
{added_html}

") - + if dropped: dropped_html = "
".join(dropped) response_parts.append(f"

Dropped:
{dropped_html}

") - + if not response_parts: response_parts.append("

No changes made.

") - + return "".join(response_parts) - + @staticmethod def build_doctor_report_response(report: Dict[str, Any]) -> str: """Build a doctor report response. - + Args: report: Doctor report data - + Returns: str: Formatted doctor report """ response_parts = [] - + # Space information if report.get("space"): space = report["space"] space_info = f"Space: {space.get('room_id', 'Unknown')}
" - space_info += f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}
" + space_info += ( + f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}
" + ) space_info += f"Has Admin: {space.get('has_admin', False)}
" response_parts.append(f"

{space_info}

") - + # Room information if report.get("rooms"): rooms_info = "Rooms:
" for room_id, room_data in report["rooms"].items(): rooms_info += f"- {room_id}: {room_data.get('status', 'Unknown')}
" response_parts.append(f"

{rooms_info}

") - + # Issues if report.get("issues"): issues_html = "
".join(report["issues"]) response_parts.append(f"

Issues:
{issues_html}

") - + # Warnings if report.get("warnings"): warnings_html = "
".join(report["warnings"]) response_parts.append(f"

Warnings:
{warnings_html}

") - + if not response_parts: response_parts.append("

No issues found.

") - + return "".join(response_parts) diff --git a/community/helpers/room_creation_utils.py b/community/helpers/room_creation_utils.py index 5e79a26..2001a5f 100644 --- a/community/helpers/room_creation_utils.py +++ b/community/helpers/room_creation_utils.py @@ -8,17 +8,15 @@ from mautrix.client import Client async def validate_room_creation_params( - roomname: str, - config: dict, - evt: Optional[MessageEvent] = None + roomname: str, config: dict, evt: Optional[MessageEvent] = None ) -> Tuple[str, bool, bool, str]: """Validate and process room creation parameters. - + Args: roomname: Original room name config: Bot configuration evt: Optional MessageEvent for error responses - + Returns: Tuple of (sanitized_name, force_encryption, force_unencryption, error_msg) """ @@ -27,23 +25,23 @@ async def validate_room_creation_params( unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?(\s+|$)") force_encryption = bool(encrypted_flag_regex.search(roomname)) force_unencryption = bool(unencrypted_flag_regex.search(roomname)) - + # Clean up room name if force_encryption: roomname = encrypted_flag_regex.sub("", roomname) # Remove encryption flag if force_unencryption: roomname = unencrypted_flag_regex.sub("", roomname) # Remove unencryption flag - + # Clean up any extra whitespace roomname = re.sub(r"\s+", " ", roomname).strip() - + sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", roomname).lower() - + # Check if community slug is configured if not config.get("community_slug", ""): error_msg = "No community slug configured. Please run initialize command first." return sanitized_name, force_encryption, force_unencryption, error_msg, roomname - + return sanitized_name, force_encryption, force_unencryption, "", roomname @@ -51,27 +49,27 @@ async def prepare_room_creation_data( sanitized_name: str, config: dict, client: Client, - invitees: Optional[List[str]] = None + invitees: Optional[List[str]] = None, ) -> Tuple[str, str, List[str], str]: """Prepare data needed for room creation. - + Args: sanitized_name: Sanitized room name config: Bot configuration client: Matrix client invitees: Optional list of users to invite - + Returns: Tuple of (alias_localpart, server, room_invitees, parent_room) """ # Create alias with community slug alias_localpart = f"{sanitized_name}-{config.get('community_slug', '')}" - + # Get server and invitees server = client.parse_user_id(client.mxid)[1] room_invitees = invitees if invitees is not None else config.get("invitees", []) parent_room = config.get("parent_room", "") - + return alias_localpart, server, room_invitees, parent_room @@ -79,34 +77,38 @@ async def prepare_power_levels( client: Client, config: dict, parent_room: str, - power_level_override: Optional[PowerLevelStateEventContent] = None + power_level_override: Optional[PowerLevelStateEventContent] = None, ) -> PowerLevelStateEventContent: """Prepare power levels for room creation. - + Args: client: Matrix client config: Bot configuration parent_room: Parent room ID power_level_override: Optional existing power level override - + Returns: PowerLevelStateEventContent for room creation """ if power_level_override: return power_level_override - + if parent_room: try: # Get parent room power levels to extract user power levels parent_power_levels = await client.get_state_event( parent_room, EventType.ROOM_POWER_LEVELS ) - + # Create new power levels with server defaults, not copying all permissions from space power_levels = PowerLevelStateEventContent() - + # Copy only user power levels from parent space, not the entire permission set - if parent_power_levels and hasattr(parent_power_levels, 'users') and parent_power_levels.users: + if ( + parent_power_levels + and hasattr(parent_power_levels, "users") + and parent_power_levels.users + ): try: user_power_levels = parent_power_levels.users.copy() # Ensure bot has highest power @@ -121,10 +123,10 @@ async def prepare_power_levels( power_levels.users = { client.mxid: 1000, # Bot gets highest power } - + # Set explicit config values power_levels.invite = config.get("invite_power_level", 50) - + return power_levels except Exception as e: # If we can't get parent power levels, create default ones @@ -150,10 +152,10 @@ def prepare_initial_state( server: str, force_encryption: bool, force_unencryption: bool, - creation_content: Optional[Dict[str, Any]] = None + creation_content: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, Any]]: """Prepare initial state events for room creation. - + Args: config: Bot configuration parent_room: Parent room ID @@ -161,66 +163,67 @@ def prepare_initial_state( force_encryption: Whether to force encryption force_unencryption: Whether to force no encryption creation_content: Optional creation content - + Returns: List of initial state events """ initial_state = [] - + # Only add space parent state if we have a parent room if parent_room: - initial_state.extend([ - { - "type": str(EventType.SPACE_PARENT), - "state_key": parent_room, - "content": { - "via": [server], - "canonical": True - } - }, - { - "type": str(EventType.ROOM_JOIN_RULES), - "content": { - "join_rule": "restricted", - "allow": [{ - "type": "m.room_membership", - "room_id": parent_room - }] - } - } - ]) - + initial_state.extend( + [ + { + "type": str(EventType.SPACE_PARENT), + "state_key": parent_room, + "content": {"via": [server], "canonical": True}, + }, + { + "type": str(EventType.ROOM_JOIN_RULES), + "content": { + "join_rule": "restricted", + "allow": [ + {"type": "m.room_membership", "room_id": parent_room} + ], + }, + }, + ] + ) + # Add encryption if needed if (config.get("encrypt", False) and not force_unencryption) or force_encryption: - initial_state.append({ - "type": str(EventType.ROOM_ENCRYPTION), - "content": { - "algorithm": "m.megolm.v1.aes-sha2" + initial_state.append( + { + "type": str(EventType.ROOM_ENCRYPTION), + "content": {"algorithm": "m.megolm.v1.aes-sha2"}, } - }) - + ) + # Add history visibility if specified in creation_content if creation_content and "m.room.history_visibility" in creation_content: - initial_state.append({ - "type": str(EventType.ROOM_HISTORY_VISIBILITY), - "content": { - "history_visibility": creation_content.get("m.room.history_visibility", "joined") + initial_state.append( + { + "type": str(EventType.ROOM_HISTORY_VISIBILITY), + "content": { + "history_visibility": creation_content.get( + "m.room.history_visibility", "joined" + ) + }, } - }) - + ) + return initial_state def adjust_power_levels_for_modern_rooms( - power_levels: PowerLevelStateEventContent, - room_version: str + power_levels: PowerLevelStateEventContent, room_version: str ) -> PowerLevelStateEventContent: """Adjust power levels for modern room versions. - + Args: power_levels: Power level state content room_version: Room version string - + Returns: Adjusted power level state content """ @@ -229,20 +232,18 @@ def adjust_power_levels_for_modern_rooms( if room_version and int(room_version) >= 12 and power_levels: if power_levels.users: # Remove bot from users list but keep other important settings - power_levels.users.pop("bot_mxid", None) # Will be replaced with actual bot mxid - + power_levels.users.pop( + "bot_mxid", None + ) # Will be replaced with actual bot mxid + return power_levels async def add_room_to_space( - client: Client, - parent_room: str, - room_id: str, - server: str, - sleep_duration: float + client: Client, parent_room: str, room_id: str, server: str, sleep_duration: float ) -> None: """Add created room to parent space. - + Args: client: Matrix client parent_room: Parent room ID @@ -254,23 +255,17 @@ async def add_room_to_space( await client.send_state_event( parent_room, EventType.SPACE_CHILD, - { - "via": [server], - "suggested": False - }, - state_key=room_id + {"via": [server], "suggested": False}, + state_key=room_id, ) await asyncio.sleep(sleep_duration) async def verify_room_creation( - client: Client, - room_id: str, - expected_version: str, - logger + client: Client, room_id: str, expected_version: str, logger ) -> None: """Verify that room was created with correct settings. - + Args: client: Matrix client room_id: Created room ID @@ -279,9 +274,16 @@ async def verify_room_creation( """ try: from .room_utils import get_room_version_and_creators - actual_version, actual_creators = await get_room_version_and_creators(client, room_id, logger) - logger.info(f"Room {room_id} created with version {actual_version} (requested: {expected_version})") + + actual_version, actual_creators = await get_room_version_and_creators( + client, room_id, logger + ) + logger.info( + f"Room {room_id} created with version {actual_version} (requested: {expected_version})" + ) if actual_version != expected_version: - logger.warning(f"Room version mismatch: requested {expected_version}, got {actual_version}") + logger.warning( + f"Room version mismatch: requested {expected_version}, got {actual_version}" + ) except Exception as e: logger.warning(f"Could not verify room version for {room_id}: {e}") diff --git a/community/helpers/room_utils.py b/community/helpers/room_utils.py index f77644a..35a8315 100644 --- a/community/helpers/room_utils.py +++ b/community/helpers/room_utils.py @@ -8,12 +8,12 @@ from mautrix.errors import MNotFound async def validate_room_alias(client, alias_localpart: str, server: str) -> bool: """Check if a room alias already exists. - + Args: client: Matrix client instance alias_localpart: The localpart of the alias (without # and :server) server: The server domain - + Returns: bool: True if alias is available, False if it already exists """ @@ -30,76 +30,81 @@ async def validate_room_alias(client, alias_localpart: str, server: str) -> bool return True -async def validate_room_aliases(client, room_names: list[str], community_slug: str, server: str) -> Tuple[bool, List[str]]: +async def validate_room_aliases( + client, room_names: list[str], community_slug: str, server: str +) -> Tuple[bool, List[str]]: """Validate that all room aliases are available. - + Args: client: Matrix client instance room_names: List of room names to validate community_slug: The community slug to append server: The server domain - + Returns: tuple: (is_valid, list_of_conflicting_aliases) """ if not community_slug: return False, [] - + conflicting_aliases = [] - + for room_name in room_names: # Clean the room name and create alias from .message_utils import sanitize_room_name + sanitized_name = sanitize_room_name(room_name) alias_localpart = f"{sanitized_name}-{community_slug}" - + # Check if alias is available is_available = await validate_room_alias(client, alias_localpart, server) if not is_available: conflicting_aliases.append(f"#{alias_localpart}:{server}") - + return len(conflicting_aliases) == 0, conflicting_aliases -async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tuple[str, List[str]]: +async def get_room_version_and_creators( + client, room_id: str, logger=None +) -> Tuple[str, List[str]]: """Get the room version and creators for a room. - + Args: client: Matrix client instance room_id: The room ID to check - + Returns: tuple: (room_version, list_of_creators) """ try: # Get all state events to find the creation event state_events = await client.get_state(room_id) - + # Find the m.room.create event creation_event = None for event in state_events: if event.type == EventType.ROOM_CREATE: creation_event = event break - + if not creation_event: # Default to version 1 if no creation event found return "1", [] - + room_version = creation_event.content.get("room_version", "1") creators = [] - + # Add the sender of the creation event as a creator if creation_event.sender: creators.append(creation_event.sender) - + # Add any additional creators from the content additional_creators = creation_event.content.get("additional_creators", []) if isinstance(additional_creators, list): creators.extend(additional_creators) - + return room_version, creators - + except Exception: # Default to version 1 if there's an error return "1", [] @@ -107,10 +112,10 @@ async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tu def is_modern_room_version(room_version: str) -> bool: """Check if a room version is 12 or newer (modern room versions). - + Args: room_version: The room version string to check - + Returns: bool: True if room version is 12 or newer """ @@ -124,36 +129,38 @@ def is_modern_room_version(room_version: str) -> bool: async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool: """Check if a user has unlimited power in a room (creator in modern room versions). - + Args: client: Matrix client instance user_id: The user ID to check room_id: The room ID to check in - + Returns: bool: True if user has unlimited power """ try: - room_version, creators = await get_room_version_and_creators(client, room_id, None) - + room_version, creators = await get_room_version_and_creators( + client, room_id, None + ) + # In modern room versions (12+), creators have unlimited power if is_modern_room_version(room_version): return user_id in creators - + # In older room versions, creators don't have special unlimited power return False - + except Exception: return False async def get_moderators_and_above(client, parent_room: str) -> List[str]: """Get list of users with moderator or higher permissions from the parent space. - + Args: client: Matrix client instance parent_room: The parent room ID - + Returns: list: List of user IDs with power level >= 50 (moderator or above) """ diff --git a/community/helpers/user_utils.py b/community/helpers/user_utils.py index d473660..675d6da 100644 --- a/community/helpers/user_utils.py +++ b/community/helpers/user_utils.py @@ -10,13 +10,13 @@ from mautrix.errors import MNotFound async def check_if_banned(client, userid: str, banlists: List[str], logger) -> bool: """Check if a user is banned according to banlists. - + Args: client: Matrix client instance userid: The user ID to check banlists: List of banlist room IDs or aliases logger: Logger instance for error reporting - + Returns: bool: True if user is banned """ @@ -42,25 +42,25 @@ async def check_if_banned(client, userid: str, banlists: List[str], logger) -> b for rule in user_policies: try: - if bool( - fnmatch.fnmatch(userid, rule["content"]["entity"]) - ) and bool(re.search("ban$", rule["content"]["recommendation"])): + if bool(fnmatch.fnmatch(userid, rule["content"]["entity"])) and bool( + re.search("ban$", rule["content"]["recommendation"]) + ): return True except Exception: # Skip invalid rules pass - + return is_banned async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]: """Get room IDs for all configured banlists. - + Args: client: Matrix client instance banlists: List of banlist room IDs or aliases logger: Logger instance for error reporting - + Returns: list: List of room IDs for banlists """ @@ -81,12 +81,20 @@ async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]: return banlist_roomids -async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: str = "banned", - all_rooms: bool = False, redact_on_ban: bool = False, - get_messages_to_redact_func=None, database=None, - sleep_time: float = 0.1, logger=None) -> Dict: +async def ban_user_from_rooms( + client, + user: str, + roomlist: List[str], + reason: str = "banned", + all_rooms: bool = False, + redact_on_ban: bool = False, + get_messages_to_redact_func=None, + database=None, + sleep_time: float = 0.1, + logger=None, +) -> Dict: """Ban a user from a list of rooms. - + Args: client: Matrix client instance user: User ID to ban @@ -98,13 +106,13 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st database: Database instance for redaction tasks sleep_time: Sleep time between operations logger: Logger instance - + Returns: dict: Ban results with success/error lists """ ban_event_map = {"ban_list": {}, "error_list": {}} ban_event_map["ban_list"][user] = [] - + for room in roomlist: try: roomname = None @@ -149,10 +157,16 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st return ban_event_map -async def user_permitted(client, user_id: UserID, parent_room: str, min_level: int = 50, - room_id: str = None, logger=None) -> bool: +async def user_permitted( + client, + user_id: UserID, + parent_room: str, + min_level: int = 50, + room_id: str = None, + logger=None, +) -> bool: """Check if a user has sufficient power level in a room. - + Args: client: Matrix client instance user_id: The Matrix ID of the user to check @@ -160,18 +174,19 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i min_level: Minimum required power level (default 50 for moderator) room_id: The room ID to check permissions in. If None, uses parent room. logger: Logger instance for error reporting - + Returns: bool: True if user has sufficient power level """ try: target_room = room_id or parent_room - + # First check if user has unlimited power (creator in modern room versions) from .room_utils import user_has_unlimited_power + if await user_has_unlimited_power(client, user_id, target_room): return True - + # Then check power level power_levels = await client.get_state_event( target_room, EventType.ROOM_POWER_LEVELS @@ -186,14 +201,15 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool: """Check if a user has unlimited power in a room (creator in modern room versions). - + Args: client: Matrix client instance user_id: The user ID to check room_id: The room ID to check in - + Returns: bool: True if user has unlimited power """ from .room_utils import user_has_unlimited_power as room_user_has_unlimited_power + return await room_user_has_unlimited_power(client, user_id, room_id)