"
+ )
return
msg = await evt.respond("Initializing new community space...")
@@ -2685,17 +2968,22 @@ class CommunityBot(Plugin):
community_slug = self.generate_community_slug(community_name)
self.config["community_slug"] = community_slug
self.log.info(f"Generated community slug: {community_slug}")
-
+
# Define child rooms that will be created during initialization (excluding the space itself)
child_rooms_to_create = [
f"{community_name} Moderators", # Moderators room
- f"{community_name} Waiting Room" # Waiting room
+ f"{community_name} Waiting Room", # Waiting room
]
-
+
# Validate child room aliases before creating any rooms
- is_valid, conflicting_aliases = await self.validate_room_aliases(child_rooms_to_create, evt)
+ is_valid, conflicting_aliases = await self.validate_room_aliases(
+ child_rooms_to_create, evt
+ )
if not is_valid:
- error_msg = f"Cannot initialize community: The following room aliases already exist:\n" + "\n".join(conflicting_aliases)
+ error_msg = (
+ f"Cannot initialize community: The following room aliases already exist:\n"
+ + "\n".join(conflicting_aliases)
+ )
await evt.respond(error_msg, edits=msg)
return
@@ -2711,31 +2999,27 @@ class CommunityBot(Plugin):
# Set up power levels for the space
power_levels = PowerLevelStateEventContent()
-
+
# Set up power levels for users
# For modern room versions (12+), the bot (creator) has unlimited power by default
# but we still need to set power levels for other users
if self.is_modern_room_version(self.config.get("room_version", "1")):
# For modern rooms, don't set bot power level (it has unlimited power)
# but still set power levels for other users
- power_levels.users = {
- evt.sender: 100 # Initiator gets admin power
- }
+ power_levels.users = {evt.sender: 100} # Initiator gets admin power
else:
# For legacy rooms, set both bot and initiator power levels
power_levels.users = {
self.client.mxid: 1000, # Bot gets highest power
- evt.sender: 100 # Initiator gets admin power
+ evt.sender: 100, # Initiator gets admin power
}
-
+
# Set invite power level from config
power_levels.invite = self.config.get("invite_power_level", 50)
# Create the space with appropriate metadata and power levels
space_id, space_alias = await self.create_space(
- community_name,
- evt,
- power_level_override=power_levels
+ community_name, evt, power_level_override=power_levels
)
if not space_id:
@@ -2752,8 +3036,10 @@ class CommunityBot(Plugin):
# Verify the space exists and has correct power levels
try:
- space_power_levels = await self.client.get_state_event(space_id, EventType.ROOM_POWER_LEVELS)
-
+ space_power_levels = await self.client.get_state_event(
+ space_id, EventType.ROOM_POWER_LEVELS
+ )
+
# For modern room versions, creators have unlimited power and don't appear in power levels
if self.is_modern_room_version(self.config.get("room_version", "1")):
# Just verify the space exists and has power levels
@@ -2774,7 +3060,7 @@ class CommunityBot(Plugin):
# Create moderators room
# Include the initiator as a moderator, plus any other moderators from the space
moderators = [evt.sender] # Always include the initiator
-
+
# Also get any other moderators from the space
try:
space_moderators = await self.get_moderators_and_above()
@@ -2785,28 +3071,30 @@ class CommunityBot(Plugin):
moderators.append(user)
except Exception as e:
self.log.warning(f"Could not get additional moderators from space: {e}")
-
- self.log.info(f"Moderators room will be created with initial members: {moderators}")
-
+
+ self.log.info(
+ f"Moderators room will be created with initial members: {moderators}"
+ )
+
room_result = await self.create_room(
f"{community_name} Moderators",
evt,
- invitees=moderators # Use moderators list instead of config invitees
+ invitees=moderators, # Use moderators list instead of config invitees
)
-
+
if not room_result:
error_msg = "Failed to create moderators room"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
return
-
+
mod_room_id, mod_room_alias = room_result
# Set moderators room to invite-only
await self.client.send_state_event(
mod_room_id,
EventType.ROOM_JOIN_RULES,
- JoinRulesStateEventContent(join_rule=JoinRule.INVITE)
+ JoinRulesStateEventContent(join_rule=JoinRule.INVITE),
)
# Create waiting room (force unencrypted for public access)
@@ -2815,8 +3103,8 @@ class CommunityBot(Plugin):
evt,
creation_content={
"m.federate": True,
- "m.room.history_visibility": "joined"
- }
+ "m.room.history_visibility": "joined",
+ },
)
if not waiting_room_result:
@@ -2824,14 +3112,14 @@ class CommunityBot(Plugin):
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
return
-
+
waiting_room_id, waiting_room_alias = waiting_room_result
# Set waiting room to be joinable by anyone
await self.client.send_state_event(
waiting_room_id,
EventType.ROOM_JOIN_RULES,
- JoinRulesStateEventContent(join_rule=JoinRule.PUBLIC)
+ JoinRulesStateEventContent(join_rule=JoinRule.PUBLIC),
)
# Update censor configuration based on current value
@@ -2839,7 +3127,10 @@ class CommunityBot(Plugin):
if current_censor is False:
# If censor is false, set it to a list with just the waiting room
self.config["censor"] = [waiting_room_id]
- elif isinstance(current_censor, list) and waiting_room_id not in current_censor:
+ elif (
+ isinstance(current_censor, list)
+ and waiting_room_id not in current_censor
+ ):
# If censor is already a list and waiting room isn't in it, append it
current_censor.append(waiting_room_id)
self.config["censor"] = current_censor
@@ -2861,7 +3152,7 @@ class CommunityBot(Plugin):
f"Moderators Room: {mod_room_alias}
"
f"Waiting Room: {waiting_room_alias}{warning_msg}",
edits=msg,
- allow_html=True
+ allow_html=True,
)
except Exception as e:
@@ -2871,7 +3162,7 @@ class CommunityBot(Plugin):
@community.subcommand(
"doctor",
- help="review bot permissions across the space and all rooms to identify potential issues"
+ help="review bot permissions across the space and all rooms to identify potential issues",
)
@command.argument("room", required=False)
async def doctor_check(self, evt: MessageEvent, room: str = None) -> None:
@@ -2889,21 +3180,20 @@ class CommunityBot(Plugin):
msg = await evt.respond("Running diagnostic check...")
try:
- report = {
- "space": {},
- "rooms": {},
- "issues": [],
- "warnings": []
- }
+ report = {"space": {}, "rooms": {}, "issues": [], "warnings": []}
# Check parent space permissions
report["space"] = await diagnostic_utils.check_space_permissions(
self.client, self.config["parent_room"], self.log
)
if "error" in report["space"]:
- report["issues"].append(f"Failed to check parent space permissions: {report['space']['error']}")
+ report["issues"].append(
+ f"Failed to check parent space permissions: {report['space']['error']}"
+ )
elif report["space"].get("bot_power_level", 0) < 100:
- report["issues"].append(f"Bot lacks administrative privileges in parent space (level: {report['space']['bot_power_level']})")
+ report["issues"].append(
+ f"Bot lacks administrative privileges in parent space (level: {report['space']['bot_power_level']})"
+ )
# Check all rooms in the space
space_rooms = await self.get_space_roomlist()
@@ -2912,25 +3202,33 @@ class CommunityBot(Plugin):
self.client, room_id, self.log
)
report["rooms"][room_id] = room_data
-
+
# Add issues for problematic rooms
if "error" in room_data:
if room_data["error"] == "Bot not in room":
- report["issues"].append(f"Bot is not a member of room '{room_id}' that is part of the space")
+ report["issues"].append(
+ f"Bot is not a member of room '{room_id}' that is part of the space"
+ )
else:
- report["issues"].append(f"Failed to check room {room_id}: {room_data['error']}")
+ report["issues"].append(
+ f"Failed to check room {room_id}: {room_data['error']}"
+ )
elif not room_data.get("has_admin", False):
- report["issues"].append(f"Bot lacks administrative privileges in room '{room_data.get('room_name', room_id)}' ({room_id}) - level: {room_data.get('bot_power_level', 0)}")
+ report["issues"].append(
+ f"Bot lacks administrative privileges in room '{room_data.get('room_name', room_id)}' ({room_id}) - level: {room_data.get('bot_power_level', 0)}"
+ )
# Generate response using helper functions
response = "🔍 Bot Permission Diagnostic Summary
"
# Space summary - only show if there are issues
- space_has_issues = ("error" in report["space"] or
- report["space"].get("bot_power_level", 0) < 100 or
- report["space"].get("users_higher") or
- report["space"].get("users_equal"))
-
+ space_has_issues = (
+ "error" in report["space"]
+ or report["space"].get("bot_power_level", 0) < 100
+ or report["space"].get("users_higher")
+ or report["space"].get("users_equal")
+ )
+
if space_has_issues:
response += diagnostic_utils.generate_space_summary(report["space"])
@@ -2941,7 +3239,9 @@ class CommunityBot(Plugin):
response += room_summary
# Summary statistics
- response += diagnostic_utils.generate_summary_stats(report["space"], room_stats)
+ response += diagnostic_utils.generate_summary_stats(
+ report["space"], room_stats
+ )
# Issues and warnings
response += diagnostic_utils.generate_issues_and_warnings(
@@ -2949,7 +3249,12 @@ class CommunityBot(Plugin):
)
# All clear message if no issues
- if not report["issues"] and not report["warnings"] and not space_has_issues and not room_summary:
+ if (
+ not report["issues"]
+ and not report["warnings"]
+ and not space_has_issues
+ and not room_summary
+ ):
response += diagnostic_utils.generate_all_clear_message()
# Try to send the response, and if it's too large, break it up
@@ -2957,20 +3262,33 @@ class CommunityBot(Plugin):
await evt.respond(response, edits=msg, allow_html=True)
except Exception as e:
error_str = str(e).lower()
- if any(phrase in error_str for phrase in ["event too large", "413", "payload too large", "message too long"]):
- self.log.info(f"Doctor report too large ({len(response)} chars), breaking into multiple messages")
-
+ if any(
+ phrase in error_str
+ for phrase in [
+ "event too large",
+ "413",
+ "payload too large",
+ "message too long",
+ ]
+ ):
+ self.log.info(
+ f"Doctor report too large ({len(response)} chars), breaking into multiple messages"
+ )
+
# Break up the response into smaller chunks
chunks = self._split_doctor_report(response)
self.log.info(f"Split report into {len(chunks)} chunks")
-
+
# Send the first chunk as an edit to the original message
if chunks:
await evt.respond(chunks[0], edits=msg, allow_html=True)
-
+
# Send remaining chunks as new messages
for i, chunk in enumerate(chunks[1:], 2):
- await evt.respond(f"🔍 Bot Permission Diagnostic Report (Part {i}/{len(chunks)})
\n{chunk}", allow_html=True)
+ await evt.respond(
+ f"🔍 Bot Permission Diagnostic Report (Part {i}/{len(chunks)})
\n{chunk}",
+ allow_html=True,
+ )
await asyncio.sleep(0.5) # Small delay between messages
else:
# Re-raise if it's not a size issue
@@ -2981,25 +3299,27 @@ class CommunityBot(Plugin):
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
- def _split_doctor_report(self, report_text: str, max_chunk_size: int = 4000) -> list[str]:
+ def _split_doctor_report(
+ self, report_text: str, max_chunk_size: int = 4000
+ ) -> list[str]:
"""Split a large doctor report into smaller chunks.
-
+
Args:
report_text: The full report text to split
max_chunk_size: Maximum size of each chunk in characters
-
+
Returns:
list: List of text chunks
"""
return report_utils.split_doctor_report(report_text, max_chunk_size)
-
+
def _split_by_sections(self, text: str, max_size: int) -> list[str]:
"""Split text by section headers to maintain logical grouping.
-
+
Args:
text: Text to split
max_size: Maximum size per chunk
-
+
Returns:
list: List of text chunks
"""
@@ -3007,13 +3327,13 @@ class CommunityBot(Plugin):
async def _doctor_room_detail(self, evt: MessageEvent, room: str) -> None:
"""Generate detailed diagnostic report for a specific room.
-
+
Args:
evt: The message event
room: Room ID or alias to analyze
"""
msg = await evt.respond(f"Analyzing room {room}...")
-
+
try:
# Resolve room ID if alias provided
room_id = None
@@ -3022,31 +3342,40 @@ class CommunityBot(Plugin):
room_info = await self.client.resolve_room_alias(room)
room_id = room_info["room_id"]
except Exception as e:
- await evt.respond(f"Could not resolve room alias {room}: {e}", edits=msg)
+ await evt.respond(
+ f"Could not resolve room alias {room}: {e}", edits=msg
+ )
return
elif room.startswith("!"):
room_id = room
else:
- await evt.respond(f"Invalid room format. Use room ID (!roomid:server) or alias (#alias:server)", edits=msg)
+ await evt.respond(
+ f"Invalid room format. Use room ID (!roomid:server) or alias (#alias:server)",
+ edits=msg,
+ )
return
# Check if room is in the space
space_rooms = await self.get_space_roomlist()
if room_id not in space_rooms:
- await evt.respond(f"Room {room} is not part of the configured space.", edits=msg)
+ await evt.respond(
+ f"Room {room} is not part of the configured space.", edits=msg
+ )
return
# Get room name
room_name = room_id
try:
- room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
+ room_name_event = await self.client.get_state_event(
+ room_id, EventType.ROOM_NAME
+ )
room_name = room_name_event.name
except:
pass
response = f"🔍 Detailed Analysis: {room_name}
"
response += f"Room ID: {room_id}
"
-
+
# Get room version and creators
room_version, creators = await self.get_room_version_and_creators(room_id)
response += f"Room Version: {room_version}
"
@@ -3056,8 +3385,12 @@ class CommunityBot(Plugin):
# Check if bot is in the room
try:
- await self.client.get_state_event(room_id, EventType.ROOM_MEMBER, self.client.mxid)
- response += "✅ Bot membership: Bot is a member of this room
"
+ await self.client.get_state_event(
+ room_id, EventType.ROOM_MEMBER, self.client.mxid
+ )
+ response += (
+ "✅ Bot membership: Bot is a member of this room
"
+ )
except Exception:
response += "❌ Bot membership: Bot is not a member of this room
"
await evt.respond(response, edits=msg, allow_html=True)
@@ -3065,19 +3398,25 @@ class CommunityBot(Plugin):
# Get power levels
try:
- power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS)
+ power_levels = await self.client.get_state_event(
+ room_id, EventType.ROOM_POWER_LEVELS
+ )
bot_level = power_levels.get_user_level(self.client.mxid)
-
+
# Check if bot has unlimited power (creator in modern room versions)
- bot_has_unlimited_power = await self.user_has_unlimited_power(self.client.mxid, room_id)
-
+ bot_has_unlimited_power = await self.user_has_unlimited_power(
+ self.client.mxid, room_id
+ )
+
response += f"📊 Power Level Analysis
"
response += f"• Bot power level: {bot_level}
"
if bot_has_unlimited_power:
response += f"• Administrative privileges: ✅ Unlimited Power (Creator)
"
else:
response += f"• Administrative privileges: {'✅ Yes' if bot_level >= 100 else '❌ No'}
"
- response += f"• Default user level: {power_levels.users_default}
"
+ response += (
+ f"• Default user level: {power_levels.users_default}
"
+ )
response += f"• Invite level: {power_levels.invite}
"
response += f"• Kick level: {power_levels.kick}
"
response += f"• Ban level: {power_levels.ban}
"
@@ -3086,7 +3425,7 @@ class CommunityBot(Plugin):
# Check for users with equal or higher power level
users_higher = []
users_equal = []
-
+
for user, level in power_levels.users.items():
if user != self.client.mxid and level >= bot_level:
if level == bot_level:
@@ -3111,8 +3450,10 @@ class CommunityBot(Plugin):
response += "
"
if not users_higher and not users_equal:
- response += "✅ No power level conflicts detected
"
-
+ response += (
+ "✅ No power level conflicts detected
"
+ )
+
# Add note about creators in modern room versions
if self.is_modern_room_version(room_version):
response += f"ℹ️ Modern Room Version Note
"
@@ -3120,22 +3461,28 @@ class CommunityBot(Plugin):
# Check specific permissions
response += f"🔐 Permission Analysis
"
-
+
# Get required levels for various actions
events_default = power_levels.events_default
events = power_levels.events
-
+
permissions = [
- ("Send messages", events.get(str(EventType.ROOM_MESSAGE), events_default)),
+ (
+ "Send messages",
+ events.get(str(EventType.ROOM_MESSAGE), events_default),
+ ),
("Send state events", power_levels.state_default),
- ("Change power levels", events.get(str(EventType.ROOM_POWER_LEVELS), events_default)),
+ (
+ "Change power levels",
+ events.get(str(EventType.ROOM_POWER_LEVELS), events_default),
+ ),
("Send tombstone", events.get("m.room.tombstone", events_default)),
("Invite users", power_levels.invite),
("Kick users", power_levels.kick),
("Ban users", power_levels.ban),
- ("Redact messages", power_levels.redact)
+ ("Redact messages", power_levels.redact),
]
-
+
for perm_name, required_level in permissions:
has_perm = bot_level >= required_level or bot_has_unlimited_power
status = "✅" if has_perm else "❌"
@@ -3147,25 +3494,33 @@ class CommunityBot(Plugin):
# Check room state
try:
response += f"🏠 Room State
"
-
+
# Check join rules
try:
- join_rules = await self.client.get_state_event(room_id, EventType.ROOM_JOIN_RULES)
+ join_rules = await self.client.get_state_event(
+ room_id, EventType.ROOM_JOIN_RULES
+ )
response += f"• Join rule: {join_rules.join_rule}
"
except:
response += "• Join rule: Could not determine
"
# Check encryption
try:
- encryption = await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION)
+ encryption = await self.client.get_state_event(
+ room_id, EventType.ROOM_ENCRYPTION
+ )
response += f"• Encryption: ✅ Enabled ({encryption.algorithm})
"
except:
response += "• Encryption: ❌ Not enabled
"
# Check space parent
try:
- space_parent = await self.client.get_state_event(room_id, EventType.SPACE_PARENT)
- response += f"• Space parent: ✅ {space_parent.state_key}
"
+ space_parent = await self.client.get_state_event(
+ room_id, EventType.SPACE_PARENT
+ )
+ response += (
+ f"• Space parent: ✅ {space_parent.state_key}
"
+ )
except:
response += "• Space parent: ❌ Not set
"
diff --git a/community/helpers/__init__.py b/community/helpers/__init__.py
index fba4c9a..f59d13e 100644
--- a/community/helpers/__init__.py
+++ b/community/helpers/__init__.py
@@ -1,2 +1,15 @@
# Helper modules for community bot
-from . import message_utils, room_utils, user_utils, database_utils, report_utils, decorators, common_utils, room_creation_utils, config_manager, response_builder, diagnostic_utils, base_command_handler
+from . import (
+ message_utils,
+ room_utils,
+ user_utils,
+ database_utils,
+ report_utils,
+ decorators,
+ common_utils,
+ room_creation_utils,
+ config_manager,
+ response_builder,
+ diagnostic_utils,
+ base_command_handler,
+)
diff --git a/community/helpers/base_command_handler.py b/community/helpers/base_command_handler.py
index 32ee22c..dc274da 100644
--- a/community/helpers/base_command_handler.py
+++ b/community/helpers/base_command_handler.py
@@ -8,10 +8,10 @@ from .decorators import require_permission, require_parent_room, handle_errors
class BaseCommandHandler(ABC):
"""Base class for command handlers with common patterns."""
-
+
def __init__(self, bot):
"""Initialize with bot instance.
-
+
Args:
bot: CommunityBot instance
"""
@@ -21,92 +21,96 @@ class BaseCommandHandler(ABC):
self.config_manager = bot.config_manager
self.log = bot.log
self.database = bot.database
-
+
@abstractmethod
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
pass
-
- async def check_permissions(self, evt: MessageEvent, min_level: int = 50, room_id: str = None) -> bool:
+
+ async def check_permissions(
+ self, evt: MessageEvent, min_level: int = 50, room_id: str = None
+ ) -> bool:
"""Check if user has required permissions.
-
+
Args:
evt: Message event
min_level: Minimum required power level
room_id: Room ID to check permissions in
-
+
Returns:
bool: True if user has permissions
"""
return await self.bot.user_permitted(evt.sender, min_level, room_id)
-
+
async def check_parent_room(self, evt: MessageEvent) -> bool:
"""Check if parent room is configured.
-
+
Args:
evt: Message event
-
+
Returns:
bool: True if parent room is configured
"""
return await self.bot.check_parent_room(evt)
-
+
async def reply_error(self, evt: MessageEvent, message: str) -> None:
"""Reply with an error message.
-
+
Args:
evt: Message event
message: Error message
"""
await evt.reply(message)
-
+
async def reply_success(self, evt: MessageEvent, message: str) -> None:
"""Reply with a success message.
-
+
Args:
evt: Message event
message: Success message
"""
await evt.reply(message)
-
- async def respond_html(self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None) -> None:
+
+ async def respond_html(
+ self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None
+ ) -> None:
"""Respond with HTML content.
-
+
Args:
evt: Message event
message: HTML message
edits: Optional message to edit
"""
await evt.respond(message, allow_html=True, edits=edits)
-
+
def is_tracking_enabled(self) -> bool:
"""Check if user tracking is enabled.
-
+
Returns:
bool: True if tracking is enabled
"""
return self.config_manager.is_tracking_enabled()
-
+
def is_verification_enabled(self) -> bool:
"""Check if verification is enabled.
-
+
Returns:
bool: True if verification is enabled
"""
return self.config_manager.is_verification_enabled()
-
+
def get_parent_room(self) -> Optional[str]:
"""Get the parent room ID.
-
+
Returns:
str: Parent room ID or None
"""
@@ -115,23 +119,23 @@ class BaseCommandHandler(ABC):
class TrackingCommandHandler(BaseCommandHandler):
"""Base handler for commands that require user tracking."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with tracking check."""
if not self.is_tracking_enabled():
await self.reply_error(evt, "user tracking is disabled")
return
return await self.execute_tracking_command(evt, *args, **kwargs)
-
+
@abstractmethod
async def execute_tracking_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the tracking command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
@@ -140,23 +144,23 @@ class TrackingCommandHandler(BaseCommandHandler):
class AdminCommandHandler(BaseCommandHandler):
"""Base handler for admin-only commands."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with admin permission check."""
if not await self.check_permissions(evt, min_level=100):
await self.reply_error(evt, "You don't have permission to use this command")
return
return await self.execute_admin_command(evt, *args, **kwargs)
-
+
@abstractmethod
async def execute_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the admin command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
@@ -165,23 +169,25 @@ class AdminCommandHandler(BaseCommandHandler):
class ModeratorCommandHandler(BaseCommandHandler):
"""Base handler for moderator commands."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with moderator permission check."""
if not await self.check_permissions(evt, min_level=50):
await self.reply_error(evt, "You don't have permission to use this command")
return
return await self.execute_moderator_command(evt, *args, **kwargs)
-
+
@abstractmethod
- async def execute_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
+ async def execute_moderator_command(
+ self, evt: MessageEvent, *args, **kwargs
+ ) -> Any:
"""Execute the moderator command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
@@ -190,22 +196,22 @@ class ModeratorCommandHandler(BaseCommandHandler):
class SpaceCommandHandler(BaseCommandHandler):
"""Base handler for commands that require parent space."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with parent space check."""
if not await self.check_parent_room(evt):
return
return await self.execute_space_command(evt, *args, **kwargs)
-
+
@abstractmethod
async def execute_space_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the space command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
@@ -214,7 +220,7 @@ class SpaceCommandHandler(BaseCommandHandler):
class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler):
"""Base handler for commands that require both parent space and moderator permissions."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with both space and moderator checks."""
if not await self.check_parent_room(evt):
@@ -223,16 +229,18 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler)
await self.reply_error(evt, "You don't have permission to use this command")
return
return await self.execute_space_moderator_command(evt, *args, **kwargs)
-
+
@abstractmethod
- async def execute_space_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
+ async def execute_space_moderator_command(
+ self, evt: MessageEvent, *args, **kwargs
+ ) -> Any:
"""Execute the space moderator command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
@@ -241,7 +249,7 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler)
class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler):
"""Base handler for commands that require both parent space and admin permissions."""
-
+
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with both space and admin checks."""
if not await self.check_parent_room(evt):
@@ -250,16 +258,18 @@ class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler):
await self.reply_error(evt, "You don't have permission to use this command")
return
return await self.execute_space_admin_command(evt, *args, **kwargs)
-
+
@abstractmethod
- async def execute_space_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
+ async def execute_space_admin_command(
+ self, evt: MessageEvent, *args, **kwargs
+ ) -> Any:
"""Execute the space admin command logic.
-
+
Args:
evt: Message event
*args: Command arguments
**kwargs: Additional keyword arguments
-
+
Returns:
Command result
"""
diff --git a/community/helpers/common_utils.py b/community/helpers/common_utils.py
index 8eab9b4..eaca635 100644
--- a/community/helpers/common_utils.py
+++ b/community/helpers/common_utils.py
@@ -6,12 +6,12 @@ from mautrix.types import EventType, MessageEvent
async def get_room_name(client, room_id: str, logger) -> Optional[str]:
"""Get room name from room ID.
-
+
Args:
client: Matrix client instance
room_id: Room ID to get name for
logger: Logger instance for error reporting
-
+
Returns:
str: Room name or None if not found/error
"""
@@ -25,12 +25,12 @@ async def get_room_name(client, room_id: str, logger) -> Optional[str]:
async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]:
"""Get power levels for a room.
-
+
Args:
client: Matrix client instance
room_id: Room ID to get power levels for
logger: Logger instance for error reporting
-
+
Returns:
PowerLevelStateEventContent or None if error
"""
@@ -43,13 +43,13 @@ async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]:
async def check_room_membership(client, room_id: str, user_id: str, logger) -> bool:
"""Check if a user is a member of a room.
-
+
Args:
client: Matrix client instance
room_id: Room ID to check
user_id: User ID to check
logger: Logger instance for error reporting
-
+
Returns:
bool: True if user is a member, False otherwise
"""
@@ -62,11 +62,11 @@ async def check_room_membership(client, room_id: str, user_id: str, logger) -> b
def format_room_info(room_id: str, room_name: Optional[str] = None) -> str:
"""Format room information for display.
-
+
Args:
room_id: Room ID
room_name: Optional room name
-
+
Returns:
str: Formatted room info
"""
@@ -77,12 +77,12 @@ def format_room_info(room_id: str, room_name: Optional[str] = None) -> str:
def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any:
"""Safely get a value from a dictionary with a default.
-
+
Args:
dictionary: Dictionary to get value from
key: Key to look up
default: Default value if key not found
-
+
Returns:
Value from dictionary or default
"""
diff --git a/community/helpers/config_manager.py b/community/helpers/config_manager.py
index 26f412a..c72f82a 100644
--- a/community/helpers/config_manager.py
+++ b/community/helpers/config_manager.py
@@ -5,219 +5,214 @@ from typing import List, Dict, Any, Optional
class ConfigManager:
"""Centralized configuration management for the community bot."""
-
+
def __init__(self, config: Dict[str, Any]):
"""Initialize with bot configuration.
-
+
Args:
config: Bot configuration dictionary
"""
self.config = config
-
+
def is_tracking_enabled(self) -> bool:
"""Check if user tracking is enabled.
-
+
Returns:
bool: True if tracking is enabled
"""
track_users = self.config.get("track_users", [])
-
+
# Handle legacy boolean configuration
if isinstance(track_users, bool):
return track_users
-
+
# Handle new list configuration
return isinstance(track_users, list) and len(track_users) > 0
-
+
def is_message_tracking_enabled(self) -> bool:
"""Check if message tracking is enabled.
-
+
Returns:
bool: True if message tracking is enabled
"""
track_users = self.config.get("track_users", [])
-
+
# Handle legacy boolean configuration - if True, enable both messages and reactions
if isinstance(track_users, bool):
return track_users
-
+
# Handle new list configuration
return isinstance(track_users, list) and "messages" in track_users
-
+
def is_reaction_tracking_enabled(self) -> bool:
"""Check if reaction tracking is enabled.
-
+
Returns:
bool: True if reaction tracking is enabled
"""
track_users = self.config.get("track_users", [])
-
+
# Handle legacy boolean configuration - if True, enable both messages and reactions
if isinstance(track_users, bool):
return track_users
-
+
# Handle new list configuration
return isinstance(track_users, list) and "reactions" in track_users
-
+
def is_verification_enabled(self) -> bool:
"""Check if verification is enabled.
-
+
Returns:
bool: True if verification is enabled
"""
return self.config.get("verification_enabled", False)
-
+
def is_proactive_banning_enabled(self) -> bool:
"""Check if proactive banning is enabled.
-
+
Returns:
bool: True if proactive banning is enabled
"""
return self.config.get("proactive_banning", False)
-
+
def is_encryption_enabled(self) -> bool:
"""Check if encryption is enabled by default.
-
+
Returns:
bool: True if encryption is enabled
"""
return self.config.get("encrypt", False)
-
+
def get_room_version(self) -> str:
"""Get the configured room version.
-
+
Returns:
str: Room version string
"""
return self.config.get("room_version", "1")
-
+
def get_community_slug(self) -> Optional[str]:
"""Get the community slug.
-
+
Returns:
str: Community slug or None if not configured
"""
return self.config.get("community_slug")
-
+
def get_parent_room(self) -> Optional[str]:
"""Get the parent room ID.
-
+
Returns:
str: Parent room ID or None if not configured
"""
return self.config.get("parent_room")
-
+
def get_invitees(self) -> List[str]:
"""Get the list of users to invite to new rooms.
-
+
Returns:
List[str]: List of user IDs to invite
"""
return self.config.get("invitees", [])
-
+
def get_invite_power_level(self) -> int:
"""Get the power level required to invite users.
-
+
Returns:
int: Power level for inviting users
"""
return self.config.get("invite_power_level", 50)
-
+
def get_sleep_duration(self) -> float:
"""Get the sleep duration between operations.
-
+
Returns:
float: Sleep duration in seconds
"""
return self.config.get("sleep", 1.0)
-
+
def get_welcome_sleep_duration(self) -> float:
"""Get the sleep duration for welcome messages.
-
+
Returns:
float: Welcome sleep duration in seconds
"""
return self.config.get("welcome_sleep", 2.0)
-
+
def get_warn_threshold_days(self) -> int:
"""Get the warning threshold for inactive users.
-
+
Returns:
int: Number of days before warning
"""
return self.config.get("warn_threshold_days", 30)
-
+
def get_kick_threshold_days(self) -> int:
"""Get the kick threshold for inactive users.
-
+
Returns:
int: Number of days before kicking
"""
return self.config.get("kick_threshold_days", 60)
-
+
def get_verification_phrase(self) -> str:
"""Get the verification phrase.
-
+
Returns:
str: Verification phrase
"""
return self.config.get("verification_phrase", "I agree to the rules")
-
+
def get_verification_attempts(self) -> int:
"""Get the maximum verification attempts.
-
+
Returns:
int: Maximum verification attempts
"""
return self.config.get("verification_attempts", 3)
-
+
def get_verification_timeout(self) -> int:
"""Get the verification timeout in seconds.
-
+
Returns:
int: Verification timeout in seconds
"""
return self.config.get("verification_timeout", 300)
-
-
+
def get_banlist_rooms(self) -> List[str]:
"""Get the list of banlist rooms.
-
+
Returns:
List[str]: List of banlist room IDs or aliases
"""
return self.config.get("banlist_rooms", [])
-
+
def get_redaction_rooms(self) -> List[str]:
"""Get the list of rooms for redaction.
-
+
Returns:
List[str]: List of room IDs for redaction
"""
return self.config.get("redaction_rooms", [])
-
+
def validate_required_configs(self) -> List[str]:
"""Validate that all required configurations are present.
-
+
Returns:
List[str]: List of missing required configuration keys
"""
- required_configs = [
- "parent_room",
- "room_version",
- "community_slug"
- ]
-
+ required_configs = ["parent_room", "room_version", "community_slug"]
+
missing = []
for config_key in required_configs:
if not self.config.get(config_key):
missing.append(config_key)
-
+
return missing
-
+
def is_modern_room_version(self) -> bool:
"""Check if the configured room version is modern (12+).
-
+
Returns:
bool: True if room version is 12 or higher
"""
@@ -226,10 +221,10 @@ class ConfigManager:
return version >= 12
except (ValueError, TypeError):
return False
-
+
def get_room_creation_settings(self) -> Dict[str, Any]:
"""Get settings specific to room creation.
-
+
Returns:
Dict[str, Any]: Room creation settings
"""
@@ -239,12 +234,12 @@ class ConfigManager:
"invitees": self.get_invitees(),
"invite_power_level": self.get_invite_power_level(),
"encrypt": self.is_encryption_enabled(),
- "parent_room": self.get_parent_room()
+ "parent_room": self.get_parent_room(),
}
-
+
def get_tracking_settings(self) -> Dict[str, Any]:
"""Get settings specific to user tracking.
-
+
Returns:
Dict[str, Any]: Tracking settings
"""
@@ -253,12 +248,12 @@ class ConfigManager:
"track_messages": self.is_message_tracking_enabled(),
"track_reactions": self.is_reaction_tracking_enabled(),
"warn_threshold_days": self.get_warn_threshold_days(),
- "kick_threshold_days": self.get_kick_threshold_days()
+ "kick_threshold_days": self.get_kick_threshold_days(),
}
-
+
def get_verification_settings(self) -> Dict[str, Any]:
"""Get settings specific to verification.
-
+
Returns:
Dict[str, Any]: Verification settings
"""
@@ -266,5 +261,5 @@ class ConfigManager:
"verification_enabled": self.is_verification_enabled(),
"verification_phrase": self.get_verification_phrase(),
"verification_attempts": self.get_verification_attempts(),
- "verification_timeout": self.get_verification_timeout()
+ "verification_timeout": self.get_verification_timeout(),
}
diff --git a/community/helpers/database_utils.py b/community/helpers/database_utils.py
index c88985f..1369a0e 100644
--- a/community/helpers/database_utils.py
+++ b/community/helpers/database_utils.py
@@ -8,13 +8,13 @@ from mautrix.types import PaginationDirection
async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> List:
"""Get messages from a user in a room that should be redacted.
-
+
Args:
client: Matrix client instance
room_id: The room ID to search in
mxid: The user ID whose messages to find
logger: Logger instance for error reporting
-
+
Returns:
list: List of message events to redact
"""
@@ -40,16 +40,18 @@ async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> Lis
return []
-async def redact_messages(client, database, room_id: str, sleep_time: float, logger) -> Dict[str, int]:
+async def redact_messages(
+ client, database, room_id: str, sleep_time: float, logger
+) -> Dict[str, int]:
"""Redact messages queued for redaction in a room.
-
+
Args:
client: Matrix client instance
database: Database instance
room_id: The room ID to redact messages in
sleep_time: Sleep time between redactions
logger: Logger instance for error reporting
-
+
Returns:
dict: Counters for successful and failed redactions
"""
@@ -59,9 +61,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log
)
for event in events:
try:
- await client.redact(
- room_id, event["event_id"], reason="content removed"
- )
+ await client.redact(room_id, event["event_id"], reason="content removed")
counters["success"] += 1
await database.execute(
"DELETE FROM redaction_tasks WHERE event_id = $1", event["event_id"]
@@ -81,7 +81,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log
async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) -> None:
"""Insert or update user activity timestamp.
-
+
Args:
database: Database instance
mxid: User Matrix ID
@@ -103,16 +103,17 @@ async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) ->
logger.error(f"Failed to upsert user timestamp: {e}")
-async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_days: int,
- logger) -> Dict[str, List[str]]:
+async def get_inactive_users(
+ database, warn_threshold_days: int, kick_threshold_days: int, logger
+) -> Dict[str, List[str]]:
"""Get lists of users who should be warned or kicked for inactivity.
-
+
Args:
database: Database instance
warn_threshold_days: Days threshold for warning
kick_threshold_days: Days threshold for kicking
logger: Logger instance for error reporting
-
+
Returns:
dict: Contains 'warn' and 'kick' lists of user IDs
"""
@@ -120,7 +121,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
current_time = int(time.time())
warn_threshold = current_time - (warn_threshold_days * 24 * 60 * 60)
kick_threshold = current_time - (kick_threshold_days * 24 * 60 * 60)
-
+
# Get users to warn
warn_results = await database.fetch(
"""
@@ -132,7 +133,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
warn_threshold,
kick_threshold,
)
-
+
# Get users to kick
kick_results = await database.fetch(
"""
@@ -142,10 +143,10 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
""",
kick_threshold,
)
-
+
return {
"warn": [row["mxid"] for row in warn_results],
- "kick": [row["mxid"] for row in kick_results]
+ "kick": [row["mxid"] for row in kick_results],
}
except Exception as e:
logger.error(f"Failed to get inactive users: {e}")
@@ -154,7 +155,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
async def cleanup_stale_verification_states(database, logger) -> None:
"""Clean up stale verification states older than 24 hours.
-
+
Args:
database: Database instance
logger: Logger instance for error reporting
@@ -172,29 +173,34 @@ async def cleanup_stale_verification_states(database, logger) -> None:
async def get_verification_state(database, dm_room_id: str) -> Dict[str, Any]:
"""Get verification state for a DM room.
-
+
Args:
database: Database instance
dm_room_id: The DM room ID
-
+
Returns:
dict: Verification state data or None if not found
"""
try:
result = await database.fetchrow(
- "SELECT * FROM verification_states WHERE dm_room_id = $1",
- dm_room_id
+ "SELECT * FROM verification_states WHERE dm_room_id = $1", dm_room_id
)
return dict(result) if result else None
except Exception as e:
return None
-async def create_verification_state(database, dm_room_id: str, user_id: str,
- target_room_id: str, verification_phrase: str,
- attempts_remaining: int, required_power_level: int) -> None:
+async def create_verification_state(
+ database,
+ dm_room_id: str,
+ user_id: str,
+ target_room_id: str,
+ verification_phrase: str,
+ attempts_remaining: int,
+ required_power_level: int,
+) -> None:
"""Create a new verification state.
-
+
Args:
database: Database instance
dm_room_id: The DM room ID
@@ -211,16 +217,22 @@ async def create_verification_state(database, dm_room_id: str, user_id: str,
(dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, required_power_level)
VALUES ($1, $2, $3, $4, $5, $6)
""",
- dm_room_id, user_id, target_room_id, verification_phrase,
- attempts_remaining, required_power_level
+ dm_room_id,
+ user_id,
+ target_room_id,
+ verification_phrase,
+ attempts_remaining,
+ required_power_level,
)
except Exception as e:
pass # Verification state creation is not critical
-async def update_verification_attempts(database, dm_room_id: str, attempts_remaining: int) -> None:
+async def update_verification_attempts(
+ database, dm_room_id: str, attempts_remaining: int
+) -> None:
"""Update verification attempts remaining.
-
+
Args:
database: Database instance
dm_room_id: The DM room ID
@@ -229,7 +241,8 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai
try:
await database.execute(
"UPDATE verification_states SET attempts_remaining = $1 WHERE dm_room_id = $2",
- attempts_remaining, dm_room_id
+ attempts_remaining,
+ dm_room_id,
)
except Exception as e:
pass # Verification state update is not critical
@@ -237,15 +250,14 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai
async def delete_verification_state(database, dm_room_id: str) -> None:
"""Delete a verification state.
-
+
Args:
database: Database instance
dm_room_id: The DM room ID
"""
try:
await database.execute(
- "DELETE FROM verification_states WHERE dm_room_id = $1",
- dm_room_id
+ "DELETE FROM verification_states WHERE dm_room_id = $1", dm_room_id
)
except Exception as e:
pass # Verification state deletion is not critical
diff --git a/community/helpers/decorators.py b/community/helpers/decorators.py
index 7baee2b..807a822 100644
--- a/community/helpers/decorators.py
+++ b/community/helpers/decorators.py
@@ -7,11 +7,12 @@ from mautrix.types import UserID, MessageEvent
def require_permission(min_level: int = 50, room_id: Optional[str] = None):
"""Decorator to require user permission for command execution.
-
+
Args:
min_level: Minimum required power level (default 50 for moderator)
room_id: Room ID to check permissions in (None for parent room)
"""
+
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
@@ -19,26 +20,31 @@ def require_permission(min_level: int = 50, room_id: Optional[str] = None):
await evt.reply("You don't have permission to use this command")
return
return await func(self, evt, *args, **kwargs)
+
return wrapper
+
return decorator
def require_parent_room(func: Callable) -> Callable:
"""Decorator to require parent room to be configured."""
+
@functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
if not await self.check_parent_room(evt):
return
return await func(self, evt, *args, **kwargs)
+
return wrapper
def handle_errors(error_message: str = "An error occurred"):
"""Decorator to handle common errors in command execution.
-
+
Args:
error_message: Default error message to show to user
"""
+
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
@@ -47,5 +53,7 @@ def handle_errors(error_message: str = "An error occurred"):
except Exception as e:
self.log.error(f"Error in {func.__name__}: {e}")
await evt.reply(f"{error_message}: {e}")
+
return wrapper
+
return decorator
diff --git a/community/helpers/diagnostic_utils.py b/community/helpers/diagnostic_utils.py
index 2381bed..571a2cf 100644
--- a/community/helpers/diagnostic_utils.py
+++ b/community/helpers/diagnostic_utils.py
@@ -6,17 +6,15 @@ from mautrix.client import Client
async def check_space_permissions(
- client: Client,
- parent_room: str,
- logger
+ client: Client, parent_room: str, logger
) -> Dict[str, Any]:
"""Check bot permissions in the parent space.
-
+
Args:
client: Matrix client
parent_room: Parent room ID
logger: Logger instance
-
+
Returns:
Dict containing space permission information
"""
@@ -25,11 +23,14 @@ async def check_space_permissions(
parent_room, EventType.ROOM_POWER_LEVELS
)
bot_level = space_power_levels.get_user_level(client.mxid)
-
+
# Check if bot has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power
- bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, parent_room)
-
+
+ bot_has_unlimited_power = await user_has_unlimited_power(
+ client, client.mxid, parent_room
+ )
+
space_info = {
"room_id": parent_room,
"bot_power_level": bot_level,
@@ -37,48 +38,36 @@ async def check_space_permissions(
"bot_has_unlimited_power": bot_has_unlimited_power,
"users_higher_or_equal": [],
"users_equal": [],
- "users_higher": []
+ "users_higher": [],
}
# Check for users with equal or higher power level
for user, level in space_power_levels.users.items():
if user != client.mxid and level >= bot_level:
if level == bot_level:
- space_info["users_equal"].append({
- "user": user,
- "level": level
- })
+ space_info["users_equal"].append({"user": user, "level": level})
else:
- space_info["users_higher"].append({
- "user": user,
- "level": level
- })
- space_info["users_higher_or_equal"].append({
- "user": user,
- "level": level
- })
+ space_info["users_higher"].append({"user": user, "level": level})
+ space_info["users_higher_or_equal"].append(
+ {"user": user, "level": level}
+ )
return space_info
except Exception as e:
logger.error(f"Failed to check space permissions: {e}")
- return {
- "room_id": parent_room,
- "error": str(e)
- }
+ return {"room_id": parent_room, "error": str(e)}
async def check_room_permissions(
- client: Client,
- room_id: str,
- logger
+ client: Client, room_id: str, logger
) -> Dict[str, Any]:
"""Check bot permissions in a specific room.
-
+
Args:
client: Matrix client
room_id: Room ID to check
logger: Logger instance
-
+
Returns:
Dict containing room permission information
"""
@@ -87,33 +76,37 @@ async def check_room_permissions(
try:
await client.get_state_event(room_id, EventType.ROOM_MEMBER, client.mxid)
except:
- return {
- "room_id": room_id,
- "error": "Bot not in room"
- }
+ return {"room_id": room_id, "error": "Bot not in room"}
# Get power levels
room_power_levels = await client.get_state_event(
room_id, EventType.ROOM_POWER_LEVELS
)
bot_level = room_power_levels.get_user_level(client.mxid)
-
+
# Get room name if available
room_name = room_id
try:
from .common_utils import get_room_name
+
room_name = await get_room_name(client, room_id, logger) or room_id
except:
pass
# Get room version and creators
from .room_utils import get_room_version_and_creators
- room_version, creators = await get_room_version_and_creators(client, room_id, logger)
-
+
+ room_version, creators = await get_room_version_and_creators(
+ client, room_id, logger
+ )
+
# Check if bot has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power
- bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, room_id)
-
+
+ bot_has_unlimited_power = await user_has_unlimited_power(
+ client, client.mxid, room_id
+ )
+
room_report = {
"room_id": room_id,
"room_name": room_name,
@@ -124,46 +117,35 @@ async def check_room_permissions(
"bot_has_unlimited_power": bot_has_unlimited_power,
"users_higher_or_equal": [],
"users_equal": [],
- "users_higher": []
+ "users_higher": [],
}
# Check for users with equal or higher power level
for user, level in room_power_levels.users.items():
if user != client.mxid and level >= bot_level:
if level == bot_level:
- room_report["users_equal"].append({
- "user": user,
- "level": level
- })
+ room_report["users_equal"].append({"user": user, "level": level})
else:
- room_report["users_higher"].append({
- "user": user,
- "level": level
- })
- room_report["users_higher_or_equal"].append({
- "user": user,
- "level": level
- })
+ room_report["users_higher"].append({"user": user, "level": level})
+ room_report["users_higher_or_equal"].append(
+ {"user": user, "level": level}
+ )
return room_report
except Exception as e:
logger.error(f"Failed to check room permissions for {room_id}: {e}")
- return {
- "room_id": room_id,
- "error": str(e)
- }
+ return {"room_id": room_id, "error": str(e)}
def analyze_room_data(
- room_data: Dict[str, Any],
- is_modern_room_version_func
+ room_data: Dict[str, Any], is_modern_room_version_func
) -> Tuple[str, str, bool, bool, bool]:
"""Analyze room data to determine status and categorization.
-
+
Args:
room_data: Room data dictionary
is_modern_room_version_func: Function to check if room version is modern
-
+
Returns:
Tuple of (status, category, is_admin, is_modern, has_error)
"""
@@ -172,61 +154,58 @@ def analyze_room_data(
return "not_in_room", "error", False, False, True
else:
return "error", "error", False, False, True
-
+
# Check if modern room version
is_modern = is_modern_room_version_func(room_data.get("room_version", "1"))
-
+
# Check admin status
is_admin = room_data.get("has_admin", False)
-
+
if is_admin:
return "admin", "admin", True, is_modern, False
else:
return "no_admin", "problematic", False, is_modern, False
-def generate_space_summary(
- space_data: Dict[str, Any]
-) -> str:
+def generate_space_summary(space_data: Dict[str, Any]) -> str:
"""Generate HTML summary for space permissions.
-
+
Args:
space_data: Space permission data
-
+
Returns:
str: HTML formatted space summary
"""
if "error" in space_data:
return f"📋 Parent Space
❌ Error: {space_data['error']}
"
-
+
space_status = "✅" if space_data.get("has_admin", False) else "❌"
response = f"📋 Parent Space
"
-
+
# Show admin status with appropriate details
if space_data.get("bot_has_unlimited_power", False):
response += f"{space_status} Administrative privileges: Yes (unlimited power - creator)
"
else:
response += f"{space_status} Administrative privileges: {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})
"
-
+
if space_data.get("users_higher"):
response += f"⚠️ Users with higher power: {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_higher']])}
"
if space_data.get("users_equal"):
response += f"⚠️ Users with equal power: {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_equal']])}
"
-
+
response += "
"
return response
def generate_room_summary(
- rooms_data: Dict[str, Any],
- is_modern_room_version_func
+ rooms_data: Dict[str, Any], is_modern_room_version_func
) -> Tuple[str, Dict[str, int]]:
"""Generate HTML summary for room permissions.
-
+
Args:
rooms_data: Dictionary of room data
is_modern_room_version_func: Function to check if room version is modern
-
+
Returns:
Tuple of (HTML response, statistics dict)
"""
@@ -237,14 +216,14 @@ def generate_room_summary(
"error_rooms": 0,
"not_in_room_count": 0,
"modern_rooms": 0,
- "legacy_rooms": 0
+ "legacy_rooms": 0,
}
-
+
for room_id, room_data in rooms_data.items():
status, category, is_admin, is_modern, has_error = analyze_room_data(
room_data, is_modern_room_version_func
)
-
+
# Update statistics
if has_error:
stats["error_rooms"] += 1
@@ -255,26 +234,32 @@ def generate_room_summary(
stats["admin_rooms"] += 1
else:
stats["non_admin_rooms"] += 1
-
+
if is_modern:
stats["modern_rooms"] += 1
else:
stats["legacy_rooms"] += 1
-
+
# Generate room info for problematic rooms
- if category in ["error", "problematic"] or (is_admin and (room_data.get("users_higher") or room_data.get("users_equal"))):
+ if category in ["error", "problematic"] or (
+ is_admin and (room_data.get("users_higher") or room_data.get("users_equal"))
+ ):
if has_error:
if room_data["error"] == "Bot not in room":
- problematic_rooms.append(f"❌ {room_data.get('room_name', room_id)} ({room_id}): Bot not in room")
+ problematic_rooms.append(
+ f"❌ {room_data.get('room_name', room_id)} ({room_id}): Bot not in room"
+ )
else:
- problematic_rooms.append(f"❌ {room_data.get('room_name', room_id)} ({room_id}): Error - {room_data['error']}")
+ problematic_rooms.append(
+ f"❌ {room_data.get('room_name', room_id)} ({room_id}): Error - {room_data['error']}"
+ )
elif is_admin:
# Show unlimited power status for modern rooms
if room_data.get("bot_has_unlimited_power", False):
room_info = f"✅ {room_data['room_name']} ({room_id}): Unlimited Power (Creator) [v{room_data.get('room_version', '1')}]"
else:
room_info = f"✅ {room_data['room_name']} ({room_id}): Admin: Yes (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]"
-
+
# Add power level conflict info
if room_data.get("users_higher") or room_data.get("users_equal"):
if room_data.get("bot_has_unlimited_power", False):
@@ -283,11 +268,15 @@ def generate_room_summary(
if room_data.get("users_higher"):
room_info += f" - Higher power users: {len(room_data['users_higher'])}"
if room_data.get("users_equal"):
- room_info += f" - Equal power users: {len(room_data['users_equal'])}"
+ room_info += (
+ f" - Equal power users: {len(room_data['users_equal'])}"
+ )
problematic_rooms.append(room_info)
else:
- problematic_rooms.append(f"❌ {room_data['room_name']} ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]")
-
+ problematic_rooms.append(
+ f"❌ {room_data['room_name']} ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]"
+ )
+
# Generate HTML response
response = ""
if problematic_rooms:
@@ -296,20 +285,19 @@ def generate_room_summary(
for room_info in problematic_rooms:
response += f"{room_info}
"
response += "
"
-
+
return response, stats
def generate_summary_stats(
- space_data: Dict[str, Any],
- room_stats: Dict[str, int]
+ space_data: Dict[str, Any], room_stats: Dict[str, int]
) -> str:
"""Generate summary statistics HTML.
-
+
Args:
space_data: Space permission data
room_stats: Room statistics
-
+
Returns:
str: HTML formatted summary statistics
"""
@@ -319,35 +307,32 @@ def generate_summary_stats(
response += f"• Rooms without admin: {room_stats['non_admin_rooms']}
"
response += f"• Modern room versions (12+): {room_stats['modern_rooms']}
"
response += f"• Legacy room versions (1-11): {room_stats['legacy_rooms']}
"
-
+
# Add note about unlimited power for modern rooms
- if room_stats['modern_rooms'] > 0:
+ if room_stats["modern_rooms"] > 0:
response += f"
ℹ️ Note: In modern room versions (12+), creators have unlimited power and cannot be restricted by power levels.
"
-
- if room_stats['not_in_room_count'] > 0:
+
+ if room_stats["not_in_room_count"] > 0:
response += f"• Rooms bot not in: {room_stats['not_in_room_count']}
"
- if room_stats['error_rooms'] > 0:
+ if room_stats["error_rooms"] > 0:
response += f"• Rooms with errors: {room_stats['error_rooms']}
"
-
+
response += "
"
return response
-def generate_issues_and_warnings(
- issues: List[str],
- warnings: List[str]
-) -> str:
+def generate_issues_and_warnings(issues: List[str], warnings: List[str]) -> str:
"""Generate issues and warnings HTML.
-
+
Args:
issues: List of critical issues
warnings: List of warnings
-
+
Returns:
str: HTML formatted issues and warnings
"""
response = ""
-
+
if issues:
response += f"🚨 Critical Issues
"
for issue in issues:
@@ -359,13 +344,13 @@ def generate_issues_and_warnings(
for warning in warnings:
response += f"• {warning}
"
response += "
"
-
+
return response
def generate_all_clear_message() -> str:
"""Generate all clear message HTML.
-
+
Returns:
str: HTML formatted all clear message
"""
diff --git a/community/helpers/message_utils.py b/community/helpers/message_utils.py
index e6ae13e..bd6a284 100644
--- a/community/helpers/message_utils.py
+++ b/community/helpers/message_utils.py
@@ -7,12 +7,12 @@ from mautrix.types import MessageType, MediaMessageEventContent
def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool:
"""Check if a message should be flagged for censorship.
-
+
Args:
msg: The message event to check
censor_wordlist: List of regex patterns to check against
censor_files: Whether to flag file messages
-
+
Returns:
bool: True if message should be flagged
"""
@@ -30,17 +30,17 @@ def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool:
except Exception:
# Skip invalid regex patterns
pass
-
+
return False
def flag_instaban(msg, instaban_wordlist: list) -> bool:
"""Check if a message should trigger an instant ban.
-
+
Args:
msg: The message event to check
instaban_wordlist: List of regex patterns that trigger instant ban
-
+
Returns:
bool: True if message should trigger instant ban
"""
@@ -51,17 +51,17 @@ def flag_instaban(msg, instaban_wordlist: list) -> bool:
except Exception:
# Skip invalid regex patterns
pass
-
+
return False
def censor_room(msg, censor_config) -> bool:
"""Check if a message should be censored based on room configuration.
-
+
Args:
msg: The message event to check
censor_config: Censor configuration (bool or list of room IDs)
-
+
Returns:
bool: True if message should be censored
"""
@@ -75,10 +75,10 @@ def censor_room(msg, censor_config) -> bool:
def sanitize_room_name(room_name: str) -> str:
"""Sanitize a room name for use in aliases.
-
+
Args:
room_name: The room name to sanitize
-
+
Returns:
str: Sanitized room name (alphanumeric only, lowercase)
"""
@@ -87,12 +87,12 @@ def sanitize_room_name(room_name: str) -> str:
def generate_community_slug(community_name: str) -> str:
"""Generate a community slug from the community name.
-
+
Args:
community_name: The full community name
-
+
Returns:
str: A slug made from the first letter of each word, lowercase
"""
words = community_name.strip().split()
- return ''.join(word[0].lower() for word in words if word)
+ return "".join(word[0].lower() for word in words if word)
diff --git a/community/helpers/report_utils.py b/community/helpers/report_utils.py
index 36d723b..f246d8d 100644
--- a/community/helpers/report_utils.py
+++ b/community/helpers/report_utils.py
@@ -6,53 +6,53 @@ import time
def generate_activity_report(database_results: Dict[str, List[Dict]]) -> Dict[str, Any]:
"""Generate an activity report from database results.
-
+
Args:
database_results: Dictionary containing 'warn_inactive', 'kick_inactive', 'ignored' results
-
+
Returns:
dict: Formatted activity report
"""
report = {}
-
+
# Process warn inactive users (between warn and kick thresholds)
warn_inactive_results = database_results.get("warn_inactive", [])
report["warn_inactive"] = [row["mxid"] for row in warn_inactive_results] or ["none"]
-
+
# Process kick inactive users (beyond kick threshold)
kick_inactive_results = database_results.get("kick_inactive", [])
report["kick_inactive"] = [row["mxid"] for row in kick_inactive_results] or ["none"]
-
+
# Process ignored users
ignored_results = database_results.get("ignored", [])
report["ignored"] = [row["mxid"] for row in ignored_results] or ["none"]
-
+
return report
def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[str]:
"""Split a doctor report into chunks that fit within size limits.
-
+
Args:
report_text: The full report text
max_chunk_size: Maximum size per chunk
-
+
Returns:
list: List of report chunks
"""
if len(report_text) <= max_chunk_size:
return [report_text]
-
+
# Try to split by sections first
sections = _split_by_sections(report_text, max_chunk_size)
if sections:
return sections
-
+
# Fall back to character-based splitting
chunks = []
current_chunk = ""
-
- for line in report_text.split('\n'):
+
+ for line in report_text.split("\n"):
if len(current_chunk) + len(line) + 1 > max_chunk_size:
if current_chunk:
chunks.append(current_chunk.strip())
@@ -63,31 +63,31 @@ def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[st
current_chunk = line[max_chunk_size:]
else:
if current_chunk:
- current_chunk += '\n' + line
+ current_chunk += "\n" + line
else:
current_chunk = line
-
+
if current_chunk:
chunks.append(current_chunk.strip())
-
+
return chunks
def _split_by_sections(text: str, max_size: int) -> List[str]:
"""Split text by sections (lines starting with specific patterns).
-
+
Args:
text: The text to split
max_size: Maximum size per section
-
+
Returns:
list: List of text sections
"""
section_headers = ["Active users:", "Inactive users:", "Ignored users:"]
sections = []
current_section = ""
-
- lines = text.split('\n')
+
+ lines = text.split("\n")
for line in lines:
if any(line.startswith(header) for header in section_headers):
if current_section and len(current_section) > max_size:
@@ -101,54 +101,54 @@ def _split_by_sections(text: str, max_size: int) -> List[str]:
# This section would be too big
return []
if current_section:
- current_section += '\n' + line
+ current_section += "\n" + line
else:
current_section = line
-
+
if current_section:
sections.append(current_section.strip())
-
+
return sections if all(len(s) <= max_size for s in sections) else []
def format_ban_results(ban_event_map: Dict[str, List[str]]) -> str:
"""Format ban results for display.
-
+
Args:
ban_event_map: Dictionary containing ban results
-
+
Returns:
str: Formatted ban results
"""
ban_list = ban_event_map.get("ban_list", {})
error_list = ban_event_map.get("error_list", {})
-
+
result_parts = []
-
+
for user, rooms in ban_list.items():
if rooms:
result_parts.append(f"Banned {user} from: {', '.join(rooms)}")
-
+
for user, rooms in error_list.items():
if rooms:
result_parts.append(f"Failed to ban {user} from: {', '.join(rooms)}")
-
- return '\n'.join(result_parts) if result_parts else "No ban operations performed"
+
+ return "\n".join(result_parts) if result_parts else "No ban operations performed"
def format_sync_results(sync_results: Dict[str, List[str]]) -> str:
"""Format sync results for display.
-
+
Args:
sync_results: Dictionary containing sync results
-
+
Returns:
str: Formatted sync results
"""
added = sync_results.get("added", [])
dropped = sync_results.get("dropped", [])
-
+
added_str = "
".join(added) if added else "none"
dropped_str = "
".join(dropped) if dropped else "none"
-
+
return f"Added: {added_str}
Dropped: {dropped_str}"
diff --git a/community/helpers/response_builder.py b/community/helpers/response_builder.py
index 99ccca6..1daca66 100644
--- a/community/helpers/response_builder.py
+++ b/community/helpers/response_builder.py
@@ -6,16 +6,16 @@ from mautrix.types import MessageEvent
class ResponseBuilder:
"""Builder for consistent response formatting."""
-
+
@staticmethod
def build_html_response(title: str, content: str, allow_html: bool = True) -> str:
"""Build an HTML formatted response.
-
+
Args:
title: Response title
content: Response content
allow_html: Whether to allow HTML formatting
-
+
Returns:
str: Formatted response
"""
@@ -23,15 +23,15 @@ class ResponseBuilder:
return f"{title}
{content}
"
else:
return f"{title}\n{content}"
-
+
@staticmethod
def build_error_response(error: str, allow_html: bool = True) -> str:
"""Build an error response.
-
+
Args:
error: Error message
allow_html: Whether to allow HTML formatting
-
+
Returns:
str: Formatted error response
"""
@@ -39,15 +39,15 @@ class ResponseBuilder:
return f"Error: {error}
"
else:
return f"Error: {error}"
-
+
@staticmethod
def build_success_response(message: str, allow_html: bool = True) -> str:
"""Build a success response.
-
+
Args:
message: Success message
allow_html: Whether to allow HTML formatting
-
+
Returns:
str: Formatted success response
"""
@@ -55,186 +55,196 @@ class ResponseBuilder:
return f"Success: {message}
"
else:
return f"Success: {message}"
-
+
@staticmethod
- def build_list_response(title: str, items: List[str], allow_html: bool = True) -> str:
+ def build_list_response(
+ title: str, items: List[str], allow_html: bool = True
+ ) -> str:
"""Build a list response.
-
+
Args:
title: List title
items: List items
allow_html: Whether to allow HTML formatting
-
+
Returns:
str: Formatted list response
"""
if not items:
- return ResponseBuilder.build_html_response(title, "No items found.", allow_html)
-
+ return ResponseBuilder.build_html_response(
+ title, "No items found.", allow_html
+ )
+
if allow_html:
items_html = "
".join(items)
return f"{title}
{items_html}
"
else:
items_text = "\n".join(f"- {item}" for item in items)
return f"{title}\n{items_text}"
-
+
@staticmethod
def build_room_link(alias: str, server: str) -> str:
"""Build a Matrix room link.
-
+
Args:
alias: Room alias
server: Server name
-
+
Returns:
str: HTML room link
"""
return f"#{alias}:{server}"
-
+
@staticmethod
def build_user_link(user_id: str) -> str:
"""Build a Matrix user link.
-
+
Args:
user_id: User ID
-
+
Returns:
str: HTML user link
"""
return f"{user_id}"
-
+
@staticmethod
- def build_activity_report_response(report: Dict[str, List[str]], config: Dict[str, Any]) -> str:
+ def build_activity_report_response(
+ report: Dict[str, List[str]], config: Dict[str, Any]
+ ) -> str:
"""Build an activity report response.
-
+
Args:
report: Activity report data
config: Bot configuration
-
+
Returns:
str: Formatted activity report
"""
warn_threshold = config.get("warn_threshold_days", 30)
kick_threshold = config.get("kick_threshold_days", 60)
-
+
response_parts = []
-
+
if report.get("warn_inactive"):
warn_list = "
".join(report["warn_inactive"])
response_parts.append(
f"Users inactive for between {warn_threshold} and {kick_threshold} days:
"
f"{warn_list}
"
)
-
+
if report.get("kick_inactive"):
kick_list = "
".join(report["kick_inactive"])
response_parts.append(
f"Users inactive for at least {kick_threshold} days:
"
f"{kick_list}
"
)
-
+
if report.get("ignored"):
ignored_list = "
".join(report["ignored"])
- response_parts.append(
- f"Ignored users:
{ignored_list}
"
- )
-
+ response_parts.append(f"Ignored users:
{ignored_list}
")
+
return "".join(response_parts)
-
+
@staticmethod
def build_ban_results_response(results: Dict[str, Any]) -> str:
"""Build a ban results response.
-
+
Args:
results: Ban results data
-
+
Returns:
str: Formatted ban results
"""
ban_list = results.get("ban_list", [])
error_list = results.get("error_list", [])
-
+
response_parts = []
-
+
if ban_list:
ban_list_html = "
".join(ban_list)
- response_parts.append(f"Users banned:
{ban_list_html}
")
-
+ response_parts.append(
+ f"Users banned:
{ban_list_html}
"
+ )
+
if error_list:
error_list_html = "
".join(error_list)
- response_parts.append(f"Errors:
{error_list_html}
")
-
+ response_parts.append(
+ f"Errors:
{error_list_html}
"
+ )
+
if not response_parts:
response_parts.append("No users were banned.
")
-
+
return "".join(response_parts)
-
+
@staticmethod
def build_sync_results_response(results: Dict[str, List[str]]) -> str:
"""Build a sync results response.
-
+
Args:
results: Sync results data
-
+
Returns:
str: Formatted sync results
"""
added = results.get("added", [])
dropped = results.get("dropped", [])
-
+
response_parts = []
-
+
if added:
added_html = "
".join(added)
response_parts.append(f"Added:
{added_html}
")
-
+
if dropped:
dropped_html = "
".join(dropped)
response_parts.append(f"Dropped:
{dropped_html}
")
-
+
if not response_parts:
response_parts.append("No changes made.
")
-
+
return "".join(response_parts)
-
+
@staticmethod
def build_doctor_report_response(report: Dict[str, Any]) -> str:
"""Build a doctor report response.
-
+
Args:
report: Doctor report data
-
+
Returns:
str: Formatted doctor report
"""
response_parts = []
-
+
# Space information
if report.get("space"):
space = report["space"]
space_info = f"Space: {space.get('room_id', 'Unknown')}
"
- space_info += f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}
"
+ space_info += (
+ f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}
"
+ )
space_info += f"Has Admin: {space.get('has_admin', False)}
"
response_parts.append(f"{space_info}
")
-
+
# Room information
if report.get("rooms"):
rooms_info = "Rooms:
"
for room_id, room_data in report["rooms"].items():
rooms_info += f"- {room_id}: {room_data.get('status', 'Unknown')}
"
response_parts.append(f"{rooms_info}
")
-
+
# Issues
if report.get("issues"):
issues_html = "
".join(report["issues"])
response_parts.append(f"Issues:
{issues_html}
")
-
+
# Warnings
if report.get("warnings"):
warnings_html = "
".join(report["warnings"])
response_parts.append(f"Warnings:
{warnings_html}
")
-
+
if not response_parts:
response_parts.append("No issues found.
")
-
+
return "".join(response_parts)
diff --git a/community/helpers/room_creation_utils.py b/community/helpers/room_creation_utils.py
index 5e79a26..2001a5f 100644
--- a/community/helpers/room_creation_utils.py
+++ b/community/helpers/room_creation_utils.py
@@ -8,17 +8,15 @@ from mautrix.client import Client
async def validate_room_creation_params(
- roomname: str,
- config: dict,
- evt: Optional[MessageEvent] = None
+ roomname: str, config: dict, evt: Optional[MessageEvent] = None
) -> Tuple[str, bool, bool, str]:
"""Validate and process room creation parameters.
-
+
Args:
roomname: Original room name
config: Bot configuration
evt: Optional MessageEvent for error responses
-
+
Returns:
Tuple of (sanitized_name, force_encryption, force_unencryption, error_msg)
"""
@@ -27,23 +25,23 @@ async def validate_room_creation_params(
unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?(\s+|$)")
force_encryption = bool(encrypted_flag_regex.search(roomname))
force_unencryption = bool(unencrypted_flag_regex.search(roomname))
-
+
# Clean up room name
if force_encryption:
roomname = encrypted_flag_regex.sub("", roomname) # Remove encryption flag
if force_unencryption:
roomname = unencrypted_flag_regex.sub("", roomname) # Remove unencryption flag
-
+
# Clean up any extra whitespace
roomname = re.sub(r"\s+", " ", roomname).strip()
-
+
sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", roomname).lower()
-
+
# Check if community slug is configured
if not config.get("community_slug", ""):
error_msg = "No community slug configured. Please run initialize command first."
return sanitized_name, force_encryption, force_unencryption, error_msg, roomname
-
+
return sanitized_name, force_encryption, force_unencryption, "", roomname
@@ -51,27 +49,27 @@ async def prepare_room_creation_data(
sanitized_name: str,
config: dict,
client: Client,
- invitees: Optional[List[str]] = None
+ invitees: Optional[List[str]] = None,
) -> Tuple[str, str, List[str], str]:
"""Prepare data needed for room creation.
-
+
Args:
sanitized_name: Sanitized room name
config: Bot configuration
client: Matrix client
invitees: Optional list of users to invite
-
+
Returns:
Tuple of (alias_localpart, server, room_invitees, parent_room)
"""
# Create alias with community slug
alias_localpart = f"{sanitized_name}-{config.get('community_slug', '')}"
-
+
# Get server and invitees
server = client.parse_user_id(client.mxid)[1]
room_invitees = invitees if invitees is not None else config.get("invitees", [])
parent_room = config.get("parent_room", "")
-
+
return alias_localpart, server, room_invitees, parent_room
@@ -79,34 +77,38 @@ async def prepare_power_levels(
client: Client,
config: dict,
parent_room: str,
- power_level_override: Optional[PowerLevelStateEventContent] = None
+ power_level_override: Optional[PowerLevelStateEventContent] = None,
) -> PowerLevelStateEventContent:
"""Prepare power levels for room creation.
-
+
Args:
client: Matrix client
config: Bot configuration
parent_room: Parent room ID
power_level_override: Optional existing power level override
-
+
Returns:
PowerLevelStateEventContent for room creation
"""
if power_level_override:
return power_level_override
-
+
if parent_room:
try:
# Get parent room power levels to extract user power levels
parent_power_levels = await client.get_state_event(
parent_room, EventType.ROOM_POWER_LEVELS
)
-
+
# Create new power levels with server defaults, not copying all permissions from space
power_levels = PowerLevelStateEventContent()
-
+
# Copy only user power levels from parent space, not the entire permission set
- if parent_power_levels and hasattr(parent_power_levels, 'users') and parent_power_levels.users:
+ if (
+ parent_power_levels
+ and hasattr(parent_power_levels, "users")
+ and parent_power_levels.users
+ ):
try:
user_power_levels = parent_power_levels.users.copy()
# Ensure bot has highest power
@@ -121,10 +123,10 @@ async def prepare_power_levels(
power_levels.users = {
client.mxid: 1000, # Bot gets highest power
}
-
+
# Set explicit config values
power_levels.invite = config.get("invite_power_level", 50)
-
+
return power_levels
except Exception as e:
# If we can't get parent power levels, create default ones
@@ -150,10 +152,10 @@ def prepare_initial_state(
server: str,
force_encryption: bool,
force_unencryption: bool,
- creation_content: Optional[Dict[str, Any]] = None
+ creation_content: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""Prepare initial state events for room creation.
-
+
Args:
config: Bot configuration
parent_room: Parent room ID
@@ -161,66 +163,67 @@ def prepare_initial_state(
force_encryption: Whether to force encryption
force_unencryption: Whether to force no encryption
creation_content: Optional creation content
-
+
Returns:
List of initial state events
"""
initial_state = []
-
+
# Only add space parent state if we have a parent room
if parent_room:
- initial_state.extend([
- {
- "type": str(EventType.SPACE_PARENT),
- "state_key": parent_room,
- "content": {
- "via": [server],
- "canonical": True
- }
- },
- {
- "type": str(EventType.ROOM_JOIN_RULES),
- "content": {
- "join_rule": "restricted",
- "allow": [{
- "type": "m.room_membership",
- "room_id": parent_room
- }]
- }
- }
- ])
-
+ initial_state.extend(
+ [
+ {
+ "type": str(EventType.SPACE_PARENT),
+ "state_key": parent_room,
+ "content": {"via": [server], "canonical": True},
+ },
+ {
+ "type": str(EventType.ROOM_JOIN_RULES),
+ "content": {
+ "join_rule": "restricted",
+ "allow": [
+ {"type": "m.room_membership", "room_id": parent_room}
+ ],
+ },
+ },
+ ]
+ )
+
# Add encryption if needed
if (config.get("encrypt", False) and not force_unencryption) or force_encryption:
- initial_state.append({
- "type": str(EventType.ROOM_ENCRYPTION),
- "content": {
- "algorithm": "m.megolm.v1.aes-sha2"
+ initial_state.append(
+ {
+ "type": str(EventType.ROOM_ENCRYPTION),
+ "content": {"algorithm": "m.megolm.v1.aes-sha2"},
}
- })
-
+ )
+
# Add history visibility if specified in creation_content
if creation_content and "m.room.history_visibility" in creation_content:
- initial_state.append({
- "type": str(EventType.ROOM_HISTORY_VISIBILITY),
- "content": {
- "history_visibility": creation_content.get("m.room.history_visibility", "joined")
+ initial_state.append(
+ {
+ "type": str(EventType.ROOM_HISTORY_VISIBILITY),
+ "content": {
+ "history_visibility": creation_content.get(
+ "m.room.history_visibility", "joined"
+ )
+ },
}
- })
-
+ )
+
return initial_state
def adjust_power_levels_for_modern_rooms(
- power_levels: PowerLevelStateEventContent,
- room_version: str
+ power_levels: PowerLevelStateEventContent, room_version: str
) -> PowerLevelStateEventContent:
"""Adjust power levels for modern room versions.
-
+
Args:
power_levels: Power level state content
room_version: Room version string
-
+
Returns:
Adjusted power level state content
"""
@@ -229,20 +232,18 @@ def adjust_power_levels_for_modern_rooms(
if room_version and int(room_version) >= 12 and power_levels:
if power_levels.users:
# Remove bot from users list but keep other important settings
- power_levels.users.pop("bot_mxid", None) # Will be replaced with actual bot mxid
-
+ power_levels.users.pop(
+ "bot_mxid", None
+ ) # Will be replaced with actual bot mxid
+
return power_levels
async def add_room_to_space(
- client: Client,
- parent_room: str,
- room_id: str,
- server: str,
- sleep_duration: float
+ client: Client, parent_room: str, room_id: str, server: str, sleep_duration: float
) -> None:
"""Add created room to parent space.
-
+
Args:
client: Matrix client
parent_room: Parent room ID
@@ -254,23 +255,17 @@ async def add_room_to_space(
await client.send_state_event(
parent_room,
EventType.SPACE_CHILD,
- {
- "via": [server],
- "suggested": False
- },
- state_key=room_id
+ {"via": [server], "suggested": False},
+ state_key=room_id,
)
await asyncio.sleep(sleep_duration)
async def verify_room_creation(
- client: Client,
- room_id: str,
- expected_version: str,
- logger
+ client: Client, room_id: str, expected_version: str, logger
) -> None:
"""Verify that room was created with correct settings.
-
+
Args:
client: Matrix client
room_id: Created room ID
@@ -279,9 +274,16 @@ async def verify_room_creation(
"""
try:
from .room_utils import get_room_version_and_creators
- actual_version, actual_creators = await get_room_version_and_creators(client, room_id, logger)
- logger.info(f"Room {room_id} created with version {actual_version} (requested: {expected_version})")
+
+ actual_version, actual_creators = await get_room_version_and_creators(
+ client, room_id, logger
+ )
+ logger.info(
+ f"Room {room_id} created with version {actual_version} (requested: {expected_version})"
+ )
if actual_version != expected_version:
- logger.warning(f"Room version mismatch: requested {expected_version}, got {actual_version}")
+ logger.warning(
+ f"Room version mismatch: requested {expected_version}, got {actual_version}"
+ )
except Exception as e:
logger.warning(f"Could not verify room version for {room_id}: {e}")
diff --git a/community/helpers/room_utils.py b/community/helpers/room_utils.py
index f77644a..35a8315 100644
--- a/community/helpers/room_utils.py
+++ b/community/helpers/room_utils.py
@@ -8,12 +8,12 @@ from mautrix.errors import MNotFound
async def validate_room_alias(client, alias_localpart: str, server: str) -> bool:
"""Check if a room alias already exists.
-
+
Args:
client: Matrix client instance
alias_localpart: The localpart of the alias (without # and :server)
server: The server domain
-
+
Returns:
bool: True if alias is available, False if it already exists
"""
@@ -30,76 +30,81 @@ async def validate_room_alias(client, alias_localpart: str, server: str) -> bool
return True
-async def validate_room_aliases(client, room_names: list[str], community_slug: str, server: str) -> Tuple[bool, List[str]]:
+async def validate_room_aliases(
+ client, room_names: list[str], community_slug: str, server: str
+) -> Tuple[bool, List[str]]:
"""Validate that all room aliases are available.
-
+
Args:
client: Matrix client instance
room_names: List of room names to validate
community_slug: The community slug to append
server: The server domain
-
+
Returns:
tuple: (is_valid, list_of_conflicting_aliases)
"""
if not community_slug:
return False, []
-
+
conflicting_aliases = []
-
+
for room_name in room_names:
# Clean the room name and create alias
from .message_utils import sanitize_room_name
+
sanitized_name = sanitize_room_name(room_name)
alias_localpart = f"{sanitized_name}-{community_slug}"
-
+
# Check if alias is available
is_available = await validate_room_alias(client, alias_localpart, server)
if not is_available:
conflicting_aliases.append(f"#{alias_localpart}:{server}")
-
+
return len(conflicting_aliases) == 0, conflicting_aliases
-async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tuple[str, List[str]]:
+async def get_room_version_and_creators(
+ client, room_id: str, logger=None
+) -> Tuple[str, List[str]]:
"""Get the room version and creators for a room.
-
+
Args:
client: Matrix client instance
room_id: The room ID to check
-
+
Returns:
tuple: (room_version, list_of_creators)
"""
try:
# Get all state events to find the creation event
state_events = await client.get_state(room_id)
-
+
# Find the m.room.create event
creation_event = None
for event in state_events:
if event.type == EventType.ROOM_CREATE:
creation_event = event
break
-
+
if not creation_event:
# Default to version 1 if no creation event found
return "1", []
-
+
room_version = creation_event.content.get("room_version", "1")
creators = []
-
+
# Add the sender of the creation event as a creator
if creation_event.sender:
creators.append(creation_event.sender)
-
+
# Add any additional creators from the content
additional_creators = creation_event.content.get("additional_creators", [])
if isinstance(additional_creators, list):
creators.extend(additional_creators)
-
+
return room_version, creators
-
+
except Exception:
# Default to version 1 if there's an error
return "1", []
@@ -107,10 +112,10 @@ async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tu
def is_modern_room_version(room_version: str) -> bool:
"""Check if a room version is 12 or newer (modern room versions).
-
+
Args:
room_version: The room version string to check
-
+
Returns:
bool: True if room version is 12 or newer
"""
@@ -124,36 +129,38 @@ def is_modern_room_version(room_version: str) -> bool:
async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
"""Check if a user has unlimited power in a room (creator in modern room versions).
-
+
Args:
client: Matrix client instance
user_id: The user ID to check
room_id: The room ID to check in
-
+
Returns:
bool: True if user has unlimited power
"""
try:
- room_version, creators = await get_room_version_and_creators(client, room_id, None)
-
+ room_version, creators = await get_room_version_and_creators(
+ client, room_id, None
+ )
+
# In modern room versions (12+), creators have unlimited power
if is_modern_room_version(room_version):
return user_id in creators
-
+
# In older room versions, creators don't have special unlimited power
return False
-
+
except Exception:
return False
async def get_moderators_and_above(client, parent_room: str) -> List[str]:
"""Get list of users with moderator or higher permissions from the parent space.
-
+
Args:
client: Matrix client instance
parent_room: The parent room ID
-
+
Returns:
list: List of user IDs with power level >= 50 (moderator or above)
"""
diff --git a/community/helpers/user_utils.py b/community/helpers/user_utils.py
index d473660..675d6da 100644
--- a/community/helpers/user_utils.py
+++ b/community/helpers/user_utils.py
@@ -10,13 +10,13 @@ from mautrix.errors import MNotFound
async def check_if_banned(client, userid: str, banlists: List[str], logger) -> bool:
"""Check if a user is banned according to banlists.
-
+
Args:
client: Matrix client instance
userid: The user ID to check
banlists: List of banlist room IDs or aliases
logger: Logger instance for error reporting
-
+
Returns:
bool: True if user is banned
"""
@@ -42,25 +42,25 @@ async def check_if_banned(client, userid: str, banlists: List[str], logger) -> b
for rule in user_policies:
try:
- if bool(
- fnmatch.fnmatch(userid, rule["content"]["entity"])
- ) and bool(re.search("ban$", rule["content"]["recommendation"])):
+ if bool(fnmatch.fnmatch(userid, rule["content"]["entity"])) and bool(
+ re.search("ban$", rule["content"]["recommendation"])
+ ):
return True
except Exception:
# Skip invalid rules
pass
-
+
return is_banned
async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]:
"""Get room IDs for all configured banlists.
-
+
Args:
client: Matrix client instance
banlists: List of banlist room IDs or aliases
logger: Logger instance for error reporting
-
+
Returns:
list: List of room IDs for banlists
"""
@@ -81,12 +81,20 @@ async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]:
return banlist_roomids
-async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: str = "banned",
- all_rooms: bool = False, redact_on_ban: bool = False,
- get_messages_to_redact_func=None, database=None,
- sleep_time: float = 0.1, logger=None) -> Dict:
+async def ban_user_from_rooms(
+ client,
+ user: str,
+ roomlist: List[str],
+ reason: str = "banned",
+ all_rooms: bool = False,
+ redact_on_ban: bool = False,
+ get_messages_to_redact_func=None,
+ database=None,
+ sleep_time: float = 0.1,
+ logger=None,
+) -> Dict:
"""Ban a user from a list of rooms.
-
+
Args:
client: Matrix client instance
user: User ID to ban
@@ -98,13 +106,13 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st
database: Database instance for redaction tasks
sleep_time: Sleep time between operations
logger: Logger instance
-
+
Returns:
dict: Ban results with success/error lists
"""
ban_event_map = {"ban_list": {}, "error_list": {}}
ban_event_map["ban_list"][user] = []
-
+
for room in roomlist:
try:
roomname = None
@@ -149,10 +157,16 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st
return ban_event_map
-async def user_permitted(client, user_id: UserID, parent_room: str, min_level: int = 50,
- room_id: str = None, logger=None) -> bool:
+async def user_permitted(
+ client,
+ user_id: UserID,
+ parent_room: str,
+ min_level: int = 50,
+ room_id: str = None,
+ logger=None,
+) -> bool:
"""Check if a user has sufficient power level in a room.
-
+
Args:
client: Matrix client instance
user_id: The Matrix ID of the user to check
@@ -160,18 +174,19 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i
min_level: Minimum required power level (default 50 for moderator)
room_id: The room ID to check permissions in. If None, uses parent room.
logger: Logger instance for error reporting
-
+
Returns:
bool: True if user has sufficient power level
"""
try:
target_room = room_id or parent_room
-
+
# First check if user has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power
+
if await user_has_unlimited_power(client, user_id, target_room):
return True
-
+
# Then check power level
power_levels = await client.get_state_event(
target_room, EventType.ROOM_POWER_LEVELS
@@ -186,14 +201,15 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i
async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
"""Check if a user has unlimited power in a room (creator in modern room versions).
-
+
Args:
client: Matrix client instance
user_id: The user ID to check
room_id: The room ID to check in
-
+
Returns:
bool: True if user has unlimited power
"""
from .room_utils import user_has_unlimited_power as room_user_has_unlimited_power
+
return await room_user_has_unlimited_power(client, user_id, room_id)