# kickbot - a maubot plugin to track user activity and remove inactive users from rooms/spaces. from typing import Awaitable, Type, Optional, Tuple import json import time import re import fnmatch import asyncio from mautrix.client import Client, InternalEventType, MembershipEventDispatcher, SyncStream from mautrix.types import (Event, StateEvent, EventID, UserID, FileInfo, EventType, MediaMessageEventContent, ReactionEvent, RedactionEvent, RoomID, RoomAlias, PowerLevelStateEventContent, MessageType, PaginationDirection) from mautrix.errors import MNotFound from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper from maubot import Plugin, MessageEvent from maubot.handlers import command, event BAN_STATE_EVENT = EventType.find("m.policy.rule.user", EventType.Class.STATE) # database table related things from .db import upgrade_table class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("sleep") helper.copy("welcome_sleep") helper.copy("admins") helper.copy("moderators") helper.copy("parent_room") helper.copy("track_users") helper.copy("track_messages") helper.copy("track_reactions") helper.copy("warn_threshold_days") helper.copy("kick_threshold_days") helper.copy("encrypt") helper.copy("invitees") helper.copy("notification_room") helper.copy("join_notification_message") helper.copy_dict("greeting_rooms") helper.copy_dict("greetings") helper.copy("censor") helper.copy("uncensor_pl") helper.copy("censor_wordlist") helper.copy("censor_wordlist_instaban") helper.copy("censor_files") helper.copy("banlists") helper.copy("proactive_banning") helper.copy("redact_on_ban") class CommunityBot(Plugin): _redaction_tasks: asyncio.Task = None async def start(self) -> None: await super().start() self.config.load_and_update() self.client.add_dispatcher(MembershipEventDispatcher) # Start background redaction task self._redaction_tasks = asyncio.create_task(self._redaction_loop()) async def stop(self) -> None: if self._redaction_tasks: self._redaction_tasks.cancel() await super().stop() async def user_permitted(self, user_id: UserID, min_level: int = 50) -> bool: """Check if a user has sufficient power level in the parent room. Args: user_id: The Matrix ID of the user to check min_level: Minimum required power level (default 50 for moderator) Returns: bool: True if user has sufficient power level """ try: power_levels = await self.client.get_state_event( self.config["parent_room"], EventType.ROOM_POWER_LEVELS ) user_level = power_levels.get_user_level(user_id) return user_level >= min_level except Exception as e: self.log.error(f"Failed to check user power level: {e}") return False async def _redaction_loop(self) -> None: while True: try: # Get all rooms with pending redactions rooms = await self.database.fetch( "SELECT DISTINCT room_id FROM redaction_tasks" ) for room in rooms: await self.redact_messages(room['room_id']) await asyncio.sleep(60) # Run every minute except asyncio.CancelledError: break except Exception as e: self.log.error(f"Error in redaction loop: {e}") await asyncio.sleep(60) # Wait a minute before retrying on error async def do_sync(self) -> None: if not self.config["track_users"]: return "user tracking is disabled" space_members_obj = await self.client.get_joined_members(self.config["parent_room"]) space_members_list = space_members_obj.keys() table_users = await self.database.fetch("SELECT mxid FROM user_events") table_user_list = [ row["mxid"] for row in table_users ] untracked_users = set(space_members_list) - set(table_user_list) non_space_members = set(table_user_list) - set(space_members_list) results = {} results['added'] = [] results['dropped'] = [] try: for user in untracked_users: now = int(time.time() * 1000) q = """ INSERT INTO user_events (mxid, last_message_timestamp) VALUES ($1, $2) """ await self.database.execute(q, user, now) results['added'].append(user) self.log.info(f"{user} inserted into activity tracking table") for user in non_space_members: await self.database.execute("DELETE FROM user_events WHERE mxid = $1", user) self.log.info(f"{user} is not a space member, dropped from activity tracking table") results['dropped'].append(user) except Exception as e: self.log.exception(e) return results async def get_space_roomlist(self) -> None: space = self.config["parent_room"] rooms = [] state = await self.client.get_state(space) for evt in state: if evt.type == EventType.SPACE_CHILD: # only look for rooms that include a via path, otherwise they # are not really in the space! if evt.content and evt.content.via: rooms.append(evt.state_key) return rooms async def generate_report(self) -> None: now = int(time.time() * 1000) warn_days_ago = (now - (1000 * 60 * 60 * 24 * self.config["warn_threshold_days"])) kick_days_ago = (now - (1000 * 60 * 60 * 24 * self.config["kick_threshold_days"])) warn_q = """ SELECT mxid FROM user_events WHERE last_message_timestamp <= $1 AND last_message_timestamp >= $2 AND (ignore_inactivity < 1 OR ignore_inactivity IS NULL) """ kick_q = """ SELECT mxid FROM user_events WHERE last_message_timestamp <= $1 AND (ignore_inactivity < 1 OR ignore_inactivity IS NULL) """ ignored_q = """ SELECT mxid FROM user_events WHERE ignore_inactivity = 1 """ warn_inactive_results = await self.database.fetch(warn_q, warn_days_ago, kick_days_ago) kick_inactive_results = await self.database.fetch(kick_q, kick_days_ago) ignored_results = await self.database.fetch(ignored_q) report = {} report["warn_inactive"] = [ row["mxid"] for row in warn_inactive_results ] or ["none"] report["kick_inactive"] = [ row["mxid"] for row in kick_inactive_results ] or ["none"] report["ignored"] = [ row["mxid"] for row in ignored_results ] or ["none"] return report def flag_message(self, msg): if msg.content.msgtype in [MessageType.FILE, MessageType.IMAGE, MessageType.VIDEO]: return self.config['censor_files'] for w in self.config['censor_wordlist']: try: if bool(re.search(w, msg.content.body, re.IGNORECASE)): #self.log.debug(f"DEBUG message flagged for censorship") return True else: pass except Exception as e: self.log.error(f"Could not parse message for flagging: {e}") def flag_instaban(self, msg): for w in self.config['censor_wordlist_instaban']: try: if bool(re.search(w, msg.content.body, re.IGNORECASE)): #self.log.debug(f"DEBUG message flagged for instaban") return True else: pass except Exception as e: self.log.error(f"Could not parse message for flagging: {e}") def censor_room(self, msg): if isinstance(self.config['censor'], bool): #self.log.debug(f"DEBUG message will be redacted because censoring is enabled") return self.config['censor'] elif isinstance(self.config['censor'], list): if msg.room_id in self.config['censor']: #self.log.debug(f"DEBUG message will be redacted because censoring is enabled for THIS room") return True else: return False async def check_if_banned(self, userid): # fetch banlist data is_banned = False myrooms = await self.client.get_joined_rooms() banlist_roomids = await self.get_banlist_roomids() for list_id in banlist_roomids: if list_id not in myrooms: self.log.error(f"Bot must be in {list_id} before attempting to use it as a banlist.") pass #self.log.debug(f"DEBUG looking up state in {list_id}") list_state = await self.client.get_state(list_id) #self.log.debug(f"DEBUG state found: {list_state}") try: user_policies = list(filter(lambda p : p.type.t=='m.policy.rule.user', list_state)) #self.log.debug(f"DEBUG user policies found: {user_policies}") except Exception as e: self.log.error(e) for rule in user_policies: #self.log.debug(f"Checking match of user {userid} in banlist {l} for {rule['content']}") try: if bool(fnmatch.fnmatch(userid, rule["content"]["entity"])) and \ bool(re.search('ban$', rule["content"]["recommendation"])): #self.log.debug(f"DEBUG user {userid} matches ban rule {rule['content']['entity']}!") return True else: pass except Exception as e: self.log.debug(f"Found something funny in the banlist {list_id} for {rule['content']}: {e}") pass # if we haven't exited by now, we must not be banned! return is_banned async def get_messages_to_redact(self, room_id, mxid): try: messages = await self.client.get_messages( room_id, limit=100, filter_json={ "senders": [mxid], "not_types": ["m.room.redaction"] }, direction=PaginationDirection.BACKWARD ) # Filter out events with empty content filtered_events = [event for event in messages.events if event.content and event.content.serialize()] self.log.debug(f"DEBUG found {len(filtered_events)} messages to redact in {room_id} (after filtering empty content)") return filtered_events except Exception as e: self.log.error(f"Error getting messages to redact: {e}") return [] async def redact_messages(self, room_id): counters = {'success': 0, 'failure': 0} sleep_time = self.config['sleep'] events = await self.database.fetch( "SELECT event_id FROM redaction_tasks WHERE room_id = $1", room_id ) for event in events: try: await self.client.redact(room_id, event['event_id'], reason="content removed") counters['success'] += 1 await self.database.execute( "DELETE FROM redaction_tasks WHERE event_id = $1", event['event_id'] ) await asyncio.sleep(sleep_time) except Exception as e: if "Too Many Requests" in str(e): self.log.warning(f"Rate limited while redacting messages in {room_id}, will try again in next loop") return counters self.log.error(f"Failed to redact message: {e}") counters['failure'] += 1 await asyncio.sleep(sleep_time) return counters async def check_bot_permissions(self, room_id: str, evt: MessageEvent = None, required_permissions: list[str] = None) -> tuple[bool, str, dict]: """Check if the bot has necessary permissions in a room. Args: room_id: The ID of the room to check permissions in evt: Optional MessageEvent for progress updates required_permissions: List of specific permissions to check. If None, checks basic room access. Returns: tuple: (bool, str, dict) - (has_permissions, error_message, permission_details) """ try: # Check if bot is in the room try: await self.client.get_state_event(room_id, EventType.ROOM_MEMBER, self.client.mxid) except MNotFound: return False, "Bot is not a member of this room", {} # Get power levels power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS) bot_level = power_levels.users.get(self.client.mxid, power_levels.users_default) # Define required power levels for different actions permission_requirements = { "redact": power_levels.redact, "kick": power_levels.kick, "ban": power_levels.ban, "invite": power_levels.invite, "tombstone": power_levels.events.get("m.room.tombstone", power_levels.events_default), "power_levels": power_levels.events.get("m.room.power_levels", power_levels.events_default), "state": power_levels.state_default } # Check each required permission permission_status = {} if required_permissions: for perm in required_permissions: if perm in permission_requirements: required_level = permission_requirements[perm] permission_status[perm] = { "has_permission": bot_level >= required_level, "required_level": required_level, "bot_level": bot_level } # If no specific permissions requested, just check basic access if not required_permissions: if bot_level < 50: # Basic moderator level return False, "Bot does not have sufficient power level (needs at least moderator level)", permission_status return True, "", permission_status # Check if all requested permissions are granted missing_permissions = [perm for perm, status in permission_status.items() if not status["has_permission"]] if missing_permissions: error_msg = "Bot is missing required permissions: " + ", ".join(missing_permissions) return False, error_msg, permission_status return True, "", permission_status except Exception as e: error_msg = f"Failed to check bot permissions: {e}" self.log.error(error_msg) if evt: await evt.respond(error_msg) return False, error_msg, {} async def do_archive_room(self, room_id: str, evt: MessageEvent = None, replacement_room: str = "") -> bool: """Handle common room archival activities like removing from space, removing aliases, and setting tombstone. Args: room_id: The ID of the room to archive evt: Optional MessageEvent for progress updates replacement_room: Optional room ID to point to in the tombstone event Returns: bool: True if all operations succeeded, False otherwise """ try: # Check permissions for all required operations has_perms, error_msg, _ = await self.check_bot_permissions( room_id, evt, ["state", "tombstone", "power_levels"] ) if not has_perms: if evt: await evt.respond(f"Cannot archive room: {error_msg}") return False # Try to remove the room from the space first self.log.debug(f"DEBUG removing space state reference from {room_id}") await self.client.send_state_event( room_id=room_id, event_type="m.space.parent", content={}, # Empty content removes the state state_key=self.config["parent_room"] ) self.log.info(f"Removed parent space reference from room {room_id}") # Remove the child reference from the space self.log.debug(f"DEBUG removing child state reference from {self.config['parent_room']}") await self.client.send_state_event( self.config["parent_room"], event_type="m.space.child", content={}, # Empty content removes the state state_key=room_id ) self.log.info(f"Removed child room reference from space {self.config['parent_room']}") # Remove room aliases to release them await self.remove_room_aliases(room_id, evt) # Send the tombstone tombstone_content = { "body": "This room has been archived." if not replacement_room else "This room has been replaced. Please join the new room.", "replacement_room": replacement_room } await self.client.send_state_event( room_id=room_id, event_type=EventType.ROOM_TOMBSTONE, content=tombstone_content ) self.log.info(f"Successfully added tombstone to room {room_id}") return True except Exception as e: error_msg = f"Failed to archive room: {e}" self.log.error(error_msg) if evt: await evt.respond(error_msg) return False async def remove_room_aliases(self, room_id: str, evt: MessageEvent = None) -> list: """Remove all aliases from a room. Args: room_id: The ID of the room whose aliases to remove evt: Optional MessageEvent for progress updates Returns: list: List of aliases that were successfully removed """ removed_aliases = [] try: aliases = await self.client.get_state_event( room_id=room_id, event_type=EventType.ROOM_CANONICAL_ALIAS ) except Exception as e: self.log.warning(f"Failed to get room alias state event, skipping: {e}") return removed_aliases if aliases.alt_aliases: for alias in aliases.alt_aliases: try: await self.client.remove_room_alias( alias_localpart=alias.split(':')[0].lstrip('#'), ) self.log.info(f"Removed alias {alias} from room {room_id}") removed_aliases.append(alias) except Exception as e: self.log.warning(f"Failed to remove alias {alias}: {e}") if aliases.canonical_alias: try: await self.client.remove_room_alias( alias_localpart=aliases.canonical_alias.split(':')[0].lstrip('#'), ) self.log.info(f"Removed canonical alias {aliases.canonical_alias} from room {room_id}") removed_aliases.append(aliases.canonical_alias) except Exception as e: self.log.warning(f"Failed to remove canonical alias: {e}") return removed_aliases async def ban_this_user(self, user, reason="banned", all_rooms=False): roomlist = await self.get_space_roomlist() # don't forget to kick from the space itself roomlist.append(self.config["parent_room"]) ban_event_map = {'ban_list':{}, 'error_list':{}} ban_event_map['ban_list'][user] = [] for room in roomlist: try: roomname = None roomnamestate = await self.client.get_state_event(room, 'm.room.name') roomname = roomnamestate['name'] # ban user even if they're not in the room! if all_rooms: pass else: await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) await self.client.ban_user(room, user, reason=reason) if roomname: ban_event_map['ban_list'][user].append(roomname) else: ban_event_map['ban_list'][user].append(room) time.sleep(self.config['sleep']) except MNotFound: pass except Exception as e: self.log.warning(e) ban_event_map['error_list'][user] = [] ban_event_map['error_list'][user].append(roomname or room) if self.config["redact_on_ban"]: messages = await self.get_messages_to_redact(room, user) # Queue messages for redaction for msg in messages: await self.database.execute( "INSERT INTO redaction_tasks (event_id, room_id) VALUES ($1, $2)", msg.event_id, room ) self.log.info(f"Queued {len(messages)} messages for redaction in {roomname or room}") return ban_event_map async def get_banlist_roomids(self): banlist_roomids = [] for l in self.config['banlists']: #self.log.debug(f"DEBUG getting banlist {l}") if l.startswith('#'): try: l_id = await self.client.resolve_room_alias(l) list_id = l_id["room_id"] time.sleep(self.config['sleep']) #self.log.debug(f"DEBUG banlist id resolves to: {list_id}") except: self.log.error(f"Banlist fetching failed for {l}") return else: list_id = l banlist_roomids.append(list_id) return banlist_roomids @event.on(BAN_STATE_EVENT) async def check_ban_event(self, evt:StateEvent) -> None: if not self.config["proactive_banning"]: return banlist_roomids = await self.get_banlist_roomids() # we only care about ban events in rooms in the banlist if evt.room_id not in banlist_roomids: return else: try: entity = evt.content["entity"] recommendation = evt.content["recommendation"] self.log.debug(f"DEBUG new ban rule found: {entity} should have action {recommendation}") if bool(re.search(r"[*?]", entity)): self.log.debug(f"DEBUG ban rule appears to be glob pattern, skipping proactive measures.") return if bool(re.search('ban$', recommendation)): await self.ban_this_user(entity) except Exception as e: self.log.error(e) @event.on(EventType.ROOM_POWER_LEVELS) async def sync_power_levels(self, evt: StateEvent) -> None: # Only care about changes in the parent room if evt.room_id != self.config['parent_room']: return # Get the changed user and their new power level try: old_levels = evt.prev_content.get("users", {}) new_levels = evt.content.get("users", {}) # Find which user's power level changed changed_users = {} for user, new_level in new_levels.items(): if user not in old_levels or old_levels[user] != new_level: changed_users[user] = new_level if not changed_users: return # Get all rooms in the space space_rooms = await self.client.get_joined_rooms() success_rooms = [] failed_rooms = [] # Apply the same power level changes to each room for room_id in space_rooms: if room_id == self.config['parent_room']: continue try: roomname = (await self.client.get_state_event(room_id, "m.room.name"))["name"] except: self.log.warning(f"Unable to get room name for {room_id}") # Get current power levels try: # Get current power levels current_pl = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS) # Update existing power levels object with new levels users = current_pl.get("users", {}) for user, level in changed_users.items(): users[user] = level current_pl["users"] = users # Send updated power levels try: await self.client.send_state_event(room_id, EventType.ROOM_POWER_LEVELS, current_pl) success_rooms.append(roomname or room_id) except Exception as e: self.log.error(f"Failed to send power levels to {roomname or room_id}: {e}") failed_rooms.append(roomname or room_id) time.sleep(self.config['sleep']) except Exception as e: self.log.warning(f"Failed to update power levels in {room_id}: {e}") failed_rooms.append(room_id) # Send notification if configured if self.config["notification_room"]: changes = ", ".join([f"{user} → {level}" for user, level in changed_users.items()]) notification = f"Power level changes ({changes}) propagated from parent room:
" notification += f"Succeeded in: {', '.join(success_rooms)}
" if failed_rooms: notification += f"Failed in: {', '.join(failed_rooms)}" await self.client.send_notice( self.config["notification_room"], html=notification ) except Exception as e: self.log.error(f"Error syncing power levels: {e}") @event.on(InternalEventType.JOIN) async def newjoin(self, evt:StateEvent) -> None: if evt.source & SyncStream.STATE: return else: on_banlist = await self.check_if_banned(evt.sender) if on_banlist: #self.log.debug(f"DEBUG user is on banlist!") # ban this account in managed rooms, don't bother with anything else await self.ban_this_user(evt.sender) return # passive sync of tracking db if evt.room_id == self.config['parent_room']: await self.do_sync() # greeting activities room_id = str(evt.room_id) if room_id in self.config["greeting_rooms"]: # just in case we got here even if the person is on the banlists if on_banlist: return greeting_map = self.config['greetings'] greeting_name = self.config['greeting_rooms'][room_id] nick = self.client.parse_user_id(evt.sender)[0] pill = '{nick}'.format(mxid=evt.sender, nick=nick) if greeting_name != "none": greeting = greeting_map[greeting_name].format(user=pill) time.sleep(self.config['welcome_sleep']) await self.client.send_notice(evt.room_id, html=greeting) else: pass if self.config["notification_room"]: roomnamestate = await self.client.get_state_event(evt.room_id, 'm.room.name') roomname = roomnamestate['name'] notification_message = self.config['join_notification_message'].format(user=evt.sender, room=roomname) await self.client.send_notice(self.config["notification_room"], html=notification_message) @event.on(EventType.ROOM_MESSAGE) async def update_message_timestamp(self, evt: MessageEvent) -> None: power_levels = await self.client.get_state_event(evt.room_id, EventType.ROOM_POWER_LEVELS) user_level = power_levels.get_user_level(evt.sender) #self.log.debug(f"DEBUGDEBUG user {evt.sender} has power level {user_level}") if self.flag_message(evt): # do we need to redact? if not await self.user_permitted(evt.sender) and \ evt.sender != self.client.mxid and \ self.censor_room(evt): try: await self.client.redact(evt.room_id, evt.event_id, reason="message flagged") except Exception as e: self.log.error(f"Flagged message could not be redacted: {e}") if evt.content.msgtype in {MessageType.TEXT, MessageType.NOTICE, MessageType.EMOTE}: if self.flag_instaban(evt): # do we need to redact? if not await self.user_permitted(evt.sender) and \ evt.sender != self.client.mxid and \ self.censor_room(evt): try: await self.client.redact(evt.room_id, evt.event_id, reason="message flagged") except Exception as e: self.log.error(f"Flagged message could not be redacted: {e}") await self.ban_this_user(evt.sender, all_rooms=True) if not self.config["track_messages"] or not self.config["track_users"]: pass else: rooms_to_manage = await self.get_space_roomlist() # only attempt to track rooms in the space, ignore any other rooms # the bot may happen to be in line banlist policy rooms etc. if evt.room_id not in rooms_to_manage: return else: q = """ INSERT INTO user_events(mxid, last_message_timestamp) VALUES ($1, $2) ON CONFLICT(mxid) DO UPDATE SET last_message_timestamp=$2 """ await self.database.execute(q, evt.sender, evt.timestamp) @event.on(EventType.REACTION) async def update_reaction_timestamp(self, evt: MessageEvent) -> None: if not self.config["track_reactions"] or not self.config["track_users"]: pass else: rooms_to_manage = await self.get_space_roomlist() # only attempt to track rooms in the space, ignore any other rooms # the bot may happen to be in line banlist policy rooms etc. if evt.room_id not in rooms_to_manage: return else: q = """ INSERT INTO user_events(mxid, last_message_timestamp) VALUES ($1, $2) ON CONFLICT(mxid) DO UPDATE SET last_message_timestamp=$2 """ await self.database.execute(q, evt.sender, evt.timestamp) @command.new("community", help="manage rooms and members of a space") async def community(self) -> None: pass @community.subcommand("bancheck", help="check subscribed banlists for a user's mxid") @command.argument("mxid", "full matrix ID", required=True) async def check_banlists(self, evt: MessageEvent, mxid: UserID) -> None: ban_status = await self.check_if_banned(mxid) await evt.reply(f"user on banlist: {ban_status}") @community.subcommand("sync", help="update the activity tracker with the current space members \ in case they are missing") async def sync_space_members(self, evt: MessageEvent) -> None: if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return # check config values for admins and moderators. if they have a lower PL in the parent room, # attempt to update the parent room with their appropriate admin/mod status # we can skip all of this logic if those config values are empty # this logic helps migrate explicit configuration to the parent-room inheritance model if not self.config["admins"] and not self.config["moderators"]: self.log.info("no admins or moderators configured, skipping power level sync") pass else: power_levels = await self.client.get_state_event(self.config["parent_room"], EventType.ROOM_POWER_LEVELS) users = power_levels.get("users", {}) for user in self.config["admins"]: if user not in users or users.get(user) < 100: # update the users object in-place users[user] = 100 for user in self.config["moderators"]: if user not in users or users.get(user) < 50: # update the users object in-place users[user] = 50 try: # update full powerlevels object with updated user object power_levels["users"] = users await self.client.send_state_event(self.config["parent_room"], EventType.ROOM_POWER_LEVELS, power_levels) # if updating was successful, let's go ahead and clear out the values in the config self.config["admins"] = [] self.config["moderators"] = [] # and save the config to the file self.config.save() self.log.debug("successfully migrated admin/mod config to parent room") except Exception as e: self.log.error(f"Failed to send power levels to {self.config["parent_room"]}: {e}") await evt.respond(f"Failed to send power levels to {self.config["parent_room"]}: {e}") if not self.config["track_users"]: await evt.respond("user tracking is disabled") return results = await self.do_sync() added_str = "
".join(results['added']) dropped_str = "
".join(results['dropped']) await evt.respond(f"Added: {added_str}

Dropped: {dropped_str}", allow_html=True) @community.subcommand("ignore", help="exclude a specific matrix ID from inactivity tracking") @command.argument("mxid", "full matrix ID", required=True) async def ignore_inactivity(self, evt: MessageEvent, mxid: UserID) -> None: if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if not self.config["track_users"]: await evt.reply("user tracking is disabled") return try: Client.parse_user_id(mxid) await self.database.execute("UPDATE user_events SET ignore_inactivity = 1 WHERE \ mxid = $1", mxid) self.log.info(f"{mxid} set to ignore inactivity") await evt.react("✅") except Exception as e: await evt.respond(f"{e}") @community.subcommand("unignore", help="re-enable activity tracking for a specific matrix ID") @command.argument("mxid", "full matrix ID", required=True) async def unignore_inactivity(self, evt: MessageEvent, mxid: UserID) -> None: if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if not self.config["track_users"]: await evt.reply("user tracking is disabled") return try: Client.parse_user_id(mxid) await self.database.execute("UPDATE user_events SET ignore_inactivity = 0 WHERE \ mxid = $1", mxid) self.log.info(f"{mxid} set to track inactivity") await evt.react("✅") except Exception as e: await evt.respond(f"{e}") @community.subcommand("report", help='generate a list of matrix IDs that have been inactive') async def get_report(self, evt: MessageEvent) -> None: if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if not self.config["track_users"]: await evt.reply("user tracking is disabled") return sync_results = await self.do_sync() report = await self.generate_report() await evt.respond(f"

Users inactive for between {self.config['warn_threshold_days']} and \ {self.config['kick_threshold_days']} days:
\ {'
'.join(report['warn_inactive'])}

\

Users inactive for at least {self.config['kick_threshold_days']} days:
\ {'
'.join(report['kick_inactive'])}

\

Ignored users:
\ {'
'.join(report['ignored'])}

", \ allow_html=True) @community.subcommand("purge", help='kick users for excessive inactivity') async def kick_users(self, evt: MessageEvent) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return msg = await evt.respond("starting the purge...") report = await self.generate_report() purgeable = report['kick_inactive'] roomlist = await self.get_space_roomlist() # don't forget to kick from the space itself roomlist.append(self.config["parent_room"]) purge_list = {} error_list = {} for user in purgeable: purge_list[user] = [] for room in roomlist: try: roomname = None roomnamestate = await self.client.get_state_event(room, 'm.room.name') roomname = roomnamestate['name'] await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) await self.client.kick_user(room, user, reason='inactivity') if roomname: purge_list[user].append(roomname) else: purge_list[user].append(room) time.sleep('sleep') except MNotFound: pass except Exception as e: self.log.warning(e) error_list[user] = [] error_list[user].append(roomname or room) results = "the following users were purged:

{purge_list}

the following errors were \ recorded:

{error_list}

".format(purge_list=purge_list, error_list=error_list) await evt.respond(results, allow_html=True, edits=msg) # sync our database after we've made changes to room memberships await self.do_sync() @community.subcommand("kick", help='kick a specific user from the community and all rooms') @command.argument("mxid", "full matrix ID", required=True) async def kick_user(self, evt: MessageEvent, mxid: UserID) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return user = mxid msg = await evt.respond("starting the purge...") roomlist = await self.get_space_roomlist() # don't forget to kick from the space itself roomlist.append(self.config["parent_room"]) purge_list = {} error_list = {} purge_list[user] = [] for room in roomlist: try: roomname = None roomnamestate = await self.client.get_state_event(room, 'm.room.name') roomname = roomnamestate['name'] await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) await self.client.kick_user(room, user, reason='kicked') if roomname: purge_list[user].append(roomname) else: purge_list[user].append(room) time.sleep(self.config['sleep']) except MNotFound: pass except Exception as e: self.log.warning(e) error_list[user] = [] error_list[user].append(roomname or room) results = "the following users were kicked:

{purge_list}

the following errors were \ recorded:

{error_list}

".format(purge_list=purge_list, error_list=error_list) await evt.respond(results, allow_html=True, edits=msg) # sync our database after we've made changes to room memberships await self.do_sync() @community.subcommand("ban", help='kick and ban a specific user from the community and all rooms') @command.argument("mxid", "full matrix ID", required=True) async def ban_user(self, evt: MessageEvent, mxid: UserID) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return user = mxid msg = await evt.respond("starting the ban...") results_map = await self.ban_this_user(user, all_rooms=True) results = "the following users were kicked and banned:

{ban_list}

the following errors were \ recorded:

{error_list}

".format(ban_list=results_map['ban_list'], error_list=results_map['error_list']) await evt.respond(results, allow_html=True, edits=msg) # sync our database after we've made changes to room memberships await self.do_sync() @community.subcommand("unban", help='unban a specific user from the community and all rooms') @command.argument("mxid", "full matrix ID", required=True) async def unban_user(self, evt: MessageEvent, mxid: UserID) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return user = mxid msg = await evt.respond("starting the unban...") roomlist = await self.get_space_roomlist() # don't forget to kick from the space itself roomlist.append(self.config["parent_room"]) unban_list = {} error_list = {} unban_list[user] = [] for room in roomlist: try: roomname = None roomnamestate = await self.client.get_state_event(room, 'm.room.name') roomname = roomnamestate['name'] await self.client.get_state_event(room, EventType.ROOM_MEMBER, user) await self.client.unban_user(room, user, reason='unbanned') if roomname: unban_list[user].append(roomname) else: unban_list[user].append(room) time.sleep(self.config['sleep']) except MNotFound: pass except Exception as e: self.log.warning(e) error_list[user] = [] error_list[user].append(roomname or room) results = "the following users were unbanned:

{unban_list}

the following errors were \ recorded:

{error_list}

".format(unban_list=unban_list, error_list=error_list) await evt.respond(results, allow_html=True, edits=msg) # sync our database after we've made changes to room memberships await self.do_sync() @community.subcommand("redact", help="redact messages from a specific user (optionally in a specific room)") @command.argument("mxid", "full matrix ID", required=True) @command.argument("room", "room ID", required=False) async def mark_for_redaction(self, evt: MessageEvent, mxid: UserID, room: str) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if room: if room.startswith('#'): room_id = await self.client.resolve_room_alias(room) room_id = room_id["room_id"] else: room_id = room else: room_id = evt.room_id # get list of messages to redact in this room messages = await self.get_messages_to_redact(room_id, mxid) for msg in messages: await self.database.execute( "INSERT INTO redaction_tasks (event_id, room_id) VALUES ($1, $2)", msg.event_id, room_id ) await evt.respond(f"Queued {len(messages)} messages for redaction in {room_id}") async def create_room(self, roomname: str, evt: MessageEvent = None) -> None: """Create a new room and add it to the parent space. Args: roomname: The name for the new room evt: Optional MessageEvent for progress updates. If provided, will send status messages. Returns: tuple: (room_id, room_alias) if successful, None if failed """ encrypted_flag_regex = re.compile(r'(\s+|^)-+encrypt(ed)?\s?') force_encryption = bool(encrypted_flag_regex.search(roomname)) try: if force_encryption: roomname = encrypted_flag_regex.sub('', roomname) sanitized_name = re.sub(r"[^a-zA-Z0-9]", '', roomname).lower() invitees = self.config['invitees'] parent_room = self.config['parent_room'] server = self.client.parse_user_id(self.client.mxid)[1] # Set bot PL higher than admin so we can kick old admins if needed pl_override = {"users": {self.client.mxid: 1000}} # Get power levels from parent room parent_power_levels = await self.client.get_state_event( self.config["parent_room"], EventType.ROOM_POWER_LEVELS ) # Copy power levels from parent room for all users for user, level in parent_power_levels.users.items(): if user != self.client.mxid: # Skip bot's power level pl_override["users"][user] = level if evt: mymsg = await evt.respond(f"creating {sanitized_name}, give me a minute...") room_id = await self.client.create_room( alias_localpart=sanitized_name, name=roomname, invitees=invitees, power_level_override=pl_override ) await asyncio.sleep(self.config['sleep']) if evt: await evt.respond(f"updating room states...", edits=mymsg) # Set up room state events parent_event_content = json.dumps({'auto_join': False, 'suggested': False, 'via': [server]}) child_event_content = json.dumps({'canonical': True, 'via': [server]}) join_rules_content = json.dumps({ 'join_rule': 'restricted', 'allow': [{'type': 'm.room_membership', 'room_id': parent_room}] }) await self.client.send_state_event(parent_room, 'm.space.child', parent_event_content, state_key=room_id) await asyncio.sleep(self.config['sleep']) await self.client.send_state_event(room_id, 'm.space.parent', child_event_content, state_key=parent_room) await asyncio.sleep(self.config['sleep']) await self.client.send_state_event(room_id, 'm.room.join_rules', join_rules_content, state_key="") await asyncio.sleep(self.config['sleep']) if self.config["encrypt"] or force_encryption: encryption_content = json.dumps({"algorithm": "m.megolm.v1.aes-sha2"}) await self.client.send_state_event(room_id, 'm.room.encryption', encryption_content, state_key="") if evt: await evt.respond(f"encrypting room...", edits=mymsg) await asyncio.sleep(self.config['sleep']) if evt: await evt.respond(f"room created and updated, alias is #{sanitized_name}:{server}", edits=mymsg) return room_id, f"#{sanitized_name}:{server}" except Exception as e: if evt: await evt.respond(f"i tried, but something went wrong: \"{e}\"", edits=mymsg) self.log.error(f"Failed to create room: {e}") return None @community.subcommand("createroom", help="create a new room titled and add it to the parent space. \ optionally include `--encrypt` to encrypt it regardless of the default settings.") @command.argument("roomname", pass_raw=True, required=True) async def create_that_room(self, evt: MessageEvent, roomname: str) -> None: if (roomname == "help") or len(roomname) == 0: await evt.reply('pass me a room name (like "cool topic") and i will create it and add it to the space. \ use `--encrypt` to ensure it is encrypted at creation time even if that isnt my default \ setting.') else: if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return encrypted_flag_regex = re.compile(r'(\s+|^)-+encrypt(ed)?\s?') force_encryption = bool(encrypted_flag_regex.search(roomname)) try: if force_encryption: roomname = encrypted_flag_regex.sub('', roomname) sanitized_name = re.sub(r"[^a-zA-Z0-9]", '', roomname).lower() invitees = self.config['invitees'] parent_room = self.config['parent_room'] ## homeserver is derived from maubot's client instance since this is the user that will create the room server = self.client.parse_user_id(self.client.mxid)[1] # set bot PL higher than admin so we can kick old admins if needed pl_override = {"users": {self.client.mxid: 1000}} for u in self.config['admins']: pl_override["users"][u] = 100 for u in self.config['moderators']: pl_override["users"][u] = 50 pl_json = json.dumps(pl_override) mymsg = await evt.respond(f"creating {sanitized_name}, give me a minute...") #self.log.info(mymsg) room_id = await self.client.create_room(alias_localpart=sanitized_name, name=roomname, invitees=invitees, power_level_override=pl_override) time.sleep(self.config['sleep']) await evt.respond(f"updating room states...", edits=mymsg) parent_event_content = json.dumps({'auto_join': False, 'suggested': False, 'via': [server]}) child_event_content = json.dumps({'canonical': True, 'via': [server]}) join_rules_content = json.dumps({'join_rule': 'restricted', 'allow': [{'type': 'm.room_membership', 'room_id': parent_room}]}) await self.client.send_state_event(parent_room, 'm.space.child', parent_event_content, state_key=room_id) time.sleep(self.config['sleep']) await self.client.send_state_event(room_id, 'm.space.parent', child_event_content, state_key=parent_room) time.sleep(self.config['sleep']) await self.client.send_state_event(room_id, 'm.room.join_rules', join_rules_content, state_key="") time.sleep(self.config['sleep']) if self.config["encrypt"] or force_encryption: encryption_content = json.dumps({"algorithm": "m.megolm.v1.aes-sha2"}) await self.client.send_state_event(room_id, 'm.room.encryption', encryption_content, state_key="") await evt.respond(f"encrypting room...", edits=mymsg) time.sleep(self.config['sleep']) await evt.respond(f"room created and updated, alias is #{sanitized_name}:{server}", edits=mymsg) except Exception as e: await evt.respond(f"i tried, but something went wrong: \"{e}\"", edits=mymsg) @community.subcommand("archive", help="archive a room") @command.argument("room", required=False) async def archive_room(self, evt: MessageEvent, room: str) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if not room: room_id = evt.room_id self.log.debug(f"DEBUG room we are archiving is {room_id}") elif room and room.startswith('#'): try: self.log.debug(f"DEBUG trying to resolve alias {room}") room_id = await self.client.resolve_room_alias(room) room_id = room_id["room_id"] self.log.debug(f"DEBUG room we are archiving is {room_id}") except Exception as e: evt.reply("i couldn't resolve that alias, sorry") self.log.error(f"error resolving alias {room}: {e}") return elif room and room.startswith('!'): room_id = room self.log.debug(f"DEBUG room we are archiving is {room_id}") else: evt.reply("i don't recognize that room, sorry") return success = await self.do_archive_room(room_id, evt) # Only try to respond if we're not archiving the room we're in if success and room_id != evt.room_id: await evt.respond("Room has been archived.") @community.subcommand("replaceroom", help="replace a room with a new one") @command.argument("room", required=False) async def replace_room(self, evt: MessageEvent, room: str) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender): await evt.reply("You don't have permission to use this command") return if not room: room = evt.room_id # first we need to get relevant room state of the room we want to replace # this includes the room name, alias, and join rules if room.startswith('#'): room_id = await self.client.resolve_room_alias(room) room_id = room_id["room_id"] else: room_id = room # Check bot permissions in the old room has_perms, error_msg, _ = await self.check_bot_permissions( room_id, evt, ["state", "tombstone", "power_levels"] ) if not has_perms: await evt.respond(f"Cannot replace room: {error_msg}") return # Get the room name from the state event try: room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME) room_name = room_name_event.name except Exception as e: self.log.warning(f"Failed to get room name: {e}") #await evt.respond("Could not find room name in state events") pass # get the room topic from the state event try: room_topic_event = await self.client.get_state_event(room_id, EventType.ROOM_TOPIC) room_topic = room_topic_event.topic except Exception as e: self.log.warning(f"Failed to get room topic: {e}") pass # Get list of aliases to transfer while removing them from the old room aliases_to_transfer = await self.remove_room_aliases(room_id, evt) # Now we can start the process of replacing the room # First we need to create the new room. this will create the initial alias, # as well as bot defaults such as power levels, initial invitations, encryption, # and space membership new_room_id, new_room_alias = await self.create_room(room_name, evt) if not new_room_id: await evt.respond("Failed to create new room") return # Check bot permissions in the new room has_perms, error_msg, _ = await self.check_bot_permissions( new_room_id, evt, ["state", "tombstone", "power_levels"] ) if not has_perms: await evt.respond(f"Created new room but cannot complete replacement: {error_msg}") return # Transfer the aliases to the new room for alias in aliases_to_transfer: localpart = alias.split(':')[0][1:] # Remove # and get localpart server = alias.split(':')[1] try: await self.client.add_room_alias(new_room_id, localpart) self.log.info(f"Successfully transferred alias {alias} to new room {new_room_id}") except Exception as e: # If transfer failed, try to create a modified alias modified_alias = f"{localpart}NEW" try: await self.client.add_room_alias(new_room_id, modified_alias) self.log.info(f"Successfully transferred modified alias {modified_alias} to new room {new_room_id}") except Exception as e2: self.log.error(f"Failed to transfer modified alias {modified_alias}: {e2}") # Get the room avatar from the old room try: old_room_avatar = await self.client.get_state_event(room_id, EventType.ROOM_AVATAR) if old_room_avatar and old_room_avatar.url: # Set the same avatar in the new room await self.client.send_state_event( new_room_id, EventType.ROOM_AVATAR, {"url": old_room_avatar.url} ) self.log.info(f"Successfully copied room avatar to new room {new_room_id}") except Exception as e: self.log.error(f"Failed to copy room avatar to new room: {e}") #await evt.respond(f"Failed to copy room avatar to new room: {e}") # Set the room topic in the new room try: await self.client.send_state_event( new_room_id, EventType.ROOM_TOPIC, {"topic": room_topic} ) self.log.info(f"Successfully copied room topic to new room {new_room_id}") except Exception as e: self.log.error(f"Failed to copy room topic to new room: {e}") #await evt.respond(f"Failed to copy room topic to new room: {e}") # Archive the old room with a pointer to the new room success = await self.do_archive_room(room_id, evt, new_room_id) if not success: await evt.respond("Failed to archive old room, but new room has been created") @community.subcommand("guests", help="generate a list of members in a room who are not members of the parent space") @command.argument("room", required=False) async def get_guestlist(self, evt: MessageEvent, room: str) -> None: space_members_obj = await self.client.get_joined_members(self.config["parent_room"]) space_members_list = space_members_obj.keys() room_id = None if room: if room.startswith('#'): try: thatroom_id = await self.client.resolve_room_alias(room) room_id = thatroom_id["room_id"] except: evt.reply("i don't recognize that room, sorry") return else: room_id = room else: room_id = evt.room_id room_members_obj = await self.client.get_joined_members(room_id) room_members_list = room_members_obj.keys() # find the non-space members in the room member list try: guest_list = set(room_members_list) - set(space_members_list) if len(guest_list) == 0: guest_list = ["None"] await evt.reply(f"Guests in this room are:
\ {'
'.join(guest_list)}", allow_html=True) except Exception as e: await evt.respond(f"something went wrong: {e}") @community.subcommand("roomid", help="return the matrix room ID of this, or a given, room") @command.argument("room", required=False) async def get_roomid(self, evt: MessageEvent, room: str) -> None: room_id = None if room: if room.startswith('#'): try: thatroom_id = await self.client.resolve_room_alias(room) room_id = thatroom_id["room_id"] except: evt.reply("i don't recognize that room, sorry") return else: room_id = room else: room_id = evt.room_id try: await evt.reply(f"Room ID is: {room_id}") except Exception as e: await evt.respond(f"something went wrong: {e}") @community.subcommand("setpower", help="set power levels according to the community configuration") async def set_powerlevels(self, evt: MessageEvent,) -> None: await evt.mark_read() if not await self.user_permitted(evt.sender, min_level=100): await evt.reply("You don't have permission to use this command") return msg = await evt.respond("truing up power levels, this could take a minute...") admins = self.config['admins'] moderators = self.config['moderators'] roomlist = await self.get_space_roomlist() # don't forget to include the space itself roomlist.append(self.config["parent_room"]) success_list = [] error_list = [] adminpl = 100 modpl = 50 defaultpl = 0 for room in roomlist: # need to get and evaluate the current state that contains powerlevels first current_pl = await self.client.get_state_event(room, 'm.room.power_levels') users = current_pl['users'].serialize() updated_user_map = dict(users) try: roomname = None roomnamestate = await self.client.get_state_event(room, 'm.room.name') roomname = roomnamestate['name'] except Exception as e: self.log.warning(e) # update our powerlevel map values for user in admins: updated_user_map[user] = adminpl for user in moderators: updated_user_map[user] = modpl # revoke values for people no longer in the config for user in users.keys(): if ( user not in admins and user not in moderators and updated_user_map[user] > defaultpl and user != self.client.mxid ): del updated_user_map[user] # and send the new state event back to the room new_pl = current_pl new_pl['users'] = updated_user_map try: #self.log.debug(f"DEBUG sending finalized PL map to room {room}: {updated_user_map}") await self.client.send_state_event(room, 'm.room.power_levels', new_pl) success_list.append(roomname or room) except Exception as e: self.log.warning(e) error_list.append(roomname or room) time.sleep(self.config['sleep']) results = "the following rooms were updated:

{success_list}

the following errors were \ recorded:

{error_list}

".format(success_list=success_list, error_list=error_list) await evt.respond(results, allow_html=True, edits=msg) # sync our database after we've made changes to room memberships await self.do_sync() @classmethod def get_db_upgrade_table(cls) -> None: return upgrade_table @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: return Config