formatting

This commit is contained in:
William Kray
2025-09-09 21:21:51 -07:00
parent 5f42420619
commit a92759c100
14 changed files with 1380 additions and 967 deletions
+801 -446
View File
File diff suppressed because it is too large Load Diff
+14 -1
View File
@@ -1,2 +1,15 @@
# Helper modules for community bot # Helper modules for community bot
from . import message_utils, room_utils, user_utils, database_utils, report_utils, decorators, common_utils, room_creation_utils, config_manager, response_builder, diagnostic_utils, base_command_handler from . import (
message_utils,
room_utils,
user_utils,
database_utils,
report_utils,
decorators,
common_utils,
room_creation_utils,
config_manager,
response_builder,
diagnostic_utils,
base_command_handler,
)
+62 -52
View File
@@ -8,10 +8,10 @@ from .decorators import require_permission, require_parent_room, handle_errors
class BaseCommandHandler(ABC): class BaseCommandHandler(ABC):
"""Base class for command handlers with common patterns.""" """Base class for command handlers with common patterns."""
def __init__(self, bot): def __init__(self, bot):
"""Initialize with bot instance. """Initialize with bot instance.
Args: Args:
bot: CommunityBot instance bot: CommunityBot instance
""" """
@@ -21,92 +21,96 @@ class BaseCommandHandler(ABC):
self.config_manager = bot.config_manager self.config_manager = bot.config_manager
self.log = bot.log self.log = bot.log
self.database = bot.database self.database = bot.database
@abstractmethod @abstractmethod
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the command logic. """Execute the command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
pass pass
async def check_permissions(self, evt: MessageEvent, min_level: int = 50, room_id: str = None) -> bool: async def check_permissions(
self, evt: MessageEvent, min_level: int = 50, room_id: str = None
) -> bool:
"""Check if user has required permissions. """Check if user has required permissions.
Args: Args:
evt: Message event evt: Message event
min_level: Minimum required power level min_level: Minimum required power level
room_id: Room ID to check permissions in room_id: Room ID to check permissions in
Returns: Returns:
bool: True if user has permissions bool: True if user has permissions
""" """
return await self.bot.user_permitted(evt.sender, min_level, room_id) return await self.bot.user_permitted(evt.sender, min_level, room_id)
async def check_parent_room(self, evt: MessageEvent) -> bool: async def check_parent_room(self, evt: MessageEvent) -> bool:
"""Check if parent room is configured. """Check if parent room is configured.
Args: Args:
evt: Message event evt: Message event
Returns: Returns:
bool: True if parent room is configured bool: True if parent room is configured
""" """
return await self.bot.check_parent_room(evt) return await self.bot.check_parent_room(evt)
async def reply_error(self, evt: MessageEvent, message: str) -> None: async def reply_error(self, evt: MessageEvent, message: str) -> None:
"""Reply with an error message. """Reply with an error message.
Args: Args:
evt: Message event evt: Message event
message: Error message message: Error message
""" """
await evt.reply(message) await evt.reply(message)
async def reply_success(self, evt: MessageEvent, message: str) -> None: async def reply_success(self, evt: MessageEvent, message: str) -> None:
"""Reply with a success message. """Reply with a success message.
Args: Args:
evt: Message event evt: Message event
message: Success message message: Success message
""" """
await evt.reply(message) await evt.reply(message)
async def respond_html(self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None) -> None: async def respond_html(
self, evt: MessageEvent, message: str, edits: Optional[MessageEvent] = None
) -> None:
"""Respond with HTML content. """Respond with HTML content.
Args: Args:
evt: Message event evt: Message event
message: HTML message message: HTML message
edits: Optional message to edit edits: Optional message to edit
""" """
await evt.respond(message, allow_html=True, edits=edits) await evt.respond(message, allow_html=True, edits=edits)
def is_tracking_enabled(self) -> bool: def is_tracking_enabled(self) -> bool:
"""Check if user tracking is enabled. """Check if user tracking is enabled.
Returns: Returns:
bool: True if tracking is enabled bool: True if tracking is enabled
""" """
return self.config_manager.is_tracking_enabled() return self.config_manager.is_tracking_enabled()
def is_verification_enabled(self) -> bool: def is_verification_enabled(self) -> bool:
"""Check if verification is enabled. """Check if verification is enabled.
Returns: Returns:
bool: True if verification is enabled bool: True if verification is enabled
""" """
return self.config_manager.is_verification_enabled() return self.config_manager.is_verification_enabled()
def get_parent_room(self) -> Optional[str]: def get_parent_room(self) -> Optional[str]:
"""Get the parent room ID. """Get the parent room ID.
Returns: Returns:
str: Parent room ID or None str: Parent room ID or None
""" """
@@ -115,23 +119,23 @@ class BaseCommandHandler(ABC):
class TrackingCommandHandler(BaseCommandHandler): class TrackingCommandHandler(BaseCommandHandler):
"""Base handler for commands that require user tracking.""" """Base handler for commands that require user tracking."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with tracking check.""" """Execute command with tracking check."""
if not self.is_tracking_enabled(): if not self.is_tracking_enabled():
await self.reply_error(evt, "user tracking is disabled") await self.reply_error(evt, "user tracking is disabled")
return return
return await self.execute_tracking_command(evt, *args, **kwargs) return await self.execute_tracking_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_tracking_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_tracking_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the tracking command logic. """Execute the tracking command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
@@ -140,23 +144,23 @@ class TrackingCommandHandler(BaseCommandHandler):
class AdminCommandHandler(BaseCommandHandler): class AdminCommandHandler(BaseCommandHandler):
"""Base handler for admin-only commands.""" """Base handler for admin-only commands."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with admin permission check.""" """Execute command with admin permission check."""
if not await self.check_permissions(evt, min_level=100): if not await self.check_permissions(evt, min_level=100):
await self.reply_error(evt, "You don't have permission to use this command") await self.reply_error(evt, "You don't have permission to use this command")
return return
return await self.execute_admin_command(evt, *args, **kwargs) return await self.execute_admin_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the admin command logic. """Execute the admin command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
@@ -165,23 +169,25 @@ class AdminCommandHandler(BaseCommandHandler):
class ModeratorCommandHandler(BaseCommandHandler): class ModeratorCommandHandler(BaseCommandHandler):
"""Base handler for moderator commands.""" """Base handler for moderator commands."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with moderator permission check.""" """Execute command with moderator permission check."""
if not await self.check_permissions(evt, min_level=50): if not await self.check_permissions(evt, min_level=50):
await self.reply_error(evt, "You don't have permission to use this command") await self.reply_error(evt, "You don't have permission to use this command")
return return
return await self.execute_moderator_command(evt, *args, **kwargs) return await self.execute_moderator_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_moderator_command(
self, evt: MessageEvent, *args, **kwargs
) -> Any:
"""Execute the moderator command logic. """Execute the moderator command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
@@ -190,22 +196,22 @@ class ModeratorCommandHandler(BaseCommandHandler):
class SpaceCommandHandler(BaseCommandHandler): class SpaceCommandHandler(BaseCommandHandler):
"""Base handler for commands that require parent space.""" """Base handler for commands that require parent space."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with parent space check.""" """Execute command with parent space check."""
if not await self.check_parent_room(evt): if not await self.check_parent_room(evt):
return return
return await self.execute_space_command(evt, *args, **kwargs) return await self.execute_space_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_space_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_space_command(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute the space command logic. """Execute the space command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
@@ -214,7 +220,7 @@ class SpaceCommandHandler(BaseCommandHandler):
class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler): class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler):
"""Base handler for commands that require both parent space and moderator permissions.""" """Base handler for commands that require both parent space and moderator permissions."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with both space and moderator checks.""" """Execute command with both space and moderator checks."""
if not await self.check_parent_room(evt): if not await self.check_parent_room(evt):
@@ -223,16 +229,18 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler)
await self.reply_error(evt, "You don't have permission to use this command") await self.reply_error(evt, "You don't have permission to use this command")
return return
return await self.execute_space_moderator_command(evt, *args, **kwargs) return await self.execute_space_moderator_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_space_moderator_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_space_moderator_command(
self, evt: MessageEvent, *args, **kwargs
) -> Any:
"""Execute the space moderator command logic. """Execute the space moderator command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
@@ -241,7 +249,7 @@ class SpaceModeratorCommandHandler(SpaceCommandHandler, ModeratorCommandHandler)
class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler): class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler):
"""Base handler for commands that require both parent space and admin permissions.""" """Base handler for commands that require both parent space and admin permissions."""
async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute(self, evt: MessageEvent, *args, **kwargs) -> Any:
"""Execute command with both space and admin checks.""" """Execute command with both space and admin checks."""
if not await self.check_parent_room(evt): if not await self.check_parent_room(evt):
@@ -250,16 +258,18 @@ class SpaceAdminCommandHandler(SpaceCommandHandler, AdminCommandHandler):
await self.reply_error(evt, "You don't have permission to use this command") await self.reply_error(evt, "You don't have permission to use this command")
return return
return await self.execute_space_admin_command(evt, *args, **kwargs) return await self.execute_space_admin_command(evt, *args, **kwargs)
@abstractmethod @abstractmethod
async def execute_space_admin_command(self, evt: MessageEvent, *args, **kwargs) -> Any: async def execute_space_admin_command(
self, evt: MessageEvent, *args, **kwargs
) -> Any:
"""Execute the space admin command logic. """Execute the space admin command logic.
Args: Args:
evt: Message event evt: Message event
*args: Command arguments *args: Command arguments
**kwargs: Additional keyword arguments **kwargs: Additional keyword arguments
Returns: Returns:
Command result Command result
""" """
+10 -10
View File
@@ -6,12 +6,12 @@ from mautrix.types import EventType, MessageEvent
async def get_room_name(client, room_id: str, logger) -> Optional[str]: async def get_room_name(client, room_id: str, logger) -> Optional[str]:
"""Get room name from room ID. """Get room name from room ID.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_id: Room ID to get name for room_id: Room ID to get name for
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
str: Room name or None if not found/error str: Room name or None if not found/error
""" """
@@ -25,12 +25,12 @@ async def get_room_name(client, room_id: str, logger) -> Optional[str]:
async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]: async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]:
"""Get power levels for a room. """Get power levels for a room.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_id: Room ID to get power levels for room_id: Room ID to get power levels for
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
PowerLevelStateEventContent or None if error PowerLevelStateEventContent or None if error
""" """
@@ -43,13 +43,13 @@ async def get_room_power_levels(client, room_id: str, logger) -> Optional[Any]:
async def check_room_membership(client, room_id: str, user_id: str, logger) -> bool: async def check_room_membership(client, room_id: str, user_id: str, logger) -> bool:
"""Check if a user is a member of a room. """Check if a user is a member of a room.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_id: Room ID to check room_id: Room ID to check
user_id: User ID to check user_id: User ID to check
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
bool: True if user is a member, False otherwise bool: True if user is a member, False otherwise
""" """
@@ -62,11 +62,11 @@ async def check_room_membership(client, room_id: str, user_id: str, logger) -> b
def format_room_info(room_id: str, room_name: Optional[str] = None) -> str: def format_room_info(room_id: str, room_name: Optional[str] = None) -> str:
"""Format room information for display. """Format room information for display.
Args: Args:
room_id: Room ID room_id: Room ID
room_name: Optional room name room_name: Optional room name
Returns: Returns:
str: Formatted room info str: Formatted room info
""" """
@@ -77,12 +77,12 @@ def format_room_info(room_id: str, room_name: Optional[str] = None) -> str:
def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any: def safe_get(dictionary: Dict[str, Any], key: str, default: Any = None) -> Any:
"""Safely get a value from a dictionary with a default. """Safely get a value from a dictionary with a default.
Args: Args:
dictionary: Dictionary to get value from dictionary: Dictionary to get value from
key: Key to look up key: Key to look up
default: Default value if key not found default: Default value if key not found
Returns: Returns:
Value from dictionary or default Value from dictionary or default
""" """
+64 -69
View File
@@ -5,219 +5,214 @@ from typing import List, Dict, Any, Optional
class ConfigManager: class ConfigManager:
"""Centralized configuration management for the community bot.""" """Centralized configuration management for the community bot."""
def __init__(self, config: Dict[str, Any]): def __init__(self, config: Dict[str, Any]):
"""Initialize with bot configuration. """Initialize with bot configuration.
Args: Args:
config: Bot configuration dictionary config: Bot configuration dictionary
""" """
self.config = config self.config = config
def is_tracking_enabled(self) -> bool: def is_tracking_enabled(self) -> bool:
"""Check if user tracking is enabled. """Check if user tracking is enabled.
Returns: Returns:
bool: True if tracking is enabled bool: True if tracking is enabled
""" """
track_users = self.config.get("track_users", []) track_users = self.config.get("track_users", [])
# Handle legacy boolean configuration # Handle legacy boolean configuration
if isinstance(track_users, bool): if isinstance(track_users, bool):
return track_users return track_users
# Handle new list configuration # Handle new list configuration
return isinstance(track_users, list) and len(track_users) > 0 return isinstance(track_users, list) and len(track_users) > 0
def is_message_tracking_enabled(self) -> bool: def is_message_tracking_enabled(self) -> bool:
"""Check if message tracking is enabled. """Check if message tracking is enabled.
Returns: Returns:
bool: True if message tracking is enabled bool: True if message tracking is enabled
""" """
track_users = self.config.get("track_users", []) track_users = self.config.get("track_users", [])
# Handle legacy boolean configuration - if True, enable both messages and reactions # Handle legacy boolean configuration - if True, enable both messages and reactions
if isinstance(track_users, bool): if isinstance(track_users, bool):
return track_users return track_users
# Handle new list configuration # Handle new list configuration
return isinstance(track_users, list) and "messages" in track_users return isinstance(track_users, list) and "messages" in track_users
def is_reaction_tracking_enabled(self) -> bool: def is_reaction_tracking_enabled(self) -> bool:
"""Check if reaction tracking is enabled. """Check if reaction tracking is enabled.
Returns: Returns:
bool: True if reaction tracking is enabled bool: True if reaction tracking is enabled
""" """
track_users = self.config.get("track_users", []) track_users = self.config.get("track_users", [])
# Handle legacy boolean configuration - if True, enable both messages and reactions # Handle legacy boolean configuration - if True, enable both messages and reactions
if isinstance(track_users, bool): if isinstance(track_users, bool):
return track_users return track_users
# Handle new list configuration # Handle new list configuration
return isinstance(track_users, list) and "reactions" in track_users return isinstance(track_users, list) and "reactions" in track_users
def is_verification_enabled(self) -> bool: def is_verification_enabled(self) -> bool:
"""Check if verification is enabled. """Check if verification is enabled.
Returns: Returns:
bool: True if verification is enabled bool: True if verification is enabled
""" """
return self.config.get("verification_enabled", False) return self.config.get("verification_enabled", False)
def is_proactive_banning_enabled(self) -> bool: def is_proactive_banning_enabled(self) -> bool:
"""Check if proactive banning is enabled. """Check if proactive banning is enabled.
Returns: Returns:
bool: True if proactive banning is enabled bool: True if proactive banning is enabled
""" """
return self.config.get("proactive_banning", False) return self.config.get("proactive_banning", False)
def is_encryption_enabled(self) -> bool: def is_encryption_enabled(self) -> bool:
"""Check if encryption is enabled by default. """Check if encryption is enabled by default.
Returns: Returns:
bool: True if encryption is enabled bool: True if encryption is enabled
""" """
return self.config.get("encrypt", False) return self.config.get("encrypt", False)
def get_room_version(self) -> str: def get_room_version(self) -> str:
"""Get the configured room version. """Get the configured room version.
Returns: Returns:
str: Room version string str: Room version string
""" """
return self.config.get("room_version", "1") return self.config.get("room_version", "1")
def get_community_slug(self) -> Optional[str]: def get_community_slug(self) -> Optional[str]:
"""Get the community slug. """Get the community slug.
Returns: Returns:
str: Community slug or None if not configured str: Community slug or None if not configured
""" """
return self.config.get("community_slug") return self.config.get("community_slug")
def get_parent_room(self) -> Optional[str]: def get_parent_room(self) -> Optional[str]:
"""Get the parent room ID. """Get the parent room ID.
Returns: Returns:
str: Parent room ID or None if not configured str: Parent room ID or None if not configured
""" """
return self.config.get("parent_room") return self.config.get("parent_room")
def get_invitees(self) -> List[str]: def get_invitees(self) -> List[str]:
"""Get the list of users to invite to new rooms. """Get the list of users to invite to new rooms.
Returns: Returns:
List[str]: List of user IDs to invite List[str]: List of user IDs to invite
""" """
return self.config.get("invitees", []) return self.config.get("invitees", [])
def get_invite_power_level(self) -> int: def get_invite_power_level(self) -> int:
"""Get the power level required to invite users. """Get the power level required to invite users.
Returns: Returns:
int: Power level for inviting users int: Power level for inviting users
""" """
return self.config.get("invite_power_level", 50) return self.config.get("invite_power_level", 50)
def get_sleep_duration(self) -> float: def get_sleep_duration(self) -> float:
"""Get the sleep duration between operations. """Get the sleep duration between operations.
Returns: Returns:
float: Sleep duration in seconds float: Sleep duration in seconds
""" """
return self.config.get("sleep", 1.0) return self.config.get("sleep", 1.0)
def get_welcome_sleep_duration(self) -> float: def get_welcome_sleep_duration(self) -> float:
"""Get the sleep duration for welcome messages. """Get the sleep duration for welcome messages.
Returns: Returns:
float: Welcome sleep duration in seconds float: Welcome sleep duration in seconds
""" """
return self.config.get("welcome_sleep", 2.0) return self.config.get("welcome_sleep", 2.0)
def get_warn_threshold_days(self) -> int: def get_warn_threshold_days(self) -> int:
"""Get the warning threshold for inactive users. """Get the warning threshold for inactive users.
Returns: Returns:
int: Number of days before warning int: Number of days before warning
""" """
return self.config.get("warn_threshold_days", 30) return self.config.get("warn_threshold_days", 30)
def get_kick_threshold_days(self) -> int: def get_kick_threshold_days(self) -> int:
"""Get the kick threshold for inactive users. """Get the kick threshold for inactive users.
Returns: Returns:
int: Number of days before kicking int: Number of days before kicking
""" """
return self.config.get("kick_threshold_days", 60) return self.config.get("kick_threshold_days", 60)
def get_verification_phrase(self) -> str: def get_verification_phrase(self) -> str:
"""Get the verification phrase. """Get the verification phrase.
Returns: Returns:
str: Verification phrase str: Verification phrase
""" """
return self.config.get("verification_phrase", "I agree to the rules") return self.config.get("verification_phrase", "I agree to the rules")
def get_verification_attempts(self) -> int: def get_verification_attempts(self) -> int:
"""Get the maximum verification attempts. """Get the maximum verification attempts.
Returns: Returns:
int: Maximum verification attempts int: Maximum verification attempts
""" """
return self.config.get("verification_attempts", 3) return self.config.get("verification_attempts", 3)
def get_verification_timeout(self) -> int: def get_verification_timeout(self) -> int:
"""Get the verification timeout in seconds. """Get the verification timeout in seconds.
Returns: Returns:
int: Verification timeout in seconds int: Verification timeout in seconds
""" """
return self.config.get("verification_timeout", 300) return self.config.get("verification_timeout", 300)
def get_banlist_rooms(self) -> List[str]: def get_banlist_rooms(self) -> List[str]:
"""Get the list of banlist rooms. """Get the list of banlist rooms.
Returns: Returns:
List[str]: List of banlist room IDs or aliases List[str]: List of banlist room IDs or aliases
""" """
return self.config.get("banlist_rooms", []) return self.config.get("banlist_rooms", [])
def get_redaction_rooms(self) -> List[str]: def get_redaction_rooms(self) -> List[str]:
"""Get the list of rooms for redaction. """Get the list of rooms for redaction.
Returns: Returns:
List[str]: List of room IDs for redaction List[str]: List of room IDs for redaction
""" """
return self.config.get("redaction_rooms", []) return self.config.get("redaction_rooms", [])
def validate_required_configs(self) -> List[str]: def validate_required_configs(self) -> List[str]:
"""Validate that all required configurations are present. """Validate that all required configurations are present.
Returns: Returns:
List[str]: List of missing required configuration keys List[str]: List of missing required configuration keys
""" """
required_configs = [ required_configs = ["parent_room", "room_version", "community_slug"]
"parent_room",
"room_version",
"community_slug"
]
missing = [] missing = []
for config_key in required_configs: for config_key in required_configs:
if not self.config.get(config_key): if not self.config.get(config_key):
missing.append(config_key) missing.append(config_key)
return missing return missing
def is_modern_room_version(self) -> bool: def is_modern_room_version(self) -> bool:
"""Check if the configured room version is modern (12+). """Check if the configured room version is modern (12+).
Returns: Returns:
bool: True if room version is 12 or higher bool: True if room version is 12 or higher
""" """
@@ -226,10 +221,10 @@ class ConfigManager:
return version >= 12 return version >= 12
except (ValueError, TypeError): except (ValueError, TypeError):
return False return False
def get_room_creation_settings(self) -> Dict[str, Any]: def get_room_creation_settings(self) -> Dict[str, Any]:
"""Get settings specific to room creation. """Get settings specific to room creation.
Returns: Returns:
Dict[str, Any]: Room creation settings Dict[str, Any]: Room creation settings
""" """
@@ -239,12 +234,12 @@ class ConfigManager:
"invitees": self.get_invitees(), "invitees": self.get_invitees(),
"invite_power_level": self.get_invite_power_level(), "invite_power_level": self.get_invite_power_level(),
"encrypt": self.is_encryption_enabled(), "encrypt": self.is_encryption_enabled(),
"parent_room": self.get_parent_room() "parent_room": self.get_parent_room(),
} }
def get_tracking_settings(self) -> Dict[str, Any]: def get_tracking_settings(self) -> Dict[str, Any]:
"""Get settings specific to user tracking. """Get settings specific to user tracking.
Returns: Returns:
Dict[str, Any]: Tracking settings Dict[str, Any]: Tracking settings
""" """
@@ -253,12 +248,12 @@ class ConfigManager:
"track_messages": self.is_message_tracking_enabled(), "track_messages": self.is_message_tracking_enabled(),
"track_reactions": self.is_reaction_tracking_enabled(), "track_reactions": self.is_reaction_tracking_enabled(),
"warn_threshold_days": self.get_warn_threshold_days(), "warn_threshold_days": self.get_warn_threshold_days(),
"kick_threshold_days": self.get_kick_threshold_days() "kick_threshold_days": self.get_kick_threshold_days(),
} }
def get_verification_settings(self) -> Dict[str, Any]: def get_verification_settings(self) -> Dict[str, Any]:
"""Get settings specific to verification. """Get settings specific to verification.
Returns: Returns:
Dict[str, Any]: Verification settings Dict[str, Any]: Verification settings
""" """
@@ -266,5 +261,5 @@ class ConfigManager:
"verification_enabled": self.is_verification_enabled(), "verification_enabled": self.is_verification_enabled(),
"verification_phrase": self.get_verification_phrase(), "verification_phrase": self.get_verification_phrase(),
"verification_attempts": self.get_verification_attempts(), "verification_attempts": self.get_verification_attempts(),
"verification_timeout": self.get_verification_timeout() "verification_timeout": self.get_verification_timeout(),
} }
+46 -34
View File
@@ -8,13 +8,13 @@ from mautrix.types import PaginationDirection
async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> List: async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> List:
"""Get messages from a user in a room that should be redacted. """Get messages from a user in a room that should be redacted.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_id: The room ID to search in room_id: The room ID to search in
mxid: The user ID whose messages to find mxid: The user ID whose messages to find
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
list: List of message events to redact list: List of message events to redact
""" """
@@ -40,16 +40,18 @@ async def get_messages_to_redact(client, room_id: str, mxid: str, logger) -> Lis
return [] return []
async def redact_messages(client, database, room_id: str, sleep_time: float, logger) -> Dict[str, int]: async def redact_messages(
client, database, room_id: str, sleep_time: float, logger
) -> Dict[str, int]:
"""Redact messages queued for redaction in a room. """Redact messages queued for redaction in a room.
Args: Args:
client: Matrix client instance client: Matrix client instance
database: Database instance database: Database instance
room_id: The room ID to redact messages in room_id: The room ID to redact messages in
sleep_time: Sleep time between redactions sleep_time: Sleep time between redactions
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
dict: Counters for successful and failed redactions dict: Counters for successful and failed redactions
""" """
@@ -59,9 +61,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log
) )
for event in events: for event in events:
try: try:
await client.redact( await client.redact(room_id, event["event_id"], reason="content removed")
room_id, event["event_id"], reason="content removed"
)
counters["success"] += 1 counters["success"] += 1
await database.execute( await database.execute(
"DELETE FROM redaction_tasks WHERE event_id = $1", event["event_id"] "DELETE FROM redaction_tasks WHERE event_id = $1", event["event_id"]
@@ -81,7 +81,7 @@ async def redact_messages(client, database, room_id: str, sleep_time: float, log
async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) -> None: async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) -> None:
"""Insert or update user activity timestamp. """Insert or update user activity timestamp.
Args: Args:
database: Database instance database: Database instance
mxid: User Matrix ID mxid: User Matrix ID
@@ -103,16 +103,17 @@ async def upsert_user_timestamp(database, mxid: str, timestamp: int, logger) ->
logger.error(f"Failed to upsert user timestamp: {e}") logger.error(f"Failed to upsert user timestamp: {e}")
async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_days: int, async def get_inactive_users(
logger) -> Dict[str, List[str]]: database, warn_threshold_days: int, kick_threshold_days: int, logger
) -> Dict[str, List[str]]:
"""Get lists of users who should be warned or kicked for inactivity. """Get lists of users who should be warned or kicked for inactivity.
Args: Args:
database: Database instance database: Database instance
warn_threshold_days: Days threshold for warning warn_threshold_days: Days threshold for warning
kick_threshold_days: Days threshold for kicking kick_threshold_days: Days threshold for kicking
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
dict: Contains 'warn' and 'kick' lists of user IDs dict: Contains 'warn' and 'kick' lists of user IDs
""" """
@@ -120,7 +121,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
current_time = int(time.time()) current_time = int(time.time())
warn_threshold = current_time - (warn_threshold_days * 24 * 60 * 60) warn_threshold = current_time - (warn_threshold_days * 24 * 60 * 60)
kick_threshold = current_time - (kick_threshold_days * 24 * 60 * 60) kick_threshold = current_time - (kick_threshold_days * 24 * 60 * 60)
# Get users to warn # Get users to warn
warn_results = await database.fetch( warn_results = await database.fetch(
""" """
@@ -132,7 +133,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
warn_threshold, warn_threshold,
kick_threshold, kick_threshold,
) )
# Get users to kick # Get users to kick
kick_results = await database.fetch( kick_results = await database.fetch(
""" """
@@ -142,10 +143,10 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
""", """,
kick_threshold, kick_threshold,
) )
return { return {
"warn": [row["mxid"] for row in warn_results], "warn": [row["mxid"] for row in warn_results],
"kick": [row["mxid"] for row in kick_results] "kick": [row["mxid"] for row in kick_results],
} }
except Exception as e: except Exception as e:
logger.error(f"Failed to get inactive users: {e}") logger.error(f"Failed to get inactive users: {e}")
@@ -154,7 +155,7 @@ async def get_inactive_users(database, warn_threshold_days: int, kick_threshold_
async def cleanup_stale_verification_states(database, logger) -> None: async def cleanup_stale_verification_states(database, logger) -> None:
"""Clean up stale verification states older than 24 hours. """Clean up stale verification states older than 24 hours.
Args: Args:
database: Database instance database: Database instance
logger: Logger instance for error reporting logger: Logger instance for error reporting
@@ -172,29 +173,34 @@ async def cleanup_stale_verification_states(database, logger) -> None:
async def get_verification_state(database, dm_room_id: str) -> Dict[str, Any]: async def get_verification_state(database, dm_room_id: str) -> Dict[str, Any]:
"""Get verification state for a DM room. """Get verification state for a DM room.
Args: Args:
database: Database instance database: Database instance
dm_room_id: The DM room ID dm_room_id: The DM room ID
Returns: Returns:
dict: Verification state data or None if not found dict: Verification state data or None if not found
""" """
try: try:
result = await database.fetchrow( result = await database.fetchrow(
"SELECT * FROM verification_states WHERE dm_room_id = $1", "SELECT * FROM verification_states WHERE dm_room_id = $1", dm_room_id
dm_room_id
) )
return dict(result) if result else None return dict(result) if result else None
except Exception as e: except Exception as e:
return None return None
async def create_verification_state(database, dm_room_id: str, user_id: str, async def create_verification_state(
target_room_id: str, verification_phrase: str, database,
attempts_remaining: int, required_power_level: int) -> None: dm_room_id: str,
user_id: str,
target_room_id: str,
verification_phrase: str,
attempts_remaining: int,
required_power_level: int,
) -> None:
"""Create a new verification state. """Create a new verification state.
Args: Args:
database: Database instance database: Database instance
dm_room_id: The DM room ID dm_room_id: The DM room ID
@@ -211,16 +217,22 @@ async def create_verification_state(database, dm_room_id: str, user_id: str,
(dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, required_power_level) (dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, required_power_level)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6)
""", """,
dm_room_id, user_id, target_room_id, verification_phrase, dm_room_id,
attempts_remaining, required_power_level user_id,
target_room_id,
verification_phrase,
attempts_remaining,
required_power_level,
) )
except Exception as e: except Exception as e:
pass # Verification state creation is not critical pass # Verification state creation is not critical
async def update_verification_attempts(database, dm_room_id: str, attempts_remaining: int) -> None: async def update_verification_attempts(
database, dm_room_id: str, attempts_remaining: int
) -> None:
"""Update verification attempts remaining. """Update verification attempts remaining.
Args: Args:
database: Database instance database: Database instance
dm_room_id: The DM room ID dm_room_id: The DM room ID
@@ -229,7 +241,8 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai
try: try:
await database.execute( await database.execute(
"UPDATE verification_states SET attempts_remaining = $1 WHERE dm_room_id = $2", "UPDATE verification_states SET attempts_remaining = $1 WHERE dm_room_id = $2",
attempts_remaining, dm_room_id attempts_remaining,
dm_room_id,
) )
except Exception as e: except Exception as e:
pass # Verification state update is not critical pass # Verification state update is not critical
@@ -237,15 +250,14 @@ async def update_verification_attempts(database, dm_room_id: str, attempts_remai
async def delete_verification_state(database, dm_room_id: str) -> None: async def delete_verification_state(database, dm_room_id: str) -> None:
"""Delete a verification state. """Delete a verification state.
Args: Args:
database: Database instance database: Database instance
dm_room_id: The DM room ID dm_room_id: The DM room ID
""" """
try: try:
await database.execute( await database.execute(
"DELETE FROM verification_states WHERE dm_room_id = $1", "DELETE FROM verification_states WHERE dm_room_id = $1", dm_room_id
dm_room_id
) )
except Exception as e: except Exception as e:
pass # Verification state deletion is not critical pass # Verification state deletion is not critical
+10 -2
View File
@@ -7,11 +7,12 @@ from mautrix.types import UserID, MessageEvent
def require_permission(min_level: int = 50, room_id: Optional[str] = None): def require_permission(min_level: int = 50, room_id: Optional[str] = None):
"""Decorator to require user permission for command execution. """Decorator to require user permission for command execution.
Args: Args:
min_level: Minimum required power level (default 50 for moderator) min_level: Minimum required power level (default 50 for moderator)
room_id: Room ID to check permissions in (None for parent room) room_id: Room ID to check permissions in (None for parent room)
""" """
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
@functools.wraps(func) @functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
@@ -19,26 +20,31 @@ def require_permission(min_level: int = 50, room_id: Optional[str] = None):
await evt.reply("You don't have permission to use this command") await evt.reply("You don't have permission to use this command")
return return
return await func(self, evt, *args, **kwargs) return await func(self, evt, *args, **kwargs)
return wrapper return wrapper
return decorator return decorator
def require_parent_room(func: Callable) -> Callable: def require_parent_room(func: Callable) -> Callable:
"""Decorator to require parent room to be configured.""" """Decorator to require parent room to be configured."""
@functools.wraps(func) @functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
if not await self.check_parent_room(evt): if not await self.check_parent_room(evt):
return return
return await func(self, evt, *args, **kwargs) return await func(self, evt, *args, **kwargs)
return wrapper return wrapper
def handle_errors(error_message: str = "An error occurred"): def handle_errors(error_message: str = "An error occurred"):
"""Decorator to handle common errors in command execution. """Decorator to handle common errors in command execution.
Args: Args:
error_message: Default error message to show to user error_message: Default error message to show to user
""" """
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
@functools.wraps(func) @functools.wraps(func)
async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any: async def wrapper(self, evt: MessageEvent, *args, **kwargs) -> Any:
@@ -47,5 +53,7 @@ def handle_errors(error_message: str = "An error occurred"):
except Exception as e: except Exception as e:
self.log.error(f"Error in {func.__name__}: {e}") self.log.error(f"Error in {func.__name__}: {e}")
await evt.reply(f"{error_message}: {e}") await evt.reply(f"{error_message}: {e}")
return wrapper return wrapper
return decorator return decorator
+93 -108
View File
@@ -6,17 +6,15 @@ from mautrix.client import Client
async def check_space_permissions( async def check_space_permissions(
client: Client, client: Client, parent_room: str, logger
parent_room: str,
logger
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Check bot permissions in the parent space. """Check bot permissions in the parent space.
Args: Args:
client: Matrix client client: Matrix client
parent_room: Parent room ID parent_room: Parent room ID
logger: Logger instance logger: Logger instance
Returns: Returns:
Dict containing space permission information Dict containing space permission information
""" """
@@ -25,11 +23,14 @@ async def check_space_permissions(
parent_room, EventType.ROOM_POWER_LEVELS parent_room, EventType.ROOM_POWER_LEVELS
) )
bot_level = space_power_levels.get_user_level(client.mxid) bot_level = space_power_levels.get_user_level(client.mxid)
# Check if bot has unlimited power (creator in modern room versions) # Check if bot has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power from .room_utils import user_has_unlimited_power
bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, parent_room)
bot_has_unlimited_power = await user_has_unlimited_power(
client, client.mxid, parent_room
)
space_info = { space_info = {
"room_id": parent_room, "room_id": parent_room,
"bot_power_level": bot_level, "bot_power_level": bot_level,
@@ -37,48 +38,36 @@ async def check_space_permissions(
"bot_has_unlimited_power": bot_has_unlimited_power, "bot_has_unlimited_power": bot_has_unlimited_power,
"users_higher_or_equal": [], "users_higher_or_equal": [],
"users_equal": [], "users_equal": [],
"users_higher": [] "users_higher": [],
} }
# Check for users with equal or higher power level # Check for users with equal or higher power level
for user, level in space_power_levels.users.items(): for user, level in space_power_levels.users.items():
if user != client.mxid and level >= bot_level: if user != client.mxid and level >= bot_level:
if level == bot_level: if level == bot_level:
space_info["users_equal"].append({ space_info["users_equal"].append({"user": user, "level": level})
"user": user,
"level": level
})
else: else:
space_info["users_higher"].append({ space_info["users_higher"].append({"user": user, "level": level})
"user": user, space_info["users_higher_or_equal"].append(
"level": level {"user": user, "level": level}
}) )
space_info["users_higher_or_equal"].append({
"user": user,
"level": level
})
return space_info return space_info
except Exception as e: except Exception as e:
logger.error(f"Failed to check space permissions: {e}") logger.error(f"Failed to check space permissions: {e}")
return { return {"room_id": parent_room, "error": str(e)}
"room_id": parent_room,
"error": str(e)
}
async def check_room_permissions( async def check_room_permissions(
client: Client, client: Client, room_id: str, logger
room_id: str,
logger
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Check bot permissions in a specific room. """Check bot permissions in a specific room.
Args: Args:
client: Matrix client client: Matrix client
room_id: Room ID to check room_id: Room ID to check
logger: Logger instance logger: Logger instance
Returns: Returns:
Dict containing room permission information Dict containing room permission information
""" """
@@ -87,33 +76,37 @@ async def check_room_permissions(
try: try:
await client.get_state_event(room_id, EventType.ROOM_MEMBER, client.mxid) await client.get_state_event(room_id, EventType.ROOM_MEMBER, client.mxid)
except: except:
return { return {"room_id": room_id, "error": "Bot not in room"}
"room_id": room_id,
"error": "Bot not in room"
}
# Get power levels # Get power levels
room_power_levels = await client.get_state_event( room_power_levels = await client.get_state_event(
room_id, EventType.ROOM_POWER_LEVELS room_id, EventType.ROOM_POWER_LEVELS
) )
bot_level = room_power_levels.get_user_level(client.mxid) bot_level = room_power_levels.get_user_level(client.mxid)
# Get room name if available # Get room name if available
room_name = room_id room_name = room_id
try: try:
from .common_utils import get_room_name from .common_utils import get_room_name
room_name = await get_room_name(client, room_id, logger) or room_id room_name = await get_room_name(client, room_id, logger) or room_id
except: except:
pass pass
# Get room version and creators # Get room version and creators
from .room_utils import get_room_version_and_creators from .room_utils import get_room_version_and_creators
room_version, creators = await get_room_version_and_creators(client, room_id, logger)
room_version, creators = await get_room_version_and_creators(
client, room_id, logger
)
# Check if bot has unlimited power (creator in modern room versions) # Check if bot has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power from .room_utils import user_has_unlimited_power
bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, room_id)
bot_has_unlimited_power = await user_has_unlimited_power(
client, client.mxid, room_id
)
room_report = { room_report = {
"room_id": room_id, "room_id": room_id,
"room_name": room_name, "room_name": room_name,
@@ -124,46 +117,35 @@ async def check_room_permissions(
"bot_has_unlimited_power": bot_has_unlimited_power, "bot_has_unlimited_power": bot_has_unlimited_power,
"users_higher_or_equal": [], "users_higher_or_equal": [],
"users_equal": [], "users_equal": [],
"users_higher": [] "users_higher": [],
} }
# Check for users with equal or higher power level # Check for users with equal or higher power level
for user, level in room_power_levels.users.items(): for user, level in room_power_levels.users.items():
if user != client.mxid and level >= bot_level: if user != client.mxid and level >= bot_level:
if level == bot_level: if level == bot_level:
room_report["users_equal"].append({ room_report["users_equal"].append({"user": user, "level": level})
"user": user,
"level": level
})
else: else:
room_report["users_higher"].append({ room_report["users_higher"].append({"user": user, "level": level})
"user": user, room_report["users_higher_or_equal"].append(
"level": level {"user": user, "level": level}
}) )
room_report["users_higher_or_equal"].append({
"user": user,
"level": level
})
return room_report return room_report
except Exception as e: except Exception as e:
logger.error(f"Failed to check room permissions for {room_id}: {e}") logger.error(f"Failed to check room permissions for {room_id}: {e}")
return { return {"room_id": room_id, "error": str(e)}
"room_id": room_id,
"error": str(e)
}
def analyze_room_data( def analyze_room_data(
room_data: Dict[str, Any], room_data: Dict[str, Any], is_modern_room_version_func
is_modern_room_version_func
) -> Tuple[str, str, bool, bool, bool]: ) -> Tuple[str, str, bool, bool, bool]:
"""Analyze room data to determine status and categorization. """Analyze room data to determine status and categorization.
Args: Args:
room_data: Room data dictionary room_data: Room data dictionary
is_modern_room_version_func: Function to check if room version is modern is_modern_room_version_func: Function to check if room version is modern
Returns: Returns:
Tuple of (status, category, is_admin, is_modern, has_error) Tuple of (status, category, is_admin, is_modern, has_error)
""" """
@@ -172,61 +154,58 @@ def analyze_room_data(
return "not_in_room", "error", False, False, True return "not_in_room", "error", False, False, True
else: else:
return "error", "error", False, False, True return "error", "error", False, False, True
# Check if modern room version # Check if modern room version
is_modern = is_modern_room_version_func(room_data.get("room_version", "1")) is_modern = is_modern_room_version_func(room_data.get("room_version", "1"))
# Check admin status # Check admin status
is_admin = room_data.get("has_admin", False) is_admin = room_data.get("has_admin", False)
if is_admin: if is_admin:
return "admin", "admin", True, is_modern, False return "admin", "admin", True, is_modern, False
else: else:
return "no_admin", "problematic", False, is_modern, False return "no_admin", "problematic", False, is_modern, False
def generate_space_summary( def generate_space_summary(space_data: Dict[str, Any]) -> str:
space_data: Dict[str, Any]
) -> str:
"""Generate HTML summary for space permissions. """Generate HTML summary for space permissions.
Args: Args:
space_data: Space permission data space_data: Space permission data
Returns: Returns:
str: HTML formatted space summary str: HTML formatted space summary
""" """
if "error" in space_data: if "error" in space_data:
return f"<h4>📋 Parent Space</h4><br />❌ <b>Error:</b> {space_data['error']}<br /><br />" return f"<h4>📋 Parent Space</h4><br />❌ <b>Error:</b> {space_data['error']}<br /><br />"
space_status = "" if space_data.get("has_admin", False) else "" space_status = "" if space_data.get("has_admin", False) else ""
response = f"<h4>📋 Parent Space</h4><br />" response = f"<h4>📋 Parent Space</h4><br />"
# Show admin status with appropriate details # Show admin status with appropriate details
if space_data.get("bot_has_unlimited_power", False): if space_data.get("bot_has_unlimited_power", False):
response += f"{space_status} <b>Administrative privileges:</b> Yes (unlimited power - creator)<br />" response += f"{space_status} <b>Administrative privileges:</b> Yes (unlimited power - creator)<br />"
else: else:
response += f"{space_status} <b>Administrative privileges:</b> {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})<br />" response += f"{space_status} <b>Administrative privileges:</b> {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})<br />"
if space_data.get("users_higher"): if space_data.get("users_higher"):
response += f"⚠️ <b>Users with higher power:</b> {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_higher']])}<br />" response += f"⚠️ <b>Users with higher power:</b> {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_higher']])}<br />"
if space_data.get("users_equal"): if space_data.get("users_equal"):
response += f"⚠️ <b>Users with equal power:</b> {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_equal']])}<br />" response += f"⚠️ <b>Users with equal power:</b> {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_equal']])}<br />"
response += "<br />" response += "<br />"
return response return response
def generate_room_summary( def generate_room_summary(
rooms_data: Dict[str, Any], rooms_data: Dict[str, Any], is_modern_room_version_func
is_modern_room_version_func
) -> Tuple[str, Dict[str, int]]: ) -> Tuple[str, Dict[str, int]]:
"""Generate HTML summary for room permissions. """Generate HTML summary for room permissions.
Args: Args:
rooms_data: Dictionary of room data rooms_data: Dictionary of room data
is_modern_room_version_func: Function to check if room version is modern is_modern_room_version_func: Function to check if room version is modern
Returns: Returns:
Tuple of (HTML response, statistics dict) Tuple of (HTML response, statistics dict)
""" """
@@ -237,14 +216,14 @@ def generate_room_summary(
"error_rooms": 0, "error_rooms": 0,
"not_in_room_count": 0, "not_in_room_count": 0,
"modern_rooms": 0, "modern_rooms": 0,
"legacy_rooms": 0 "legacy_rooms": 0,
} }
for room_id, room_data in rooms_data.items(): for room_id, room_data in rooms_data.items():
status, category, is_admin, is_modern, has_error = analyze_room_data( status, category, is_admin, is_modern, has_error = analyze_room_data(
room_data, is_modern_room_version_func room_data, is_modern_room_version_func
) )
# Update statistics # Update statistics
if has_error: if has_error:
stats["error_rooms"] += 1 stats["error_rooms"] += 1
@@ -255,26 +234,32 @@ def generate_room_summary(
stats["admin_rooms"] += 1 stats["admin_rooms"] += 1
else: else:
stats["non_admin_rooms"] += 1 stats["non_admin_rooms"] += 1
if is_modern: if is_modern:
stats["modern_rooms"] += 1 stats["modern_rooms"] += 1
else: else:
stats["legacy_rooms"] += 1 stats["legacy_rooms"] += 1
# Generate room info for problematic rooms # Generate room info for problematic rooms
if category in ["error", "problematic"] or (is_admin and (room_data.get("users_higher") or room_data.get("users_equal"))): if category in ["error", "problematic"] or (
is_admin and (room_data.get("users_higher") or room_data.get("users_equal"))
):
if has_error: if has_error:
if room_data["error"] == "Bot not in room": if room_data["error"] == "Bot not in room":
problematic_rooms.append(f"❌ <b>{room_data.get('room_name', room_id)}</b> ({room_id}): Bot not in room") problematic_rooms.append(
f"❌ <b>{room_data.get('room_name', room_id)}</b> ({room_id}): Bot not in room"
)
else: else:
problematic_rooms.append(f"❌ <b>{room_data.get('room_name', room_id)}</b> ({room_id}): Error - {room_data['error']}") problematic_rooms.append(
f"❌ <b>{room_data.get('room_name', room_id)}</b> ({room_id}): Error - {room_data['error']}"
)
elif is_admin: elif is_admin:
# Show unlimited power status for modern rooms # Show unlimited power status for modern rooms
if room_data.get("bot_has_unlimited_power", False): if room_data.get("bot_has_unlimited_power", False):
room_info = f"✅ <b>{room_data['room_name']}</b> ({room_id}): Unlimited Power (Creator) [v{room_data.get('room_version', '1')}]" room_info = f"✅ <b>{room_data['room_name']}</b> ({room_id}): Unlimited Power (Creator) [v{room_data.get('room_version', '1')}]"
else: else:
room_info = f"✅ <b>{room_data['room_name']}</b> ({room_id}): Admin: Yes (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]" room_info = f"✅ <b>{room_data['room_name']}</b> ({room_id}): Admin: Yes (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]"
# Add power level conflict info # Add power level conflict info
if room_data.get("users_higher") or room_data.get("users_equal"): if room_data.get("users_higher") or room_data.get("users_equal"):
if room_data.get("bot_has_unlimited_power", False): if room_data.get("bot_has_unlimited_power", False):
@@ -283,11 +268,15 @@ def generate_room_summary(
if room_data.get("users_higher"): if room_data.get("users_higher"):
room_info += f" - Higher power users: {len(room_data['users_higher'])}" room_info += f" - Higher power users: {len(room_data['users_higher'])}"
if room_data.get("users_equal"): if room_data.get("users_equal"):
room_info += f" - Equal power users: {len(room_data['users_equal'])}" room_info += (
f" - Equal power users: {len(room_data['users_equal'])}"
)
problematic_rooms.append(room_info) problematic_rooms.append(room_info)
else: else:
problematic_rooms.append(f"❌ <b>{room_data['room_name']}</b> ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]") problematic_rooms.append(
f"❌ <b>{room_data['room_name']}</b> ({room_id}): Admin: No (level: {room_data['bot_power_level']}) [v{room_data.get('room_version', '1')}]"
)
# Generate HTML response # Generate HTML response
response = "" response = ""
if problematic_rooms: if problematic_rooms:
@@ -296,20 +285,19 @@ def generate_room_summary(
for room_info in problematic_rooms: for room_info in problematic_rooms:
response += f"{room_info}<br />" response += f"{room_info}<br />"
response += "<br />" response += "<br />"
return response, stats return response, stats
def generate_summary_stats( def generate_summary_stats(
space_data: Dict[str, Any], space_data: Dict[str, Any], room_stats: Dict[str, int]
room_stats: Dict[str, int]
) -> str: ) -> str:
"""Generate summary statistics HTML. """Generate summary statistics HTML.
Args: Args:
space_data: Space permission data space_data: Space permission data
room_stats: Room statistics room_stats: Room statistics
Returns: Returns:
str: HTML formatted summary statistics str: HTML formatted summary statistics
""" """
@@ -319,35 +307,32 @@ def generate_summary_stats(
response += f"• Rooms without admin: {room_stats['non_admin_rooms']}<br />" response += f"• Rooms without admin: {room_stats['non_admin_rooms']}<br />"
response += f"• Modern room versions (12+): {room_stats['modern_rooms']}<br />" response += f"• Modern room versions (12+): {room_stats['modern_rooms']}<br />"
response += f"• Legacy room versions (1-11): {room_stats['legacy_rooms']}<br />" response += f"• Legacy room versions (1-11): {room_stats['legacy_rooms']}<br />"
# Add note about unlimited power for modern rooms # Add note about unlimited power for modern rooms
if room_stats['modern_rooms'] > 0: if room_stats["modern_rooms"] > 0:
response += f"<br />️ <b>Note:</b> In modern room versions (12+), creators have unlimited power and cannot be restricted by power levels.<br />" response += f"<br />️ <b>Note:</b> In modern room versions (12+), creators have unlimited power and cannot be restricted by power levels.<br />"
if room_stats['not_in_room_count'] > 0: if room_stats["not_in_room_count"] > 0:
response += f"• Rooms bot not in: {room_stats['not_in_room_count']}<br />" response += f"• Rooms bot not in: {room_stats['not_in_room_count']}<br />"
if room_stats['error_rooms'] > 0: if room_stats["error_rooms"] > 0:
response += f"• Rooms with errors: {room_stats['error_rooms']}<br />" response += f"• Rooms with errors: {room_stats['error_rooms']}<br />"
response += "<br />" response += "<br />"
return response return response
def generate_issues_and_warnings( def generate_issues_and_warnings(issues: List[str], warnings: List[str]) -> str:
issues: List[str],
warnings: List[str]
) -> str:
"""Generate issues and warnings HTML. """Generate issues and warnings HTML.
Args: Args:
issues: List of critical issues issues: List of critical issues
warnings: List of warnings warnings: List of warnings
Returns: Returns:
str: HTML formatted issues and warnings str: HTML formatted issues and warnings
""" """
response = "" response = ""
if issues: if issues:
response += f"<h4>🚨 Critical Issues</h4><br />" response += f"<h4>🚨 Critical Issues</h4><br />"
for issue in issues: for issue in issues:
@@ -359,13 +344,13 @@ def generate_issues_and_warnings(
for warning in warnings: for warning in warnings:
response += f"{warning}<br />" response += f"{warning}<br />"
response += "<br />" response += "<br />"
return response return response
def generate_all_clear_message() -> str: def generate_all_clear_message() -> str:
"""Generate all clear message HTML. """Generate all clear message HTML.
Returns: Returns:
str: HTML formatted all clear message str: HTML formatted all clear message
""" """
+13 -13
View File
@@ -7,12 +7,12 @@ from mautrix.types import MessageType, MediaMessageEventContent
def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool: def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool:
"""Check if a message should be flagged for censorship. """Check if a message should be flagged for censorship.
Args: Args:
msg: The message event to check msg: The message event to check
censor_wordlist: List of regex patterns to check against censor_wordlist: List of regex patterns to check against
censor_files: Whether to flag file messages censor_files: Whether to flag file messages
Returns: Returns:
bool: True if message should be flagged bool: True if message should be flagged
""" """
@@ -30,17 +30,17 @@ def flag_message(msg, censor_wordlist: list, censor_files: bool) -> bool:
except Exception: except Exception:
# Skip invalid regex patterns # Skip invalid regex patterns
pass pass
return False return False
def flag_instaban(msg, instaban_wordlist: list) -> bool: def flag_instaban(msg, instaban_wordlist: list) -> bool:
"""Check if a message should trigger an instant ban. """Check if a message should trigger an instant ban.
Args: Args:
msg: The message event to check msg: The message event to check
instaban_wordlist: List of regex patterns that trigger instant ban instaban_wordlist: List of regex patterns that trigger instant ban
Returns: Returns:
bool: True if message should trigger instant ban bool: True if message should trigger instant ban
""" """
@@ -51,17 +51,17 @@ def flag_instaban(msg, instaban_wordlist: list) -> bool:
except Exception: except Exception:
# Skip invalid regex patterns # Skip invalid regex patterns
pass pass
return False return False
def censor_room(msg, censor_config) -> bool: def censor_room(msg, censor_config) -> bool:
"""Check if a message should be censored based on room configuration. """Check if a message should be censored based on room configuration.
Args: Args:
msg: The message event to check msg: The message event to check
censor_config: Censor configuration (bool or list of room IDs) censor_config: Censor configuration (bool or list of room IDs)
Returns: Returns:
bool: True if message should be censored bool: True if message should be censored
""" """
@@ -75,10 +75,10 @@ def censor_room(msg, censor_config) -> bool:
def sanitize_room_name(room_name: str) -> str: def sanitize_room_name(room_name: str) -> str:
"""Sanitize a room name for use in aliases. """Sanitize a room name for use in aliases.
Args: Args:
room_name: The room name to sanitize room_name: The room name to sanitize
Returns: Returns:
str: Sanitized room name (alphanumeric only, lowercase) str: Sanitized room name (alphanumeric only, lowercase)
""" """
@@ -87,12 +87,12 @@ def sanitize_room_name(room_name: str) -> str:
def generate_community_slug(community_name: str) -> str: def generate_community_slug(community_name: str) -> str:
"""Generate a community slug from the community name. """Generate a community slug from the community name.
Args: Args:
community_name: The full community name community_name: The full community name
Returns: Returns:
str: A slug made from the first letter of each word, lowercase str: A slug made from the first letter of each word, lowercase
""" """
words = community_name.strip().split() words = community_name.strip().split()
return ''.join(word[0].lower() for word in words if word) return "".join(word[0].lower() for word in words if word)
+33 -33
View File
@@ -6,53 +6,53 @@ import time
def generate_activity_report(database_results: Dict[str, List[Dict]]) -> Dict[str, Any]: def generate_activity_report(database_results: Dict[str, List[Dict]]) -> Dict[str, Any]:
"""Generate an activity report from database results. """Generate an activity report from database results.
Args: Args:
database_results: Dictionary containing 'warn_inactive', 'kick_inactive', 'ignored' results database_results: Dictionary containing 'warn_inactive', 'kick_inactive', 'ignored' results
Returns: Returns:
dict: Formatted activity report dict: Formatted activity report
""" """
report = {} report = {}
# Process warn inactive users (between warn and kick thresholds) # Process warn inactive users (between warn and kick thresholds)
warn_inactive_results = database_results.get("warn_inactive", []) warn_inactive_results = database_results.get("warn_inactive", [])
report["warn_inactive"] = [row["mxid"] for row in warn_inactive_results] or ["none"] report["warn_inactive"] = [row["mxid"] for row in warn_inactive_results] or ["none"]
# Process kick inactive users (beyond kick threshold) # Process kick inactive users (beyond kick threshold)
kick_inactive_results = database_results.get("kick_inactive", []) kick_inactive_results = database_results.get("kick_inactive", [])
report["kick_inactive"] = [row["mxid"] for row in kick_inactive_results] or ["none"] report["kick_inactive"] = [row["mxid"] for row in kick_inactive_results] or ["none"]
# Process ignored users # Process ignored users
ignored_results = database_results.get("ignored", []) ignored_results = database_results.get("ignored", [])
report["ignored"] = [row["mxid"] for row in ignored_results] or ["none"] report["ignored"] = [row["mxid"] for row in ignored_results] or ["none"]
return report return report
def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[str]: def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[str]:
"""Split a doctor report into chunks that fit within size limits. """Split a doctor report into chunks that fit within size limits.
Args: Args:
report_text: The full report text report_text: The full report text
max_chunk_size: Maximum size per chunk max_chunk_size: Maximum size per chunk
Returns: Returns:
list: List of report chunks list: List of report chunks
""" """
if len(report_text) <= max_chunk_size: if len(report_text) <= max_chunk_size:
return [report_text] return [report_text]
# Try to split by sections first # Try to split by sections first
sections = _split_by_sections(report_text, max_chunk_size) sections = _split_by_sections(report_text, max_chunk_size)
if sections: if sections:
return sections return sections
# Fall back to character-based splitting # Fall back to character-based splitting
chunks = [] chunks = []
current_chunk = "" current_chunk = ""
for line in report_text.split('\n'): for line in report_text.split("\n"):
if len(current_chunk) + len(line) + 1 > max_chunk_size: if len(current_chunk) + len(line) + 1 > max_chunk_size:
if current_chunk: if current_chunk:
chunks.append(current_chunk.strip()) chunks.append(current_chunk.strip())
@@ -63,31 +63,31 @@ def split_doctor_report(report_text: str, max_chunk_size: int = 4000) -> List[st
current_chunk = line[max_chunk_size:] current_chunk = line[max_chunk_size:]
else: else:
if current_chunk: if current_chunk:
current_chunk += '\n' + line current_chunk += "\n" + line
else: else:
current_chunk = line current_chunk = line
if current_chunk: if current_chunk:
chunks.append(current_chunk.strip()) chunks.append(current_chunk.strip())
return chunks return chunks
def _split_by_sections(text: str, max_size: int) -> List[str]: def _split_by_sections(text: str, max_size: int) -> List[str]:
"""Split text by sections (lines starting with specific patterns). """Split text by sections (lines starting with specific patterns).
Args: Args:
text: The text to split text: The text to split
max_size: Maximum size per section max_size: Maximum size per section
Returns: Returns:
list: List of text sections list: List of text sections
""" """
section_headers = ["Active users:", "Inactive users:", "Ignored users:"] section_headers = ["Active users:", "Inactive users:", "Ignored users:"]
sections = [] sections = []
current_section = "" current_section = ""
lines = text.split('\n') lines = text.split("\n")
for line in lines: for line in lines:
if any(line.startswith(header) for header in section_headers): if any(line.startswith(header) for header in section_headers):
if current_section and len(current_section) > max_size: if current_section and len(current_section) > max_size:
@@ -101,54 +101,54 @@ def _split_by_sections(text: str, max_size: int) -> List[str]:
# This section would be too big # This section would be too big
return [] return []
if current_section: if current_section:
current_section += '\n' + line current_section += "\n" + line
else: else:
current_section = line current_section = line
if current_section: if current_section:
sections.append(current_section.strip()) sections.append(current_section.strip())
return sections if all(len(s) <= max_size for s in sections) else [] return sections if all(len(s) <= max_size for s in sections) else []
def format_ban_results(ban_event_map: Dict[str, List[str]]) -> str: def format_ban_results(ban_event_map: Dict[str, List[str]]) -> str:
"""Format ban results for display. """Format ban results for display.
Args: Args:
ban_event_map: Dictionary containing ban results ban_event_map: Dictionary containing ban results
Returns: Returns:
str: Formatted ban results str: Formatted ban results
""" """
ban_list = ban_event_map.get("ban_list", {}) ban_list = ban_event_map.get("ban_list", {})
error_list = ban_event_map.get("error_list", {}) error_list = ban_event_map.get("error_list", {})
result_parts = [] result_parts = []
for user, rooms in ban_list.items(): for user, rooms in ban_list.items():
if rooms: if rooms:
result_parts.append(f"Banned {user} from: {', '.join(rooms)}") result_parts.append(f"Banned {user} from: {', '.join(rooms)}")
for user, rooms in error_list.items(): for user, rooms in error_list.items():
if rooms: if rooms:
result_parts.append(f"Failed to ban {user} from: {', '.join(rooms)}") result_parts.append(f"Failed to ban {user} from: {', '.join(rooms)}")
return '\n'.join(result_parts) if result_parts else "No ban operations performed" return "\n".join(result_parts) if result_parts else "No ban operations performed"
def format_sync_results(sync_results: Dict[str, List[str]]) -> str: def format_sync_results(sync_results: Dict[str, List[str]]) -> str:
"""Format sync results for display. """Format sync results for display.
Args: Args:
sync_results: Dictionary containing sync results sync_results: Dictionary containing sync results
Returns: Returns:
str: Formatted sync results str: Formatted sync results
""" """
added = sync_results.get("added", []) added = sync_results.get("added", [])
dropped = sync_results.get("dropped", []) dropped = sync_results.get("dropped", [])
added_str = "<br />".join(added) if added else "none" added_str = "<br />".join(added) if added else "none"
dropped_str = "<br />".join(dropped) if dropped else "none" dropped_str = "<br />".join(dropped) if dropped else "none"
return f"Added: {added_str}<br /><br />Dropped: {dropped_str}" return f"Added: {added_str}<br /><br />Dropped: {dropped_str}"
+71 -61
View File
@@ -6,16 +6,16 @@ from mautrix.types import MessageEvent
class ResponseBuilder: class ResponseBuilder:
"""Builder for consistent response formatting.""" """Builder for consistent response formatting."""
@staticmethod @staticmethod
def build_html_response(title: str, content: str, allow_html: bool = True) -> str: def build_html_response(title: str, content: str, allow_html: bool = True) -> str:
"""Build an HTML formatted response. """Build an HTML formatted response.
Args: Args:
title: Response title title: Response title
content: Response content content: Response content
allow_html: Whether to allow HTML formatting allow_html: Whether to allow HTML formatting
Returns: Returns:
str: Formatted response str: Formatted response
""" """
@@ -23,15 +23,15 @@ class ResponseBuilder:
return f"<p><b>{title}</b><br />{content}</p>" return f"<p><b>{title}</b><br />{content}</p>"
else: else:
return f"{title}\n{content}" return f"{title}\n{content}"
@staticmethod @staticmethod
def build_error_response(error: str, allow_html: bool = True) -> str: def build_error_response(error: str, allow_html: bool = True) -> str:
"""Build an error response. """Build an error response.
Args: Args:
error: Error message error: Error message
allow_html: Whether to allow HTML formatting allow_html: Whether to allow HTML formatting
Returns: Returns:
str: Formatted error response str: Formatted error response
""" """
@@ -39,15 +39,15 @@ class ResponseBuilder:
return f"<p><b>Error:</b> {error}</p>" return f"<p><b>Error:</b> {error}</p>"
else: else:
return f"Error: {error}" return f"Error: {error}"
@staticmethod @staticmethod
def build_success_response(message: str, allow_html: bool = True) -> str: def build_success_response(message: str, allow_html: bool = True) -> str:
"""Build a success response. """Build a success response.
Args: Args:
message: Success message message: Success message
allow_html: Whether to allow HTML formatting allow_html: Whether to allow HTML formatting
Returns: Returns:
str: Formatted success response str: Formatted success response
""" """
@@ -55,186 +55,196 @@ class ResponseBuilder:
return f"<p><b>Success:</b> {message}</p>" return f"<p><b>Success:</b> {message}</p>"
else: else:
return f"Success: {message}" return f"Success: {message}"
@staticmethod @staticmethod
def build_list_response(title: str, items: List[str], allow_html: bool = True) -> str: def build_list_response(
title: str, items: List[str], allow_html: bool = True
) -> str:
"""Build a list response. """Build a list response.
Args: Args:
title: List title title: List title
items: List items items: List items
allow_html: Whether to allow HTML formatting allow_html: Whether to allow HTML formatting
Returns: Returns:
str: Formatted list response str: Formatted list response
""" """
if not items: if not items:
return ResponseBuilder.build_html_response(title, "No items found.", allow_html) return ResponseBuilder.build_html_response(
title, "No items found.", allow_html
)
if allow_html: if allow_html:
items_html = "<br />".join(items) items_html = "<br />".join(items)
return f"<p><b>{title}</b><br />{items_html}</p>" return f"<p><b>{title}</b><br />{items_html}</p>"
else: else:
items_text = "\n".join(f"- {item}" for item in items) items_text = "\n".join(f"- {item}" for item in items)
return f"{title}\n{items_text}" return f"{title}\n{items_text}"
@staticmethod @staticmethod
def build_room_link(alias: str, server: str) -> str: def build_room_link(alias: str, server: str) -> str:
"""Build a Matrix room link. """Build a Matrix room link.
Args: Args:
alias: Room alias alias: Room alias
server: Server name server: Server name
Returns: Returns:
str: HTML room link str: HTML room link
""" """
return f"<a href='https://matrix.to/#/#{alias}:{server}'>#{alias}:{server}</a>" return f"<a href='https://matrix.to/#/#{alias}:{server}'>#{alias}:{server}</a>"
@staticmethod @staticmethod
def build_user_link(user_id: str) -> str: def build_user_link(user_id: str) -> str:
"""Build a Matrix user link. """Build a Matrix user link.
Args: Args:
user_id: User ID user_id: User ID
Returns: Returns:
str: HTML user link str: HTML user link
""" """
return f"<a href='https://matrix.to/#/{user_id}'>{user_id}</a>" return f"<a href='https://matrix.to/#/{user_id}'>{user_id}</a>"
@staticmethod @staticmethod
def build_activity_report_response(report: Dict[str, List[str]], config: Dict[str, Any]) -> str: def build_activity_report_response(
report: Dict[str, List[str]], config: Dict[str, Any]
) -> str:
"""Build an activity report response. """Build an activity report response.
Args: Args:
report: Activity report data report: Activity report data
config: Bot configuration config: Bot configuration
Returns: Returns:
str: Formatted activity report str: Formatted activity report
""" """
warn_threshold = config.get("warn_threshold_days", 30) warn_threshold = config.get("warn_threshold_days", 30)
kick_threshold = config.get("kick_threshold_days", 60) kick_threshold = config.get("kick_threshold_days", 60)
response_parts = [] response_parts = []
if report.get("warn_inactive"): if report.get("warn_inactive"):
warn_list = "<br />".join(report["warn_inactive"]) warn_list = "<br />".join(report["warn_inactive"])
response_parts.append( response_parts.append(
f"<p><b>Users inactive for between {warn_threshold} and {kick_threshold} days:</b><br />" f"<p><b>Users inactive for between {warn_threshold} and {kick_threshold} days:</b><br />"
f"{warn_list}<br /></p>" f"{warn_list}<br /></p>"
) )
if report.get("kick_inactive"): if report.get("kick_inactive"):
kick_list = "<br />".join(report["kick_inactive"]) kick_list = "<br />".join(report["kick_inactive"])
response_parts.append( response_parts.append(
f"<p><b>Users inactive for at least {kick_threshold} days:</b><br />" f"<p><b>Users inactive for at least {kick_threshold} days:</b><br />"
f"{kick_list}<br /></p>" f"{kick_list}<br /></p>"
) )
if report.get("ignored"): if report.get("ignored"):
ignored_list = "<br />".join(report["ignored"]) ignored_list = "<br />".join(report["ignored"])
response_parts.append( response_parts.append(f"<p><b>Ignored users:</b><br />{ignored_list}</p>")
f"<p><b>Ignored users:</b><br />{ignored_list}</p>"
)
return "".join(response_parts) return "".join(response_parts)
@staticmethod @staticmethod
def build_ban_results_response(results: Dict[str, Any]) -> str: def build_ban_results_response(results: Dict[str, Any]) -> str:
"""Build a ban results response. """Build a ban results response.
Args: Args:
results: Ban results data results: Ban results data
Returns: Returns:
str: Formatted ban results str: Formatted ban results
""" """
ban_list = results.get("ban_list", []) ban_list = results.get("ban_list", [])
error_list = results.get("error_list", []) error_list = results.get("error_list", [])
response_parts = [] response_parts = []
if ban_list: if ban_list:
ban_list_html = "<br />".join(ban_list) ban_list_html = "<br />".join(ban_list)
response_parts.append(f"<p><b>Users banned:</b><br /><code>{ban_list_html}</code></p>") response_parts.append(
f"<p><b>Users banned:</b><br /><code>{ban_list_html}</code></p>"
)
if error_list: if error_list:
error_list_html = "<br />".join(error_list) error_list_html = "<br />".join(error_list)
response_parts.append(f"<p><b>Errors:</b><br /><code>{error_list_html}</code></p>") response_parts.append(
f"<p><b>Errors:</b><br /><code>{error_list_html}</code></p>"
)
if not response_parts: if not response_parts:
response_parts.append("<p>No users were banned.</p>") response_parts.append("<p>No users were banned.</p>")
return "".join(response_parts) return "".join(response_parts)
@staticmethod @staticmethod
def build_sync_results_response(results: Dict[str, List[str]]) -> str: def build_sync_results_response(results: Dict[str, List[str]]) -> str:
"""Build a sync results response. """Build a sync results response.
Args: Args:
results: Sync results data results: Sync results data
Returns: Returns:
str: Formatted sync results str: Formatted sync results
""" """
added = results.get("added", []) added = results.get("added", [])
dropped = results.get("dropped", []) dropped = results.get("dropped", [])
response_parts = [] response_parts = []
if added: if added:
added_html = "<br />".join(added) added_html = "<br />".join(added)
response_parts.append(f"<p><b>Added:</b><br />{added_html}</p>") response_parts.append(f"<p><b>Added:</b><br />{added_html}</p>")
if dropped: if dropped:
dropped_html = "<br />".join(dropped) dropped_html = "<br />".join(dropped)
response_parts.append(f"<p><b>Dropped:</b><br />{dropped_html}</p>") response_parts.append(f"<p><b>Dropped:</b><br />{dropped_html}</p>")
if not response_parts: if not response_parts:
response_parts.append("<p>No changes made.</p>") response_parts.append("<p>No changes made.</p>")
return "".join(response_parts) return "".join(response_parts)
@staticmethod @staticmethod
def build_doctor_report_response(report: Dict[str, Any]) -> str: def build_doctor_report_response(report: Dict[str, Any]) -> str:
"""Build a doctor report response. """Build a doctor report response.
Args: Args:
report: Doctor report data report: Doctor report data
Returns: Returns:
str: Formatted doctor report str: Formatted doctor report
""" """
response_parts = [] response_parts = []
# Space information # Space information
if report.get("space"): if report.get("space"):
space = report["space"] space = report["space"]
space_info = f"<b>Space:</b> {space.get('room_id', 'Unknown')}<br />" space_info = f"<b>Space:</b> {space.get('room_id', 'Unknown')}<br />"
space_info += f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}<br />" space_info += (
f"Bot Power Level: {space.get('bot_power_level', 'Unknown')}<br />"
)
space_info += f"Has Admin: {space.get('has_admin', False)}<br />" space_info += f"Has Admin: {space.get('has_admin', False)}<br />"
response_parts.append(f"<p>{space_info}</p>") response_parts.append(f"<p>{space_info}</p>")
# Room information # Room information
if report.get("rooms"): if report.get("rooms"):
rooms_info = "<b>Rooms:</b><br />" rooms_info = "<b>Rooms:</b><br />"
for room_id, room_data in report["rooms"].items(): for room_id, room_data in report["rooms"].items():
rooms_info += f"- {room_id}: {room_data.get('status', 'Unknown')}<br />" rooms_info += f"- {room_id}: {room_data.get('status', 'Unknown')}<br />"
response_parts.append(f"<p>{rooms_info}</p>") response_parts.append(f"<p>{rooms_info}</p>")
# Issues # Issues
if report.get("issues"): if report.get("issues"):
issues_html = "<br />".join(report["issues"]) issues_html = "<br />".join(report["issues"])
response_parts.append(f"<p><b>Issues:</b><br />{issues_html}</p>") response_parts.append(f"<p><b>Issues:</b><br />{issues_html}</p>")
# Warnings # Warnings
if report.get("warnings"): if report.get("warnings"):
warnings_html = "<br />".join(report["warnings"]) warnings_html = "<br />".join(report["warnings"])
response_parts.append(f"<p><b>Warnings:</b><br />{warnings_html}</p>") response_parts.append(f"<p><b>Warnings:</b><br />{warnings_html}</p>")
if not response_parts: if not response_parts:
response_parts.append("<p>No issues found.</p>") response_parts.append("<p>No issues found.</p>")
return "".join(response_parts) return "".join(response_parts)
+88 -86
View File
@@ -8,17 +8,15 @@ from mautrix.client import Client
async def validate_room_creation_params( async def validate_room_creation_params(
roomname: str, roomname: str, config: dict, evt: Optional[MessageEvent] = None
config: dict,
evt: Optional[MessageEvent] = None
) -> Tuple[str, bool, bool, str]: ) -> Tuple[str, bool, bool, str]:
"""Validate and process room creation parameters. """Validate and process room creation parameters.
Args: Args:
roomname: Original room name roomname: Original room name
config: Bot configuration config: Bot configuration
evt: Optional MessageEvent for error responses evt: Optional MessageEvent for error responses
Returns: Returns:
Tuple of (sanitized_name, force_encryption, force_unencryption, error_msg) Tuple of (sanitized_name, force_encryption, force_unencryption, error_msg)
""" """
@@ -27,23 +25,23 @@ async def validate_room_creation_params(
unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?(\s+|$)") unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?(\s+|$)")
force_encryption = bool(encrypted_flag_regex.search(roomname)) force_encryption = bool(encrypted_flag_regex.search(roomname))
force_unencryption = bool(unencrypted_flag_regex.search(roomname)) force_unencryption = bool(unencrypted_flag_regex.search(roomname))
# Clean up room name # Clean up room name
if force_encryption: if force_encryption:
roomname = encrypted_flag_regex.sub("", roomname) # Remove encryption flag roomname = encrypted_flag_regex.sub("", roomname) # Remove encryption flag
if force_unencryption: if force_unencryption:
roomname = unencrypted_flag_regex.sub("", roomname) # Remove unencryption flag roomname = unencrypted_flag_regex.sub("", roomname) # Remove unencryption flag
# Clean up any extra whitespace # Clean up any extra whitespace
roomname = re.sub(r"\s+", " ", roomname).strip() roomname = re.sub(r"\s+", " ", roomname).strip()
sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", roomname).lower() sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", roomname).lower()
# Check if community slug is configured # Check if community slug is configured
if not config.get("community_slug", ""): if not config.get("community_slug", ""):
error_msg = "No community slug configured. Please run initialize command first." error_msg = "No community slug configured. Please run initialize command first."
return sanitized_name, force_encryption, force_unencryption, error_msg, roomname return sanitized_name, force_encryption, force_unencryption, error_msg, roomname
return sanitized_name, force_encryption, force_unencryption, "", roomname return sanitized_name, force_encryption, force_unencryption, "", roomname
@@ -51,27 +49,27 @@ async def prepare_room_creation_data(
sanitized_name: str, sanitized_name: str,
config: dict, config: dict,
client: Client, client: Client,
invitees: Optional[List[str]] = None invitees: Optional[List[str]] = None,
) -> Tuple[str, str, List[str], str]: ) -> Tuple[str, str, List[str], str]:
"""Prepare data needed for room creation. """Prepare data needed for room creation.
Args: Args:
sanitized_name: Sanitized room name sanitized_name: Sanitized room name
config: Bot configuration config: Bot configuration
client: Matrix client client: Matrix client
invitees: Optional list of users to invite invitees: Optional list of users to invite
Returns: Returns:
Tuple of (alias_localpart, server, room_invitees, parent_room) Tuple of (alias_localpart, server, room_invitees, parent_room)
""" """
# Create alias with community slug # Create alias with community slug
alias_localpart = f"{sanitized_name}-{config.get('community_slug', '')}" alias_localpart = f"{sanitized_name}-{config.get('community_slug', '')}"
# Get server and invitees # Get server and invitees
server = client.parse_user_id(client.mxid)[1] server = client.parse_user_id(client.mxid)[1]
room_invitees = invitees if invitees is not None else config.get("invitees", []) room_invitees = invitees if invitees is not None else config.get("invitees", [])
parent_room = config.get("parent_room", "") parent_room = config.get("parent_room", "")
return alias_localpart, server, room_invitees, parent_room return alias_localpart, server, room_invitees, parent_room
@@ -79,34 +77,38 @@ async def prepare_power_levels(
client: Client, client: Client,
config: dict, config: dict,
parent_room: str, parent_room: str,
power_level_override: Optional[PowerLevelStateEventContent] = None power_level_override: Optional[PowerLevelStateEventContent] = None,
) -> PowerLevelStateEventContent: ) -> PowerLevelStateEventContent:
"""Prepare power levels for room creation. """Prepare power levels for room creation.
Args: Args:
client: Matrix client client: Matrix client
config: Bot configuration config: Bot configuration
parent_room: Parent room ID parent_room: Parent room ID
power_level_override: Optional existing power level override power_level_override: Optional existing power level override
Returns: Returns:
PowerLevelStateEventContent for room creation PowerLevelStateEventContent for room creation
""" """
if power_level_override: if power_level_override:
return power_level_override return power_level_override
if parent_room: if parent_room:
try: try:
# Get parent room power levels to extract user power levels # Get parent room power levels to extract user power levels
parent_power_levels = await client.get_state_event( parent_power_levels = await client.get_state_event(
parent_room, EventType.ROOM_POWER_LEVELS parent_room, EventType.ROOM_POWER_LEVELS
) )
# Create new power levels with server defaults, not copying all permissions from space # Create new power levels with server defaults, not copying all permissions from space
power_levels = PowerLevelStateEventContent() power_levels = PowerLevelStateEventContent()
# Copy only user power levels from parent space, not the entire permission set # Copy only user power levels from parent space, not the entire permission set
if parent_power_levels and hasattr(parent_power_levels, 'users') and parent_power_levels.users: if (
parent_power_levels
and hasattr(parent_power_levels, "users")
and parent_power_levels.users
):
try: try:
user_power_levels = parent_power_levels.users.copy() user_power_levels = parent_power_levels.users.copy()
# Ensure bot has highest power # Ensure bot has highest power
@@ -121,10 +123,10 @@ async def prepare_power_levels(
power_levels.users = { power_levels.users = {
client.mxid: 1000, # Bot gets highest power client.mxid: 1000, # Bot gets highest power
} }
# Set explicit config values # Set explicit config values
power_levels.invite = config.get("invite_power_level", 50) power_levels.invite = config.get("invite_power_level", 50)
return power_levels return power_levels
except Exception as e: except Exception as e:
# If we can't get parent power levels, create default ones # If we can't get parent power levels, create default ones
@@ -150,10 +152,10 @@ def prepare_initial_state(
server: str, server: str,
force_encryption: bool, force_encryption: bool,
force_unencryption: bool, force_unencryption: bool,
creation_content: Optional[Dict[str, Any]] = None creation_content: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Prepare initial state events for room creation. """Prepare initial state events for room creation.
Args: Args:
config: Bot configuration config: Bot configuration
parent_room: Parent room ID parent_room: Parent room ID
@@ -161,66 +163,67 @@ def prepare_initial_state(
force_encryption: Whether to force encryption force_encryption: Whether to force encryption
force_unencryption: Whether to force no encryption force_unencryption: Whether to force no encryption
creation_content: Optional creation content creation_content: Optional creation content
Returns: Returns:
List of initial state events List of initial state events
""" """
initial_state = [] initial_state = []
# Only add space parent state if we have a parent room # Only add space parent state if we have a parent room
if parent_room: if parent_room:
initial_state.extend([ initial_state.extend(
{ [
"type": str(EventType.SPACE_PARENT), {
"state_key": parent_room, "type": str(EventType.SPACE_PARENT),
"content": { "state_key": parent_room,
"via": [server], "content": {"via": [server], "canonical": True},
"canonical": True },
} {
}, "type": str(EventType.ROOM_JOIN_RULES),
{ "content": {
"type": str(EventType.ROOM_JOIN_RULES), "join_rule": "restricted",
"content": { "allow": [
"join_rule": "restricted", {"type": "m.room_membership", "room_id": parent_room}
"allow": [{ ],
"type": "m.room_membership", },
"room_id": parent_room },
}] ]
} )
}
])
# Add encryption if needed # Add encryption if needed
if (config.get("encrypt", False) and not force_unencryption) or force_encryption: if (config.get("encrypt", False) and not force_unencryption) or force_encryption:
initial_state.append({ initial_state.append(
"type": str(EventType.ROOM_ENCRYPTION), {
"content": { "type": str(EventType.ROOM_ENCRYPTION),
"algorithm": "m.megolm.v1.aes-sha2" "content": {"algorithm": "m.megolm.v1.aes-sha2"},
} }
}) )
# Add history visibility if specified in creation_content # Add history visibility if specified in creation_content
if creation_content and "m.room.history_visibility" in creation_content: if creation_content and "m.room.history_visibility" in creation_content:
initial_state.append({ initial_state.append(
"type": str(EventType.ROOM_HISTORY_VISIBILITY), {
"content": { "type": str(EventType.ROOM_HISTORY_VISIBILITY),
"history_visibility": creation_content.get("m.room.history_visibility", "joined") "content": {
"history_visibility": creation_content.get(
"m.room.history_visibility", "joined"
)
},
} }
}) )
return initial_state return initial_state
def adjust_power_levels_for_modern_rooms( def adjust_power_levels_for_modern_rooms(
power_levels: PowerLevelStateEventContent, power_levels: PowerLevelStateEventContent, room_version: str
room_version: str
) -> PowerLevelStateEventContent: ) -> PowerLevelStateEventContent:
"""Adjust power levels for modern room versions. """Adjust power levels for modern room versions.
Args: Args:
power_levels: Power level state content power_levels: Power level state content
room_version: Room version string room_version: Room version string
Returns: Returns:
Adjusted power level state content Adjusted power level state content
""" """
@@ -229,20 +232,18 @@ def adjust_power_levels_for_modern_rooms(
if room_version and int(room_version) >= 12 and power_levels: if room_version and int(room_version) >= 12 and power_levels:
if power_levels.users: if power_levels.users:
# Remove bot from users list but keep other important settings # Remove bot from users list but keep other important settings
power_levels.users.pop("bot_mxid", None) # Will be replaced with actual bot mxid power_levels.users.pop(
"bot_mxid", None
) # Will be replaced with actual bot mxid
return power_levels return power_levels
async def add_room_to_space( async def add_room_to_space(
client: Client, client: Client, parent_room: str, room_id: str, server: str, sleep_duration: float
parent_room: str,
room_id: str,
server: str,
sleep_duration: float
) -> None: ) -> None:
"""Add created room to parent space. """Add created room to parent space.
Args: Args:
client: Matrix client client: Matrix client
parent_room: Parent room ID parent_room: Parent room ID
@@ -254,23 +255,17 @@ async def add_room_to_space(
await client.send_state_event( await client.send_state_event(
parent_room, parent_room,
EventType.SPACE_CHILD, EventType.SPACE_CHILD,
{ {"via": [server], "suggested": False},
"via": [server], state_key=room_id,
"suggested": False
},
state_key=room_id
) )
await asyncio.sleep(sleep_duration) await asyncio.sleep(sleep_duration)
async def verify_room_creation( async def verify_room_creation(
client: Client, client: Client, room_id: str, expected_version: str, logger
room_id: str,
expected_version: str,
logger
) -> None: ) -> None:
"""Verify that room was created with correct settings. """Verify that room was created with correct settings.
Args: Args:
client: Matrix client client: Matrix client
room_id: Created room ID room_id: Created room ID
@@ -279,9 +274,16 @@ async def verify_room_creation(
""" """
try: try:
from .room_utils import get_room_version_and_creators from .room_utils import get_room_version_and_creators
actual_version, actual_creators = await get_room_version_and_creators(client, room_id, logger)
logger.info(f"Room {room_id} created with version {actual_version} (requested: {expected_version})") actual_version, actual_creators = await get_room_version_and_creators(
client, room_id, logger
)
logger.info(
f"Room {room_id} created with version {actual_version} (requested: {expected_version})"
)
if actual_version != expected_version: if actual_version != expected_version:
logger.warning(f"Room version mismatch: requested {expected_version}, got {actual_version}") logger.warning(
f"Room version mismatch: requested {expected_version}, got {actual_version}"
)
except Exception as e: except Exception as e:
logger.warning(f"Could not verify room version for {room_id}: {e}") logger.warning(f"Could not verify room version for {room_id}: {e}")
+36 -29
View File
@@ -8,12 +8,12 @@ from mautrix.errors import MNotFound
async def validate_room_alias(client, alias_localpart: str, server: str) -> bool: async def validate_room_alias(client, alias_localpart: str, server: str) -> bool:
"""Check if a room alias already exists. """Check if a room alias already exists.
Args: Args:
client: Matrix client instance client: Matrix client instance
alias_localpart: The localpart of the alias (without # and :server) alias_localpart: The localpart of the alias (without # and :server)
server: The server domain server: The server domain
Returns: Returns:
bool: True if alias is available, False if it already exists bool: True if alias is available, False if it already exists
""" """
@@ -30,76 +30,81 @@ async def validate_room_alias(client, alias_localpart: str, server: str) -> bool
return True return True
async def validate_room_aliases(client, room_names: list[str], community_slug: str, server: str) -> Tuple[bool, List[str]]: async def validate_room_aliases(
client, room_names: list[str], community_slug: str, server: str
) -> Tuple[bool, List[str]]:
"""Validate that all room aliases are available. """Validate that all room aliases are available.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_names: List of room names to validate room_names: List of room names to validate
community_slug: The community slug to append community_slug: The community slug to append
server: The server domain server: The server domain
Returns: Returns:
tuple: (is_valid, list_of_conflicting_aliases) tuple: (is_valid, list_of_conflicting_aliases)
""" """
if not community_slug: if not community_slug:
return False, [] return False, []
conflicting_aliases = [] conflicting_aliases = []
for room_name in room_names: for room_name in room_names:
# Clean the room name and create alias # Clean the room name and create alias
from .message_utils import sanitize_room_name from .message_utils import sanitize_room_name
sanitized_name = sanitize_room_name(room_name) sanitized_name = sanitize_room_name(room_name)
alias_localpart = f"{sanitized_name}-{community_slug}" alias_localpart = f"{sanitized_name}-{community_slug}"
# Check if alias is available # Check if alias is available
is_available = await validate_room_alias(client, alias_localpart, server) is_available = await validate_room_alias(client, alias_localpart, server)
if not is_available: if not is_available:
conflicting_aliases.append(f"#{alias_localpart}:{server}") conflicting_aliases.append(f"#{alias_localpart}:{server}")
return len(conflicting_aliases) == 0, conflicting_aliases return len(conflicting_aliases) == 0, conflicting_aliases
async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tuple[str, List[str]]: async def get_room_version_and_creators(
client, room_id: str, logger=None
) -> Tuple[str, List[str]]:
"""Get the room version and creators for a room. """Get the room version and creators for a room.
Args: Args:
client: Matrix client instance client: Matrix client instance
room_id: The room ID to check room_id: The room ID to check
Returns: Returns:
tuple: (room_version, list_of_creators) tuple: (room_version, list_of_creators)
""" """
try: try:
# Get all state events to find the creation event # Get all state events to find the creation event
state_events = await client.get_state(room_id) state_events = await client.get_state(room_id)
# Find the m.room.create event # Find the m.room.create event
creation_event = None creation_event = None
for event in state_events: for event in state_events:
if event.type == EventType.ROOM_CREATE: if event.type == EventType.ROOM_CREATE:
creation_event = event creation_event = event
break break
if not creation_event: if not creation_event:
# Default to version 1 if no creation event found # Default to version 1 if no creation event found
return "1", [] return "1", []
room_version = creation_event.content.get("room_version", "1") room_version = creation_event.content.get("room_version", "1")
creators = [] creators = []
# Add the sender of the creation event as a creator # Add the sender of the creation event as a creator
if creation_event.sender: if creation_event.sender:
creators.append(creation_event.sender) creators.append(creation_event.sender)
# Add any additional creators from the content # Add any additional creators from the content
additional_creators = creation_event.content.get("additional_creators", []) additional_creators = creation_event.content.get("additional_creators", [])
if isinstance(additional_creators, list): if isinstance(additional_creators, list):
creators.extend(additional_creators) creators.extend(additional_creators)
return room_version, creators return room_version, creators
except Exception: except Exception:
# Default to version 1 if there's an error # Default to version 1 if there's an error
return "1", [] return "1", []
@@ -107,10 +112,10 @@ async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tu
def is_modern_room_version(room_version: str) -> bool: def is_modern_room_version(room_version: str) -> bool:
"""Check if a room version is 12 or newer (modern room versions). """Check if a room version is 12 or newer (modern room versions).
Args: Args:
room_version: The room version string to check room_version: The room version string to check
Returns: Returns:
bool: True if room version is 12 or newer bool: True if room version is 12 or newer
""" """
@@ -124,36 +129,38 @@ def is_modern_room_version(room_version: str) -> bool:
async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool: async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
"""Check if a user has unlimited power in a room (creator in modern room versions). """Check if a user has unlimited power in a room (creator in modern room versions).
Args: Args:
client: Matrix client instance client: Matrix client instance
user_id: The user ID to check user_id: The user ID to check
room_id: The room ID to check in room_id: The room ID to check in
Returns: Returns:
bool: True if user has unlimited power bool: True if user has unlimited power
""" """
try: try:
room_version, creators = await get_room_version_and_creators(client, room_id, None) room_version, creators = await get_room_version_and_creators(
client, room_id, None
)
# In modern room versions (12+), creators have unlimited power # In modern room versions (12+), creators have unlimited power
if is_modern_room_version(room_version): if is_modern_room_version(room_version):
return user_id in creators return user_id in creators
# In older room versions, creators don't have special unlimited power # In older room versions, creators don't have special unlimited power
return False return False
except Exception: except Exception:
return False return False
async def get_moderators_and_above(client, parent_room: str) -> List[str]: async def get_moderators_and_above(client, parent_room: str) -> List[str]:
"""Get list of users with moderator or higher permissions from the parent space. """Get list of users with moderator or higher permissions from the parent space.
Args: Args:
client: Matrix client instance client: Matrix client instance
parent_room: The parent room ID parent_room: The parent room ID
Returns: Returns:
list: List of user IDs with power level >= 50 (moderator or above) list: List of user IDs with power level >= 50 (moderator or above)
""" """
+39 -23
View File
@@ -10,13 +10,13 @@ from mautrix.errors import MNotFound
async def check_if_banned(client, userid: str, banlists: List[str], logger) -> bool: async def check_if_banned(client, userid: str, banlists: List[str], logger) -> bool:
"""Check if a user is banned according to banlists. """Check if a user is banned according to banlists.
Args: Args:
client: Matrix client instance client: Matrix client instance
userid: The user ID to check userid: The user ID to check
banlists: List of banlist room IDs or aliases banlists: List of banlist room IDs or aliases
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
bool: True if user is banned bool: True if user is banned
""" """
@@ -42,25 +42,25 @@ async def check_if_banned(client, userid: str, banlists: List[str], logger) -> b
for rule in user_policies: for rule in user_policies:
try: try:
if bool( if bool(fnmatch.fnmatch(userid, rule["content"]["entity"])) and bool(
fnmatch.fnmatch(userid, rule["content"]["entity"]) re.search("ban$", rule["content"]["recommendation"])
) and bool(re.search("ban$", rule["content"]["recommendation"])): ):
return True return True
except Exception: except Exception:
# Skip invalid rules # Skip invalid rules
pass pass
return is_banned return is_banned
async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]: async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]:
"""Get room IDs for all configured banlists. """Get room IDs for all configured banlists.
Args: Args:
client: Matrix client instance client: Matrix client instance
banlists: List of banlist room IDs or aliases banlists: List of banlist room IDs or aliases
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
list: List of room IDs for banlists list: List of room IDs for banlists
""" """
@@ -81,12 +81,20 @@ async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]:
return banlist_roomids return banlist_roomids
async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: str = "banned", async def ban_user_from_rooms(
all_rooms: bool = False, redact_on_ban: bool = False, client,
get_messages_to_redact_func=None, database=None, user: str,
sleep_time: float = 0.1, logger=None) -> Dict: roomlist: List[str],
reason: str = "banned",
all_rooms: bool = False,
redact_on_ban: bool = False,
get_messages_to_redact_func=None,
database=None,
sleep_time: float = 0.1,
logger=None,
) -> Dict:
"""Ban a user from a list of rooms. """Ban a user from a list of rooms.
Args: Args:
client: Matrix client instance client: Matrix client instance
user: User ID to ban user: User ID to ban
@@ -98,13 +106,13 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st
database: Database instance for redaction tasks database: Database instance for redaction tasks
sleep_time: Sleep time between operations sleep_time: Sleep time between operations
logger: Logger instance logger: Logger instance
Returns: Returns:
dict: Ban results with success/error lists dict: Ban results with success/error lists
""" """
ban_event_map = {"ban_list": {}, "error_list": {}} ban_event_map = {"ban_list": {}, "error_list": {}}
ban_event_map["ban_list"][user] = [] ban_event_map["ban_list"][user] = []
for room in roomlist: for room in roomlist:
try: try:
roomname = None roomname = None
@@ -149,10 +157,16 @@ async def ban_user_from_rooms(client, user: str, roomlist: List[str], reason: st
return ban_event_map return ban_event_map
async def user_permitted(client, user_id: UserID, parent_room: str, min_level: int = 50, async def user_permitted(
room_id: str = None, logger=None) -> bool: client,
user_id: UserID,
parent_room: str,
min_level: int = 50,
room_id: str = None,
logger=None,
) -> bool:
"""Check if a user has sufficient power level in a room. """Check if a user has sufficient power level in a room.
Args: Args:
client: Matrix client instance client: Matrix client instance
user_id: The Matrix ID of the user to check user_id: The Matrix ID of the user to check
@@ -160,18 +174,19 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i
min_level: Minimum required power level (default 50 for moderator) min_level: Minimum required power level (default 50 for moderator)
room_id: The room ID to check permissions in. If None, uses parent room. room_id: The room ID to check permissions in. If None, uses parent room.
logger: Logger instance for error reporting logger: Logger instance for error reporting
Returns: Returns:
bool: True if user has sufficient power level bool: True if user has sufficient power level
""" """
try: try:
target_room = room_id or parent_room target_room = room_id or parent_room
# First check if user has unlimited power (creator in modern room versions) # First check if user has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power from .room_utils import user_has_unlimited_power
if await user_has_unlimited_power(client, user_id, target_room): if await user_has_unlimited_power(client, user_id, target_room):
return True return True
# Then check power level # Then check power level
power_levels = await client.get_state_event( power_levels = await client.get_state_event(
target_room, EventType.ROOM_POWER_LEVELS target_room, EventType.ROOM_POWER_LEVELS
@@ -186,14 +201,15 @@ async def user_permitted(client, user_id: UserID, parent_room: str, min_level: i
async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool: async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
"""Check if a user has unlimited power in a room (creator in modern room versions). """Check if a user has unlimited power in a room (creator in modern room versions).
Args: Args:
client: Matrix client instance client: Matrix client instance
user_id: The user ID to check user_id: The user ID to check
room_id: The room ID to check in room_id: The room ID to check in
Returns: Returns:
bool: True if user has unlimited power bool: True if user has unlimited power
""" """
from .room_utils import user_has_unlimited_power as room_user_has_unlimited_power from .room_utils import user_has_unlimited_power as room_user_has_unlimited_power
return await room_user_has_unlimited_power(client, user_id, room_id) return await room_user_has_unlimited_power(client, user_id, room_id)