to manage users. Available subcommands: bancheck, ban, unban, kick, ignore, unignore, redact"
)
@user.subcommand("bancheck", help="check subscribed banlists for a user's mxid")
@command.argument("mxid", "full matrix ID", required=True)
async def user_bancheck(self, evt: MessageEvent, mxid: UserID) -> None:
if not await self.check_parent_room(evt):
return
ban_status = await self.check_if_banned(mxid)
await evt.reply(f"user on banlist: {ban_status}")
@user.subcommand(
"ban", help="kick and ban a specific user from the community and all rooms"
)
@command.argument("mxid", "full matrix ID", required=True)
@decorators.require_parent_room
@decorators.require_permission()
async def user_ban(self, evt: MessageEvent, mxid: UserID) -> None:
await evt.mark_read()
user = mxid
msg = await evt.respond("starting the ban...")
results_map = await self.ban_this_user(user, all_rooms=True)
results = "the following users were kicked and banned:{ban_list}
the following errors were \
recorded:{error_list}
".format(
ban_list=results_map["ban_list"], error_list=results_map["error_list"]
)
await evt.respond(results, allow_html=True, edits=msg)
# sync our database after we've made changes to room memberships
await self.do_sync()
@user.subcommand(
"unban", help="unban a specific user from the community and all rooms"
)
@command.argument("mxid", "full matrix ID", required=True)
@decorators.require_parent_room
@decorators.require_permission()
async def user_unban(self, evt: MessageEvent, mxid: UserID) -> None:
await evt.mark_read()
user = mxid
msg = await evt.respond("starting the unban...")
roomlist = await self.get_space_roomlist()
# don't forget to kick from the space itself
roomlist.append(self.config["parent_room"])
unban_list = {}
error_list = {}
unban_list[user] = []
for room in roomlist:
try:
roomname = None
roomnamestate = await self.client.get_state_event(room, "m.room.name")
if roomnamestate:
roomname = roomnamestate.name
else:
roomname = room
await self.client.unban_user(room, user)
unban_list[user].append(roomname)
except Exception as e:
error_list[room] = str(e)
results = "the following users were unbanned:{unban_list}
the following errors were \
recorded:{error_list}
".format(
unban_list=unban_list, error_list=error_list
)
await evt.respond(results, allow_html=True, edits=msg)
# sync our database after we've made changes to room memberships
await self.do_sync()
@user.subcommand(
"ignore", help="exclude a specific matrix ID from inactivity tracking"
)
@command.argument("mxid", "full matrix ID", required=True)
@decorators.require_parent_room
@decorators.require_permission()
@decorators.handle_errors("Failed to ignore user")
async def user_ignore(self, evt: MessageEvent, mxid: UserID) -> None:
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
Client.parse_user_id(mxid)
await self.database.execute(
"UPDATE user_events SET ignore_inactivity = 1 WHERE \
mxid = $1",
mxid,
)
self.log.info(f"{mxid} set to ignore inactivity")
await evt.react("✅")
@user.subcommand(
"unignore", help="re-enable activity tracking for a specific matrix ID"
)
@command.argument("mxid", "full matrix ID", required=True)
@decorators.require_parent_room
@decorators.require_permission()
@decorators.handle_errors("Failed to unignore user")
async def user_unignore(self, evt: MessageEvent, mxid: UserID) -> None:
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
Client.parse_user_id(mxid)
await self.database.execute(
"UPDATE user_events SET ignore_inactivity = 0 WHERE \
mxid = $1",
mxid,
)
self.log.info(f"{mxid} set to track inactivity")
await evt.react("✅")
@user.subcommand(
"redact",
help="redact messages from a specific user (optionally in a specific room)",
)
@command.argument("mxid", "full matrix ID", required=True)
@command.argument("room", "room ID", required=False)
@decorators.require_parent_room
@decorators.require_permission()
async def user_redact(self, evt: MessageEvent, mxid: UserID, room: str) -> None:
await evt.mark_read()
if room:
if room.startswith("#"):
try:
room_id = await self.client.resolve_room_alias(room)
room_id = room_id["room_id"]
except:
evt.reply("i couldn't resolve that alias, sorry")
return
else:
room_id = room
else:
room_id = evt.room_id
# get list of messages to redact in this room
messages = await self.get_messages_to_redact(room_id, mxid)
for msg in messages:
await self.database.execute(
"INSERT INTO redaction_tasks (event_id, room_id) VALUES ($1, $2)",
msg.event_id,
room_id,
)
await evt.respond(f"Queued {len(messages)} messages for redaction in {room_id}")
@community.subcommand(
"sync",
help="update the activity tracker with the current space members \
in case they are missing",
)
@decorators.require_parent_room
@decorators.require_permission()
async def sync_space_members(self, evt: MessageEvent) -> None:
# Power level sync is now handled through parent room inheritance
# Users should set power levels directly in the parent room
if not self.config["track_users"]:
await evt.respond("user tracking is disabled")
return
results = await self.do_sync()
added_str = "
".join(results["added"])
dropped_str = "
".join(results["dropped"])
await evt.respond(
f"Added: {added_str}
Dropped: {dropped_str}", allow_html=True
)
@community.subcommand(
"report", help="generate reports of user activity and inactivity"
)
@decorators.require_parent_room
@decorators.require_permission()
async def report(self, evt: MessageEvent) -> None:
"""Main report command - shows full report by default"""
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
sync_results = await self.do_sync()
report = await self.generate_report()
await evt.respond(
f"Users inactive for between {self.config['warn_threshold_days']} and \
{self.config['kick_threshold_days']} days:
\
{'
'.join(report['warn_inactive'])}
\
Users inactive for at least {self.config['kick_threshold_days']} days:
\
{'
'.join(report['kick_inactive'])}
\
Ignored users:
\
{'
'.join(report['ignored'])}
",
allow_html=True,
)
@report.subcommand("all", help="generate a full report of all user activity status")
@decorators.require_parent_room
@decorators.require_permission()
async def report_all(self, evt: MessageEvent) -> None:
"""Report all user activity status - same as main report command"""
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
sync_results = await self.do_sync()
report = await self.generate_report()
await evt.respond(
f"Users inactive for between {self.config['warn_threshold_days']} and \
{self.config['kick_threshold_days']} days:
\
{'
'.join(report['warn_inactive'])}
\
Users inactive for at least {self.config['kick_threshold_days']} days:
\
{'
'.join(report['kick_inactive'])}
\
Ignored users:
\
{'
'.join(report['ignored'])}
",
allow_html=True,
)
@report.subcommand(
"inactive", help="generate a list of users who have been inactive"
)
@decorators.require_parent_room
@decorators.require_permission()
async def report_inactive(self, evt: MessageEvent) -> None:
"""Report users who are inactive but not yet at kick threshold"""
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
sync_results = await self.do_sync()
report = await self.generate_report()
await evt.respond(
f"Users inactive for between {self.config['warn_threshold_days']} and \
{self.config['kick_threshold_days']} days:
\
{'
'.join(report['warn_inactive'])}
",
allow_html=True,
)
@report.subcommand(
"purgable",
help="generate a list of users that would be kicked with the purge command",
)
@decorators.require_parent_room
@decorators.require_permission()
async def report_purgable(self, evt: MessageEvent) -> None:
"""Report users who are inactive long enough to be purged"""
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
sync_results = await self.do_sync()
report = await self.generate_report()
await evt.respond(
f"Users inactive for at least {self.config['kick_threshold_days']} days:
\
{'
'.join(report['kick_inactive'])}
",
allow_html=True,
)
@report.subcommand(
"ignored", help="generate a list of users that have activity tracking disabled"
)
@decorators.require_parent_room
@decorators.require_permission()
async def report_ignored(self, evt: MessageEvent) -> None:
"""Report users who are ignored for activity tracking"""
if not self.config_manager.is_tracking_enabled():
await evt.reply("user tracking is disabled")
return
sync_results = await self.do_sync()
report = await self.generate_report()
await evt.respond(
f"Ignored users:
\
{'
'.join(report['ignored'])}
",
allow_html=True,
)
@community.subcommand("purge", help="kick users for excessive inactivity")
@decorators.require_parent_room
@decorators.require_permission()
async def kick_users(self, evt: MessageEvent) -> None:
await evt.mark_read()
msg = await evt.respond("starting the purge...")
report = await self.generate_report()
purgeable = report["kick_inactive"]
roomlist = await self.get_space_roomlist()
# don't forget to kick from the space itself
roomlist.append(self.config["parent_room"])
purge_list = {}
error_list = {}
for user in purgeable:
purge_list[user] = []
for room in roomlist:
try:
roomname = None
roomnamestate = await self.client.get_state_event(
room, "m.room.name"
)
roomname = roomnamestate["name"]
await self.client.get_state_event(room, EventType.ROOM_MEMBER, user)
await self.client.kick_user(room, user, reason="inactivity")
if roomname:
purge_list[user].append(roomname)
else:
purge_list[user].append(room)
await asyncio.sleep(self.config["sleep"])
except MNotFound:
pass
except Exception as e:
self.log.warning(e)
error_list[user] = []
error_list[user].append(roomname or room)
results = "the following users were purged:{purge_list}
the following errors were \
recorded:{error_list}
".format(
purge_list=purge_list, error_list=error_list
)
await evt.respond(results, allow_html=True, edits=msg)
# sync our database after we've made changes to room memberships
await self.do_sync()
@user.subcommand(
"kick", help="kick a specific user from the community and all rooms"
)
@command.argument("mxid", "full matrix ID", required=True)
@decorators.require_parent_room
@decorators.require_permission()
async def user_kick(self, evt: MessageEvent, mxid: UserID) -> None:
await evt.mark_read()
user = mxid
msg = await evt.respond("starting the kick...")
roomlist = await self.get_space_roomlist()
# don't forget to kick from the space itself
roomlist.append(self.config["parent_room"])
kick_list = {}
error_list = {}
kick_list[user] = []
for room in roomlist:
try:
roomname = None
roomnamestate = await self.client.get_state_event(room, "m.room.name")
roomname = roomnamestate["name"]
await self.client.get_state_event(room, EventType.ROOM_MEMBER, user)
await self.client.kick_user(room, user, reason="kicked")
if roomname:
kick_list[user].append(roomname)
else:
kick_list[user].append(room)
time.sleep(self.config["sleep"])
except MNotFound:
pass
except Exception as e:
self.log.warning(e)
error_list[user] = []
error_list[user].append(roomname or room)
results = "the following users were kicked:{kick_list}
the following errors were \
recorded:{error_list}
".format(
kick_list=kick_list, error_list=error_list
)
await evt.respond(results, allow_html=True, edits=msg)
# sync our database after we've made changes to room memberships
await self.do_sync()
async def create_room(
self,
roomname: str,
evt: MessageEvent = None,
power_level_override: Optional[PowerLevelStateEventContent] = None,
creation_content: Optional[dict] = None,
invitees: Optional[list[str]] = None,
) -> tuple[str, str] | None:
"""Create a new room and add it to the parent space.
Args:
roomname: The name for the new room
evt: Optional MessageEvent for progress updates. If provided, will send status messages.
power_level_override: Optional power levels to use. If not provided, will try to get from parent room.
creation_content: Optional creation content to use when creating the room.
invitees: Optional list of users to invite. If not provided, uses config invitees.
Returns:
tuple: (room_id, room_alias) if successful, None if failed
"""
mymsg = None
try:
# Validate and process room creation parameters
(
sanitized_name,
force_encryption,
force_unencryption,
error_msg,
cleaned_roomname,
) = await room_creation_utils.validate_room_creation_params(
roomname, self.config, evt
)
if error_msg:
self.log.error(error_msg)
if evt:
await evt.respond(error_msg)
return None
# Prepare room creation data
alias_localpart, server, room_invitees, parent_room = (
await room_creation_utils.prepare_room_creation_data(
sanitized_name, self.config, self.client, invitees
)
)
# Validate that the alias is available
is_available = await self.validate_room_alias(alias_localpart, server)
if not is_available:
error_msg = f"Room alias #{alias_localpart}:{server} already exists. Cannot create room."
self.log.error(error_msg)
if evt:
await evt.respond(error_msg)
return None
# Prepare power levels
try:
power_levels = await room_creation_utils.prepare_power_levels(
self.client, self.config, parent_room, power_level_override
)
self.log.info(f"Power levels prepared successfully: {power_levels}")
except Exception as e:
self.log.error(f"Failed to prepare power levels: {e}")
raise
# Adjust power levels for modern rooms
power_levels = room_creation_utils.adjust_power_levels_for_modern_rooms(
power_levels, self.config["room_version"]
)
if (
self.is_modern_room_version(self.config["room_version"])
and power_levels
):
self.log.info(
f"Modern room version {self.config['room_version']} detected - removing bot from power levels"
)
if power_levels.users:
power_levels.users.pop(self.client.mxid, None)
if evt:
mymsg = await evt.respond(
f"creating {alias_localpart} with room version {self.config['room_version']}, give me a minute..."
)
# Prepare initial state events
initial_state = room_creation_utils.prepare_initial_state(
self.config,
parent_room,
server,
force_encryption,
force_unencryption,
creation_content,
)
# Create the room
self.log.info(
f"Creating room with room_version={self.config['room_version']}"
)
if power_levels:
self.log.info(
f"Power level override users: {list(power_levels.users.keys()) if power_levels.users else 'None'}"
)
else:
self.log.info("No power level override")
try:
room_id = await self.client.create_room(
alias_localpart=alias_localpart,
name=cleaned_roomname,
invitees=room_invitees,
initial_state=initial_state,
power_level_override=power_levels,
creation_content=creation_content,
room_version=self.config["room_version"],
)
self.log.info(f"Room created successfully: {room_id}")
except Exception as e:
self.log.error(f"Failed to create room via Matrix API: {e}")
raise
# Verify room creation
await room_creation_utils.verify_room_creation(
self.client, room_id, self.config["room_version"], self.log
)
# Add room to space
await room_creation_utils.add_room_to_space(
self.client, parent_room, room_id, server, self.config["sleep"]
)
if evt:
await evt.respond(
f"#{alias_localpart}:{server} has been created and added to the space.",
edits=mymsg,
allow_html=True,
)
return room_id, f"#{alias_localpart}:{server}"
except Exception as e:
error_msg = f"Failed to create room: {e}"
self.log.error(error_msg)
if evt and mymsg:
await evt.respond(error_msg, edits=mymsg)
elif evt:
await evt.respond(error_msg)
return None
@community.subcommand("room", help="manage rooms in the community")
@decorators.require_parent_room
@decorators.require_permission()
async def room(self, evt: MessageEvent) -> None:
"""Main room command - shows usage by default"""
await evt.reply(
"Use !community room to manage rooms. Available subcommands: create, archive, replace, guests, id, version, setpower, enable-verification"
)
@room.subcommand(
"create",
help="create a new room titled and add it to the parent space. \
optionally include `--encrypted` or `--unencrypted` to force regardless of the default settings.",
)
@command.argument("roomname", pass_raw=True, required=True)
@decorators.require_parent_room
@decorators.require_permission()
async def room_create(self, evt: MessageEvent, roomname: str) -> None:
if (roomname == "help") or len(roomname) == 0:
await evt.reply(
'pass me a room name (like "cool topic") and i will create it and add it to the space. \
use `--encrypted` or `--unencrypted` to ensure encryption is enabled/disabled at creation time even if that isnt my default \
setting.'
)
return
# Check if community slug is configured
if not self.config["community_slug"]:
await evt.reply(
"No community slug configured. Please run initialize command first."
)
return
# Validate the room alias before creating
is_valid, conflicting_aliases = await self.validate_room_aliases(
[roomname], evt
)
if not is_valid:
await evt.reply(
f"Cannot create room: {conflicting_aliases[0]} already exists."
)
return
result = await self.create_room(roomname, evt)
if not result:
return # Error already logged and reported to user by create_room
@room.subcommand("archive", help="archive a room")
@command.argument("room", required=False)
@decorators.require_parent_room
@decorators.require_permission()
async def room_archive(self, evt: MessageEvent, room: str) -> None:
await evt.mark_read()
if not room:
room_id = evt.room_id
self.log.debug(f"DEBUG room we are archiving is {room_id}")
elif room and room.startswith("#"):
try:
self.log.debug(f"DEBUG trying to resolve alias {room}")
room_id = await self.client.resolve_room_alias(room)
room_id = room_id["room_id"]
self.log.debug(f"DEBUG room we are archiving is {room_id}")
except Exception as e:
await evt.reply("i couldn't resolve that alias, sorry")
self.log.error(f"error resolving alias {room}: {e}")
return
elif room and room.startswith("!"):
room_id = room
self.log.debug(f"DEBUG room we are archiving is {room_id}")
else:
await evt.reply("i don't recognize that room, sorry")
return
success = await self.do_archive_room(room_id, evt)
# Only try to respond if we're not archiving the room we're in
if success and room_id != evt.room_id:
await evt.respond("Room has been archived.")
@room.subcommand("replace", help="replace a room with a new one")
@command.argument("room", required=False)
@decorators.require_parent_room
@decorators.require_permission(min_level=100)
async def room_replace(self, evt: MessageEvent, room: str) -> None:
self.log.info(f"=== REPLACEROOM COMMAND STARTED ===")
self.log.info(f"Command arguments: room='{room}', evt.room_id='{evt.room_id}'")
await evt.mark_read()
if not room:
room = evt.room_id
# first we need to get relevant room state of the room we want to replace
# this includes the room name, alias, and join rules
if room.startswith("#"):
room_id = await self.client.resolve_room_alias(room)
room_id = room_id["room_id"]
self.log.info(f"Resolved alias '{room}' to room ID: {room_id}")
else:
room_id = room
self.log.info(f"Using direct room ID: {room_id}")
# Check bot permissions in the old room
self.log.info(f"=== CHECKING BOT PERMISSIONS ===")
has_perms, error_msg, _ = await self.check_bot_permissions(
room_id, evt, ["state", "tombstone", "power_levels"]
)
self.log.info(
f"Bot permissions check result: has_perms={has_perms}, error_msg='{error_msg}'"
)
if not has_perms:
await evt.respond(f"Cannot replace room: {error_msg}")
self.log.info("Bot permissions check failed, returning")
return
# Get the room name from the state event
room_name = None
try:
room_name_event = await self.client.get_state_event(
room_id, EventType.ROOM_NAME
)
room_name = room_name_event.name
self.log.info(f"Retrieved room name: '{room_name}'")
except Exception as e:
self.log.warning(f"Failed to get room name: {e}")
# room_name remains None
# get the room topic from the state event
room_topic = None
try:
room_topic_event = await self.client.get_state_event(
room_id, EventType.ROOM_TOPIC
)
room_topic = room_topic_event.topic
except Exception as e:
self.log.warning(f"Failed to get room topic: {e}")
# room_topic remains None
# Check if the room being replaced is a space
is_space = False
self.log.info(f"=== ABOUT TO START SPACE DETECTION ===")
self.log.info(f"=== SPACE DETECTION DEBUG START ===")
self.log.info(f"Room ID being checked: {room_id}")
self.log.info(f"EventType module: {EventType}")
self.log.info(
f"EventType.ROOM_CREATE exists: {hasattr(EventType, 'ROOM_CREATE')}"
)
if hasattr(EventType, "ROOM_CREATE"):
self.log.info(
f"EventType.ROOM_CREATE value: {getattr(EventType, 'ROOM_CREATE')}"
)
else:
self.log.warning("EventType.ROOM_CREATE does not exist!")
try:
# Get the room creation event to check if it's a space
state_events = await self.client.get_state(room_id)
self.log.info(
f"Retrieved {len(state_events)} state events for space detection"
)
# Log all event types for debugging
event_types = [event.type for event in state_events]
self.log.info(f"Event types found: {event_types}")
# Debug EventType.ROOM_CREATE constant
self.log.info(f"EventType.ROOM_CREATE value: {EventType.ROOM_CREATE}")
self.log.info(f"EventType.ROOM_CREATE type: {type(EventType.ROOM_CREATE)}")
# Also try string comparison as fallback
room_create_string = "m.room.create"
self.log.info(f"String comparison value: {room_create_string}")
# Try to find the room creation event using multiple methods
room_create_event = None
for i, event in enumerate(state_events):
self.log.info(
f"Event {i}: type={event.type} (type: {type(event.type)})"
)
# Try multiple comparison methods
if (
hasattr(EventType, "ROOM_CREATE")
and event.type == EventType.ROOM_CREATE
):
self.log.info(f"✓ Matched EventType.ROOM_CREATE")
room_create_event = event
break
elif str(event.type) == room_create_string:
self.log.info(f"✓ Matched string comparison 'm.room.create'")
room_create_event = event
break
elif event.type == "m.room.create":
self.log.info(f"✓ Matched direct string comparison")
room_create_event = event
break
else:
self.log.info(f"✗ No match for event {i}")
# Now process the room creation event if found
if room_create_event:
space_type = room_create_event.content.get("type")
self.log.info(f"Found ROOM_CREATE event with type: {space_type}")
self.log.info(f"Full ROOM_CREATE content: {room_create_event.content}")
is_space = space_type == "m.space"
self.log.info(f"Space detection result: {is_space}")
else:
self.log.warning("No ROOM_CREATE event found using any method")
if is_space:
self.log.info(
f"✓ FINAL RESULT: Room {room_id} IS a space - will create new space"
)
else:
self.log.info(
f"✗ FINAL RESULT: Room {room_id} is NOT a space - will create regular room"
)
except Exception as e:
self.log.error(f"❌ ERROR during space detection: {e}")
import traceback
self.log.error(f"Traceback: {traceback.format_exc()}")
# Assume it's not a space if we can't determine
is_space = False
self.log.info(f"=== SPACE DETECTION DEBUG END - is_space={is_space} ===")
# Get list of aliases to transfer while removing them from the old room
aliases_to_transfer = await self.remove_room_aliases(room_id, evt)
# Check if community slug is configured
if not self.config["community_slug"]:
await evt.respond(
"No community slug configured. Please run initialize command first."
)
return
# Inform user about what type of room is being replaced
if not room_name:
room_name = f"Room {room_id[:8]}..." # Fallback name
self.log.warning(f"Using fallback room name: {room_name}")
self.log.info(
f"Final decision - is_space: {is_space}, room_name: '{room_name}'"
)
self.log.info(f"About to send user message - is_space: {is_space}")
if is_space:
await evt.respond(f"Replacing space '{room_name}' with a new space...")
self.log.info(f"✓ Sent 'Replacing space' message to user")
else:
await evt.respond(f"Replacing room '{room_name}' with a new room...")
self.log.info(f"✗ Sent 'Replacing room' message to user")
# Validate that the new room alias is available
is_valid, conflicting_aliases = await self.validate_room_aliases(
[room_name], evt
)
if not is_valid:
await evt.respond(
f"Cannot replace room: {conflicting_aliases[0]} already exists."
)
return
# Now we can start the process of replacing the room
# First we need to create the new room. this will create the initial alias,
# as well as bot defaults such as power levels, initial invitations, encryption,
# and space membership
if is_space:
# Create a new space instead of a regular room
# For spaces, we need to pass power_level_override to ensure proper creation
# Get power levels from the old space to use as a template
try:
old_power_levels = await self.client.get_state_event(
room_id, EventType.ROOM_POWER_LEVELS
)
self.log.info(
f"Using user power levels from old space for new space creation"
)
# Create new power levels with server defaults, not copying all permissions from old space
power_levels = PowerLevelStateEventContent()
# Copy only user power levels from old space, not the entire permission set
if old_power_levels.users:
user_power_levels = old_power_levels.users.copy()
# Ensure bot has highest power
user_power_levels[self.client.mxid] = 1000
power_levels.users = user_power_levels
else:
power_levels.users = {
self.client.mxid: 1000, # Bot gets highest power
}
# Set explicit config values
power_levels.invite = self.config["invite_power_level"]
# For other permissions, let the server use its defaults instead of copying from old space
# This prevents issues like only admins being able to post messages
self.log.info(
f"Using user power levels from old space but server defaults for other permissions"
)
power_level_override = power_levels
# remove the bot's explicit power level for modern room versions
# since creators have unlimited power in modern rooms
if self.is_modern_room_version(self.config["room_version"]):
if power_level_override.users:
power_level_override.users.pop(self.client.mxid, None)
self.log.info(f"Removed bot since they are creator")
except Exception as e:
self.log.warning(
f"Could not get power levels from old space, using defaults: {e}"
)
power_level_override = None
self.log.info(
f"Calling create_space with room_name='{room_name}', power_level_override={power_level_override is not None}"
)
new_room_id, new_room_alias = await self.create_space(
room_name, evt, power_level_override
)
self.log.info(
f"create_space returned: room_id={new_room_id}, alias={new_room_alias}"
)
else:
# Create a regular room
self.log.info(f"Calling create_room with room_name='{room_name}'")
new_room_id, new_room_alias = await self.create_room(room_name, evt)
self.log.info(
f"create_room returned: room_id={new_room_id}, alias={new_room_alias}"
)
if not new_room_id:
await evt.respond("Failed to create new room")
return
# Ensure the new space is NOT added to the old space as a child room
if is_space:
try:
# Check if the old space has any m.space.parent events pointing to it
# and ensure the new space doesn't get added as a child
old_space_parent_events = []
state_events = await self.client.get_state(room_id)
for event in state_events:
if event.type == EventType.SPACE_PARENT:
old_space_parent_events.append(event.state_key)
if old_space_parent_events:
self.log.info(
f"Old space has {len(old_space_parent_events)} parent space references - ensuring new space is not added as child"
)
await evt.respond(
f"Note: Old space has {len(old_space_parent_events)} parent space references - new space will be independent"
)
# Also check if the old space is a child of the community parent space
# and ensure the new space doesn't automatically inherit that relationship
if room_id == self.config.get("parent_room"):
self.log.info(
"Old space is the community parent space - new space will be independent"
)
await evt.respond(
"Note: Old space is the community parent space - new space will be independent and may need manual configuration"
)
except Exception as e:
self.log.warning(f"Could not check old space parent references: {e}")
# Check bot permissions in the new room
has_perms, error_msg, _ = await self.check_bot_permissions(
new_room_id, evt, ["state", "tombstone", "power_levels"]
)
if not has_perms:
await evt.respond(
f"Created new room but cannot complete replacement: {error_msg}"
)
return
# Transfer the aliases to the new room/space
if aliases_to_transfer:
await evt.respond(
f"Transferring {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}..."
)
for alias in aliases_to_transfer:
localpart = alias.split(":")[0][1:] # Remove # and get localpart
server = alias.split(":")[1]
try:
await self.client.add_room_alias(new_room_id, localpart)
self.log.info(
f"Successfully transferred alias {alias} to new {'space' if is_space else 'room'} {new_room_id}"
)
except Exception as e:
# If transfer failed, try to create a modified alias
modified_alias = f"{localpart}NEW"
try:
await self.client.add_room_alias(new_room_id, modified_alias)
self.log.info(
f"Successfully transferred modified alias {modified_alias} to new {'space' if is_space else 'room'} {new_room_id}"
)
except Exception as e2:
self.log.error(
f"Failed to transfer modified alias {modified_alias}: {e2}"
)
await evt.respond(
f"Successfully transferred {len(aliases_to_transfer)} aliases to new {'space' if is_space else 'room'}"
)
else:
await evt.respond("No aliases to transfer")
# Get the room avatar from the old room/space
try:
old_room_avatar = await self.client.get_state_event(
room_id, EventType.ROOM_AVATAR
)
if old_room_avatar and old_room_avatar.url:
# Set the same avatar in the new room/space
await self.client.send_state_event(
new_room_id, EventType.ROOM_AVATAR, {"url": old_room_avatar.url}
)
self.log.info(
f"Successfully copied {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'} {new_room_id}"
)
await evt.respond(
f"Copied avatar to new {'space' if is_space else 'room'}"
)
except Exception as e:
self.log.error(
f"Failed to copy {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'}: {e}"
)
# await evt.respond(f"Failed to copy {'space' if is_space else 'room'} avatar to new {'space' if is_space else 'room'}: {e}")
# Set the room topic in the new room/space
if room_topic:
try:
await self.client.send_state_event(
new_room_id, EventType.ROOM_TOPIC, {"topic": room_topic}
)
self.log.info(
f"Successfully copied {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'} {new_room_id}"
)
await evt.respond(
f"Copied topic to new {'space' if is_space else 'room'}"
)
except Exception as e:
self.log.error(
f"Failed to copy {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'}: {e}"
)
# await evt.respond(f"Failed to copy {'space' if is_space else 'room'} topic to new {'space' if is_space else 'room'}: {e}")
else:
await evt.respond("No topic to copy")
# Archive the old room/space with a pointer to the new room/space
await evt.respond(f"Archiving old {'space' if is_space else 'room'}...")
success = await self.do_archive_room(room_id, evt, new_room_id)
if not success:
await evt.respond(
f"Failed to archive old {'space' if is_space else 'room'}, but new {'space' if is_space else 'room'} has been created"
)
else:
await evt.respond(
f"Successfully archived old {'space' if is_space else 'room'}"
)
# If we're replacing a space, we need to handle child room relationships
if is_space:
try:
# Get all child rooms from the old space
old_child_rooms = []
state_events = await self.client.get_state(room_id)
for event in state_events:
if event.type == EventType.SPACE_CHILD:
old_child_rooms.append(event.state_key)
if old_child_rooms:
self.log.info(
f"Found {len(old_child_rooms)} child rooms in old space"
)
await evt.respond(
f"Migrating {len(old_child_rooms)} child rooms from old space to new space..."
)
# Update child rooms to point to the new space
for child_room_id in old_child_rooms:
try:
# Remove old space parent reference
await self.client.send_state_event(
child_room_id,
EventType.SPACE_PARENT,
{}, # Empty content removes the state
state_key=room_id,
)
# Add new space parent reference
server = self.client.parse_user_id(self.client.mxid)[1]
await self.client.send_state_event(
child_room_id,
EventType.SPACE_PARENT,
{"via": [server], "canonical": True},
state_key=new_room_id,
)
# Update space child reference
await self.client.send_state_event(
new_room_id,
EventType.SPACE_CHILD,
{"via": [server], "suggested": False},
state_key=child_room_id,
)
self.log.info(
f"Updated child room {child_room_id} to point to new space"
)
await asyncio.sleep(self.config["sleep"])
except Exception as e:
self.log.error(
f"Failed to update child room {child_room_id}: {e}"
)
await evt.respond(
f"Successfully migrated {len(old_child_rooms)} child rooms to new space"
)
else:
await evt.respond("No child rooms found in old space")
except Exception as e:
self.log.error(f"Failed to handle child room relationships: {e}")
await evt.respond(
f"Warning: Failed to handle child room relationships: {e}"
)
# update instances of the old room id in any config values that use it
config_keys = [
"parent_room",
"notification_room",
"censor",
"check_if_human",
"banlists",
"greeting_rooms",
]
for key in config_keys:
value = self.config[key]
if isinstance(value, str):
if value == room_id:
self.config[key] = new_room_id
elif isinstance(value, list):
# Handle lists that might contain room IDs
if room_id in value:
self.config[key] = [
new_room_id if x == room_id else x for x in value
]
elif isinstance(value, dict):
# Handle dictionaries that might use room IDs as keys
if room_id in value:
self.config[key][new_room_id] = self.config[key].pop(room_id)
# Also check if any values in the dict are room IDs
for dict_key, dict_value in value.items():
if dict_value == room_id:
self.config[key][dict_key] = new_room_id
# Save the updated config
self.config.save()
# Final success message
if is_space:
await evt.respond(
f"✅ Space replacement completed successfully!\n"
f"New space: {new_room_alias}\n"
f"Old space has been archived with a pointer to the new space."
)
else:
await evt.respond(
f"✅ Room replacement completed successfully!\n"
f"New room: {new_room_alias}\n"
f"Old room has been archived with a pointer to the new room."
)
@room.subcommand(
"guests",
help="generate a list of members in a room who are not members of the parent space",
)
@command.argument("room", required=False)
@decorators.require_parent_room
@decorators.require_permission()
async def room_guests(self, evt: MessageEvent, room: str) -> None:
space_members_obj = await self.client.get_joined_members(
self.config["parent_room"]
)
space_members_list = space_members_obj.keys()
room_id = None
if room:
if room.startswith("#"):
try:
thatroom_id = await self.client.resolve_room_alias(room)
room_id = thatroom_id["room_id"]
except:
evt.reply("i don't recognize that room, sorry")
return
else:
room_id = room
else:
room_id = evt.room_id
room_members_obj = await self.client.get_joined_members(room_id)
room_members_list = room_members_obj.keys()
# find the non-space members in the room member list
try:
guest_list = set(room_members_list) - set(space_members_list)
if len(guest_list) == 0:
guest_list = ["None"]
await evt.reply(
f"Guests in this room are:
\
{'
'.join(guest_list)}",
allow_html=True,
)
except Exception as e:
await evt.respond(f"something went wrong: {e}")
@room.subcommand("id", help="return the matrix room ID of this, or a given, room")
@command.argument("room", required=False)
@decorators.require_parent_room
@decorators.require_permission()
async def room_id(self, evt: MessageEvent, room: str) -> None:
room_id = None
if room:
if room.startswith("#"):
try:
thatroom_id = await self.client.resolve_room_alias(room)
room_id = thatroom_id["room_id"]
except:
evt.reply("i don't recognize that room, sorry")
return
else:
room_id = room
else:
room_id = evt.room_id
try:
await evt.reply(f"Room ID is: {room_id}")
except Exception as e:
await evt.respond(f"something went wrong: {e}")
@room.subcommand(
"version", help="return the room version and creators of this, or a given, room"
)
@command.argument("room", required=False)
@decorators.require_parent_room
@decorators.require_permission()
async def room_version(self, evt: MessageEvent, room: str) -> None:
room_id = None
if room:
if room.startswith("#"):
try:
thatroom_id = await self.client.resolve_room_alias(room)
room_id = thatroom_id["room_id"]
except:
evt.reply("i don't recognize that room, sorry")
return
else:
room_id = room
else:
room_id = evt.room_id
try:
room_version, creators = await self.get_room_version_and_creators(room_id)
# Get room name if available
room_name = room_id
try:
room_name_event = await self.client.get_state_event(
room_id, EventType.ROOM_NAME
)
room_name = room_name_event.name
except:
pass
response = f"Room: {room_name}
"
response += f"Room ID: {room_id}
"
response += f"Room Version: {room_version}
"
if creators:
response += f"Creators: {', '.join(creators)}
"
if self.is_modern_room_version(room_version):
response += f"
ℹ️ Note: This room uses version {room_version}, which means creators have unlimited power and cannot be restricted by power levels."
else:
response += "Creators: None found
"
await evt.reply(response, allow_html=True)
except Exception as e:
await evt.respond(f"something went wrong: {e}")
@room.subcommand(
"setpower",
help="sync user power levels from parent room to all child rooms. this will override existing user power levels in child rooms!",
)
@command.argument("target_room", required=False)
@decorators.require_parent_room
@decorators.require_permission(min_level=100)
async def room_setpower(self, evt: MessageEvent, target_room: str = None) -> None:
await evt.mark_read()
if target_room:
if target_room.startswith("#"):
try:
resolved_alias = await self.client.resolve_room_alias(target_room)
roomlist = [resolved_alias.room_id]
except Exception as e:
await evt.respond(f"Fehler: Konnte Alias {target_room} nicht in eine ID auflösen: {e}")
return
elif target_room.startswith("!"):
roomlist = [target_room]
else:
await evt.respond("Fehler: Der Raum muss mit ! (ID) oder # (Alias) beginnen.")
return
target_msg = target_room
else:
roomlist = await self.get_space_roomlist()
target_msg = "space rooms"
if not roomlist:
await evt.respond("Fehler: Keine gültigen Räume zum Aktualisieren gefunden. Ist der Bot in den Zielräumen Mitglied?")
return
msg = await evt.respond(
f"Syncing power levels from parent room to {target_msg}..."
)
success_list = []
skipped_list = []
error_list = []
try:
# Get parent room power levels and version to use as source of truth
parent_power_levels = await self.client.get_state_event(
self.config["parent_room"], EventType.ROOM_POWER_LEVELS
)
parent_version, parent_creators = await self.get_room_version_and_creators(
self.config["parent_room"]
)
self.log.info(f"Parent room version: {parent_version}")
self.log.info(f"Parent room creators: {parent_creators}")
self.log.info(f"Bot MXID: {self.client.mxid}")
self.log.info(
f"Bot is creator in parent: {self.client.mxid in parent_creators}"
)
user_power_levels = parent_power_levels.users.copy()
# Handle bot's power level based on room versions and actual creator status
if self.is_modern_room_version(parent_version):
# In modern parent rooms, check if bot is actually a creator
if self.client.mxid in parent_creators:
# Bot is a creator, remove from power levels to prevent errors
user_power_levels.pop(self.client.mxid, None)
self.log.info(
f"Parent room is modern (v{parent_version}), bot is creator and has unlimited power"
)
else:
# Bot is not a creator, set appropriate power level
user_power_levels[self.client.mxid] = 1000
self.log.info(
f"Parent room is modern (v{parent_version}), bot is not creator, power level set to 1000"
)
else:
# In legacy parent rooms, keep the bot at its actual current PL (cannot self-promote)
bot_pl = parent_power_levels.users.get(
self.client.mxid,
getattr(parent_power_levels, "users_default", 0),
)
user_power_levels[self.client.mxid] = bot_pl
self.log.info(
f"Parent room is legacy (v{parent_version}), bot power level retained at {bot_pl}"
)
for room in roomlist:
try:
roomname = None
try:
roomnamestate = await self.client.get_state_event(
room, "m.room.name"
)
roomname = roomnamestate["name"]
except Exception as e:
self.log.warning(f"Could not get room name for {room}: {e}")
# Skip rooms that are protected by verification, unless its the only target room,
# in which case we have explicitly asked to set power levels in that room
if len(roomlist) > 1 and (
(
isinstance(self.config["check_if_human"], bool)
and self.config["check_if_human"]
)
or (
isinstance(self.config["check_if_human"], list)
and room in self.config["check_if_human"]
)
):
self.log.info(
f"Skipping {roomname or room} as it requires human verification. You can explicitly run this command for this room to override."
)
skipped_list.append(roomname or room)
continue
# Get the room's power levels object and version info
room_power_levels = await self.client.get_state_event(
room, EventType.ROOM_POWER_LEVELS
)
room_version, room_creators = (
await self.get_room_version_and_creators(room)
)
self.log.info(
f"Processing room {roomname or room} (v{room_version}) - Parent is v{parent_version}"
)
# Handle power level mapping based on room version differences
if self.is_modern_room_version(room_version):
# Target room is modern (v12+) - creators have unlimited power
self.log.info(
f"Target room {roomname or room} is modern - preserving creator power levels"
)
# Filter out any users who are creators in the target room
filtered_user_power_levels = {}
for user, level in user_power_levels.items():
if user not in room_creators:
filtered_user_power_levels[user] = level
else:
self.log.info(
f"Skipping power level for creator {user} in modern room {roomname or room}"
)
# Preserve existing power levels for special cases (like verification rooms)
# Only update non-creator users to avoid conflicts
existing_users = set(room_power_levels.users.keys())
creators_set = set(room_creators)
special_users = existing_users - creators_set
# Keep existing power levels for special users unless explicitly overridden
for user in special_users:
if user not in filtered_user_power_levels:
filtered_user_power_levels[user] = (
room_power_levels.users[user]
)
self.log.info(
f"Preserving existing power level for special user {user} in {roomname or room}"
)
# Handle bot power level in modern target room
if self.client.mxid in room_creators:
# Bot is creator in target room - don't set power level
self.log.info(
f"Bot is creator in modern target room {roomname or room} - no power level set"
)
else:
# Bot is not creator in target room - set appropriate power level
filtered_user_power_levels[self.client.mxid] = 1000
self.log.info(
f"Bot is not creator in modern target room {roomname or room} - power level set to 1000"
)
# Merge filtered power levels with existing room power levels
room_power_levels.users.update(filtered_user_power_levels)
elif self.is_modern_room_version(parent_version):
# Target room is legacy but parent is modern
# Map parent room "creators" to "admins" in legacy room
self.log.info(
f"Target room {roomname or room} is legacy, parent is modern - mapping creators to admins"
)
# For legacy rooms, we can set all power levels including the bot
# But map parent room creators to appropriate admin levels
mapped_power_levels = {}
for user, level in user_power_levels.items():
if user in parent_creators and user != self.client.mxid:
# Map parent creators to admin level (100) in legacy rooms
mapped_power_levels[user] = 100
self.log.info(
f"Mapping parent creator {user} to admin level 100 in legacy room {roomname or room}"
)
else:
mapped_power_levels[user] = level
# In a legacy target room, keep the bot at its actual current PL (cannot self-promote)
bot_pl = room_power_levels.users.get(
self.client.mxid,
getattr(room_power_levels, "users_default", 0),
)
mapped_power_levels[self.client.mxid] = bot_pl
self.log.info(
f"Target room is legacy, bot power level retained at {bot_pl} in {roomname or room}"
)
room_power_levels.users = mapped_power_levels
else:
# Both rooms are legacy - direct power level transfer
self.log.info(
f"Both rooms are legacy - direct power level transfer"
)
# Capture the bot's current PL in this child room before overwriting.
# This is always the value we use — the bot cannot self-promote above
# its current PL, and we must not demote it if it's higher than the parent.
child_bot_pl = room_power_levels.users.get(
self.client.mxid,
getattr(room_power_levels, "users_default", 0),
)
room_power_levels.users = user_power_levels.copy()
room_power_levels.users[self.client.mxid] = child_bot_pl
self.log.info(
f"Both rooms legacy: bot PL retained at {child_bot_pl} in {roomname or room}"
)
# Send the updated power levels to this room
await self.client.send_state_event(
room, EventType.ROOM_POWER_LEVELS, room_power_levels
)
success_list.append(roomname or room)
await asyncio.sleep(self.config["sleep"])
except Exception as e:
self.log.error(
f"Failed to update power levels in {roomname or room}: {e}"
)
error_list.append(roomname or room)
results = "Power levels synced from parent room.
"
results += f"Parent room version: {parent_version}
"
results += f"Parent room creators: {', '.join(parent_creators) if parent_creators else 'None'}
"
results += f"Bot creator status: {'✅ Creator' if self.client.mxid in parent_creators else '❌ Not creator'} in parent room
"
# Add explanation of power level mapping strategy
if self.is_modern_room_version(parent_version):
results += f"Mapping Strategy: Parent room is modern (v{parent_version}), creators have unlimited power.
"
if self.client.mxid in parent_creators:
results += "• Bot is creator in parent room (unlimited power)
"
else:
results += (
"• Bot is not creator in parent room (power level 1000)
"
)
results += "• Parent creators mapped to admin level (100) in legacy child rooms
"
results += "• Modern child rooms preserve their creator power levels
"
else:
results += f"Mapping Strategy: Parent room is legacy (v{parent_version}), using traditional power level system.
"
results += (
"• Bot power level retained at its current level in legacy rooms
"
)
results += "• Direct power level transfer to legacy child rooms
"
results += "• Modern child rooms preserve their creator power levels
"
if success_list:
results += f"Successfully updated rooms:
{', '.join(success_list)}
"
if skipped_list:
results += f"Skipped rooms due to verification settings:
{', '.join(skipped_list)}
"
if error_list:
results += (
f"Failed to update rooms:
{', '.join(error_list)}"
)
await evt.respond(results, allow_html=True, edits=msg)
except Exception as e:
error_msg = f"Failed to get parent room power levels: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
@room.subcommand(
"enable-verification",
help="migrate a room to a verification-based permission model, ensuring current members can still send messages while new joiners require verification",
)
@decorators.require_parent_room
@decorators.require_permission()
async def room_enable_verification(self, evt: MessageEvent) -> None:
"""Enable verification-based permissions for the current room"""
await evt.mark_read()
msg = await evt.respond("Starting room migration...")
try:
# Get current room members
members = await self.client.get_joined_members(evt.room_id)
member_list = list(members.keys())
# Get current power levels
power_levels = await self.client.get_state_event(
evt.room_id, EventType.ROOM_POWER_LEVELS
)
# Get the required power level for sending messages
events_default = power_levels.events_default
events = power_levels.events
required_level = events.get(str(EventType.ROOM_MESSAGE), events_default)
# Set default power level to n-1 (usually 0)
power_levels.users_default = required_level - 1
# Set members to required level only if their current level is lower
# and they don't have unlimited power (creators in modern room versions)
for member in member_list:
# Check if member has unlimited power
if await self.user_has_unlimited_power(member, evt.room_id):
continue # Skip creators with unlimited power
current_level = power_levels.get_user_level(member)
if current_level < required_level:
power_levels.users[member] = required_level
# Send updated power levels
await self.client.send_state_event(
evt.room_id, EventType.ROOM_POWER_LEVELS, power_levels
)
await evt.respond(
f"Room migration complete. Current members can send messages, new joiners will require verification.",
edits=msg,
)
except Exception as e:
error_msg = f"Failed to migrate room: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
async def store_verification_state(self, dm_room_id: str, state: dict) -> None:
"""Store verification state in the database."""
# Try to insert first, if it fails due to existing record, then update
try:
insert_query = """INSERT INTO verification_states
(dm_room_id, user_id, target_room_id, verification_phrase, attempts_remaining, \
required_power_level)
VALUES ($1, $2, $3, $4, $5, $6)"""
await self.database.execute(
insert_query,
dm_room_id,
state["user"],
state["target_room"],
state["phrase"],
state["attempts"],
state["required_level"],
)
self.log.debug(f"Inserted new verification state for {dm_room_id}")
except Exception as e:
# If insert fails (likely due to existing record), try update
if (
"UNIQUE constraint failed" in str(e)
or "duplicate key" in str(e).lower()
):
self.log.debug(f"Record exists for {dm_room_id}, updating instead")
update_query = """UPDATE verification_states
SET verification_phrase = $4, \
attempts_remaining = $5, \
required_power_level = $6, \
user_id = $2, \
target_room_id = $3 \
WHERE dm_room_id = $1"""
await self.database.execute(
update_query,
dm_room_id,
state["user"],
state["target_room"],
state["phrase"],
state["attempts"],
state["required_level"],
)
self.log.debug(f"Updated verification state for {dm_room_id}")
else:
# Re-raise if it's not a constraint violation
raise
async def get_verification_state(self, dm_room_id: str) -> Optional[dict]:
"""Retrieve verification state from the database."""
row = await self.database.fetchrow(
"SELECT * FROM verification_states WHERE dm_room_id = $1", dm_room_id
)
if not row:
return None
return {
"user": row["user_id"],
"target_room": row["target_room_id"],
"phrase": row["verification_phrase"],
"attempts": row["attempts_remaining"],
"required_level": row["required_power_level"],
}
async def delete_verification_state(self, dm_room_id: str) -> None:
"""Delete verification state from the database."""
await self.database.execute(
"DELETE FROM verification_states WHERE dm_room_id = $1", dm_room_id
)
async def cleanup_stale_verification_states(self) -> None:
"""Clean up verification states that are no longer valid."""
# Get all verification states
states = await self.database.fetch("SELECT * FROM verification_states")
for state in states:
try:
# Check if DM room still exists and bot is still in it
try:
await self.client.get_state_event(
state["dm_room_id"], EventType.ROOM_MEMBER, self.client.mxid
)
except Exception:
# Bot is not in the DM room anymore, state is stale
await self.delete_verification_state(state["dm_room_id"])
continue
# Check if user is still in the target room
try:
await self.client.get_state_event(
state["target_room_id"], EventType.ROOM_MEMBER, state["user_id"]
)
except Exception:
# User is not in the target room anymore, state is stale
await self.delete_verification_state(state["dm_room_id"])
continue
# Check if verification is too old (older than 24 hours)
if (datetime.now() - state["created_at"]).total_seconds() > 86400:
await self.delete_verification_state(state["dm_room_id"])
continue
except Exception as e:
self.log.error(
f"Error checking verification state {state['dm_room_id']}: {e}"
)
# If we can't check the state, assume it's stale
await self.delete_verification_state(state["dm_room_id"])
@classmethod
def get_db_upgrade_table(cls) -> None:
return upgrade_table
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
return Config
@community.subcommand(
"initialize",
help="initialize a new community space with the given name. this command can only be used if no parent room is configured.",
)
@command.argument("community_name", pass_raw=True, required=True)
async def initialize_community(
self, evt: MessageEvent, community_name: str
) -> None:
await evt.mark_read()
# Check if parent room is already configured
if self.config["parent_room"]:
await evt.reply(
"Cannot initialize: a parent room is already configured. Please remove the parent_room configuration first."
)
return
# Validate community name
if not community_name or community_name.isspace():
await evt.reply(
"Please provide a community name. Usage: !community initialize "
)
return
msg = await evt.respond("Initializing new community space...")
try:
# Generate community slug if not already set
if not self.config["community_slug"]:
community_slug = self.generate_community_slug(community_name)
self.config["community_slug"] = community_slug
self.log.info(f"Generated community slug: {community_slug}")
# Define child rooms that will be created during initialization (excluding the space itself)
child_rooms_to_create = [
f"{community_name} Moderators", # Moderators room
f"{community_name} Waiting Room", # Waiting room
]
# Validate child room aliases before creating any rooms
is_valid, conflicting_aliases = await self.validate_room_aliases(
child_rooms_to_create, evt
)
if not is_valid:
error_msg = (
f"Cannot initialize community: The following room aliases already exist:\n"
+ "\n".join(conflicting_aliases)
)
await evt.respond(error_msg, edits=msg)
return
# Add initiator to invitees list if not already there
if evt.sender not in self.config["invitees"]:
self.config["invitees"].append(evt.sender)
# Save the updated config
self.config.save()
# Create the space
server = self.client.parse_user_id(self.client.mxid)[1]
sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", community_name).lower()
# Set up power levels for the space
power_levels = PowerLevelStateEventContent()
# Set up power levels for users
# For modern room versions (12+), the bot (creator) has unlimited power by default
# but we still need to set power levels for other users
if self.is_modern_room_version(self.config.get("room_version", "1")):
# For modern rooms, don't set bot power level (it has unlimited power)
# but still set power levels for other users
power_levels.users = {evt.sender: 100} # Initiator gets admin power
else:
# For legacy rooms, set both bot and initiator power levels
power_levels.users = {
self.client.mxid: 1000, # Bot gets highest power
evt.sender: 100, # Initiator gets admin power
}
# Set invite power level from config
power_levels.invite = self.config.get("invite_power_level", 50)
# Create the space with appropriate metadata and power levels
space_id, space_alias = await self.create_space(
community_name, evt, power_level_override=power_levels
)
if not space_id:
await evt.respond("Failed to create space", edits=msg)
return
# Set the space as the parent room in config
self.config["parent_room"] = space_id
self.log.info(f"Set parent_room to: {space_id}")
# Save the updated config
self.config.save()
self.log.info("Config saved successfully")
# Verify the space exists and has correct power levels
try:
space_power_levels = await self.client.get_state_event(
space_id, EventType.ROOM_POWER_LEVELS
)
# For modern room versions, creators have unlimited power and don't appear in power levels
if self.is_modern_room_version(self.config.get("room_version", "1")):
# Just verify the space exists and has power levels
if not space_power_levels:
raise Exception("Space power levels not set correctly")
self.log.info("Space power levels verified for modern room version")
else:
# For legacy room versions, check that bot has admin power
if space_power_levels.users.get(self.client.mxid) != 1000:
raise Exception("Space power levels not set correctly")
self.log.info("Space power levels verified for legacy room version")
except Exception as e:
error_msg = f"Failed to verify space setup: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
return
# Create moderators room
# Include the initiator as a moderator, plus any other moderators from the space
moderators = [evt.sender] # Always include the initiator
# Also get any other moderators from the space
try:
space_moderators = await self.get_moderators_and_above()
if space_moderators:
# Add other moderators, excluding the bot and the initiator (already added)
for user in space_moderators:
if user != self.client.mxid and user != evt.sender:
moderators.append(user)
except Exception as e:
self.log.warning(f"Could not get additional moderators from space: {e}")
self.log.info(
f"Moderators room will be created with initial members: {moderators}"
)
room_result = await self.create_room(
f"{community_name} Moderators",
evt,
invitees=moderators, # Use moderators list instead of config invitees
)
if not room_result:
error_msg = "Failed to create moderators room"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
return
mod_room_id, mod_room_alias = room_result
# Set moderators room to invite-only
await self.client.send_state_event(
mod_room_id,
EventType.ROOM_JOIN_RULES,
JoinRulesStateEventContent(join_rule=JoinRule.INVITE),
)
# Create waiting room (force unencrypted for public access)
waiting_room_result = await self.create_room(
f"{community_name} Waiting Room --unencrypted",
evt,
creation_content={
"m.federate": True,
"m.room.history_visibility": "joined",
},
)
if not waiting_room_result:
error_msg = "Failed to create waiting room"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
return
waiting_room_id, waiting_room_alias = waiting_room_result
# Set waiting room to be joinable by anyone
await self.client.send_state_event(
waiting_room_id,
EventType.ROOM_JOIN_RULES,
JoinRulesStateEventContent(join_rule=JoinRule.PUBLIC),
)
# Update censor configuration based on current value
current_censor = self.config["censor"]
if current_censor is False:
# If censor is false, set it to a list with just the waiting room
self.config["censor"] = [waiting_room_id]
elif (
isinstance(current_censor, list)
and waiting_room_id not in current_censor
):
# If censor is already a list and waiting room isn't in it, append it
current_censor.append(waiting_room_id)
self.config["censor"] = current_censor
# If censor is True or waiting room is already in the list, leave it as is
# Save the updated config
self.config.save()
# Check if default encryption is enabled and add warning for waiting room
warning_msg = ""
if self.config.get("encrypt", False):
warning_msg = "
⚠️ **Note: Waiting room created without encryption (as it is a public room)**"
await evt.respond(
f"Community space initialized successfully!
"
f"Community Slug: {self.config['community_slug']}
"
f"Room Version: {self.config['room_version']}
"
f"Space: {space_alias}
"
f"Moderators Room: {mod_room_alias}
"
f"Waiting Room: {waiting_room_alias}{warning_msg}",
edits=msg,
allow_html=True,
)
except Exception as e:
error_msg = f"Failed to initialize community: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
@community.subcommand(
"doctor",
help="review bot permissions across the space and all rooms to identify potential issues",
)
@command.argument("room", required=False)
async def doctor_check(self, evt: MessageEvent, room: str = None) -> None:
if not await self.check_parent_room(evt):
return
if not await self.user_permitted(evt.sender):
await evt.reply("You don't have permission to use this command")
return
# If a room is specified, show detailed report for that room
if room:
await self._doctor_room_detail(evt, room)
return
msg = await evt.respond("Running diagnostic check...")
try:
report = {"space": {}, "rooms": {}, "issues": [], "warnings": []}
# Check parent space permissions
report["space"] = await diagnostic_utils.check_space_permissions(
self.client, self.config["parent_room"], self.log
)
if "error" in report["space"]:
report["issues"].append(
f"Failed to check parent space permissions: {report['space']['error']}"
)
elif report["space"].get("bot_power_level", 0) < 100:
report["issues"].append(
f"Bot lacks administrative privileges in parent space (level: {report['space']['bot_power_level']})"
)
# Check all rooms in the space
space_rooms = await self.get_space_roomlist()
for room_id in space_rooms:
room_data = await diagnostic_utils.check_room_permissions(
self.client, room_id, self.log
)
report["rooms"][room_id] = room_data
# Add issues for problematic rooms
if "error" in room_data:
if room_data["error"] == "Bot not in room":
report["issues"].append(
f"Bot is not a member of room '{room_id}' that is part of the space"
)
else:
report["issues"].append(
f"Failed to check room {room_id}: {room_data['error']}"
)
elif not room_data.get("has_admin", False):
report["issues"].append(
f"Bot lacks administrative privileges in room '{room_data.get('room_name', room_id)}' ({room_id}) - level: {room_data.get('bot_power_level', 0)}"
)
# Generate response using helper functions
response = "🔍 Bot Permission Diagnostic Summary
"
# Space summary - only show if there are issues
space_has_issues = (
"error" in report["space"]
or report["space"].get("bot_power_level", 0) < 100
or report["space"].get("users_higher")
or report["space"].get("users_equal")
)
if space_has_issues:
response += diagnostic_utils.generate_space_summary(report["space"])
# Rooms summary
room_summary, room_stats = diagnostic_utils.generate_room_summary(
report["rooms"], self.is_modern_room_version
)
response += room_summary
# Summary statistics
response += diagnostic_utils.generate_summary_stats(
report["space"], room_stats
)
# Issues and warnings
response += diagnostic_utils.generate_issues_and_warnings(
report["issues"], report["warnings"]
)
# All clear message if no issues
if (
not report["issues"]
and not report["warnings"]
and not space_has_issues
and not room_summary
):
response += diagnostic_utils.generate_all_clear_message()
# Try to send the response, and if it's too large, break it up
try:
await evt.respond(response, edits=msg, allow_html=True)
except Exception as e:
error_str = str(e).lower()
if any(
phrase in error_str
for phrase in [
"event too large",
"413",
"payload too large",
"message too long",
]
):
self.log.info(
f"Doctor report too large ({len(response)} chars), breaking into multiple messages"
)
# Break up the response into smaller chunks
chunks = self._split_doctor_report(response)
self.log.info(f"Split report into {len(chunks)} chunks")
# Send the first chunk as an edit to the original message
if chunks:
await evt.respond(chunks[0], edits=msg, allow_html=True)
# Send remaining chunks as new messages
for i, chunk in enumerate(chunks[1:], 2):
await evt.respond(
f"🔍 Bot Permission Diagnostic Report (Part {i}/{len(chunks)})
\n{chunk}",
allow_html=True,
)
await asyncio.sleep(0.5) # Small delay between messages
else:
# Re-raise if it's not a size issue
raise
except Exception as e:
error_msg = f"Failed to run diagnostic check: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)
def _split_doctor_report(
self, report_text: str, max_chunk_size: int = 4000
) -> list[str]:
"""Split a large doctor report into smaller chunks.
Args:
report_text: The full report text to split
max_chunk_size: Maximum size of each chunk in characters
Returns:
list: List of text chunks
"""
return report_utils.split_doctor_report(report_text, max_chunk_size)
def _split_by_sections(self, text: str, max_size: int) -> list[str]:
"""Split text by section headers to maintain logical grouping.
Args:
text: Text to split
max_size: Maximum size per chunk
Returns:
list: List of text chunks
"""
return report_utils._split_by_sections(text, max_size)
async def _doctor_room_detail(self, evt: MessageEvent, room: str) -> None:
"""Generate detailed diagnostic report for a specific room.
Args:
evt: The message event
room: Room ID or alias to analyze
"""
msg = await evt.respond(f"Analyzing room {room}...")
try:
# Resolve room ID if alias provided
room_id = None
if room.startswith("#"):
try:
room_info = await self.client.resolve_room_alias(room)
room_id = room_info["room_id"]
except Exception as e:
await evt.respond(
f"Could not resolve room alias {room}: {e}", edits=msg
)
return
elif room.startswith("!"):
room_id = room
else:
await evt.respond(
f"Invalid room format. Use room ID (!roomid:server) or alias (#alias:server)",
edits=msg,
)
return
# Check if room is in the space
space_rooms = await self.get_space_roomlist()
if room_id not in space_rooms:
await evt.respond(
f"Room {room} is not part of the configured space.", edits=msg
)
return
# Get room name
room_name = room_id
try:
room_name_event = await self.client.get_state_event(
room_id, EventType.ROOM_NAME
)
room_name = room_name_event.name
except:
pass
response = f"🔍 Detailed Analysis: {room_name}
"
response += f"Room ID: {room_id}
"
# Get room version and creators
room_version, creators = await self.get_room_version_and_creators(room_id)
response += f"Room Version: {room_version}
"
if creators:
response += f"Creators: {', '.join(creators)}
"
response += "
"
# Check if bot is in the room
try:
await self.client.get_state_event(
room_id, EventType.ROOM_MEMBER, self.client.mxid
)
response += (
"✅ Bot membership: Bot is a member of this room
"
)
except Exception:
response += "❌ Bot membership: Bot is not a member of this room
"
await evt.respond(response, edits=msg, allow_html=True)
return
# Get power levels
try:
power_levels = await self.client.get_state_event(
room_id, EventType.ROOM_POWER_LEVELS
)
bot_level = power_levels.get_user_level(self.client.mxid)
# Check if bot has unlimited power (creator in modern room versions)
bot_has_unlimited_power = await self.user_has_unlimited_power(
self.client.mxid, room_id
)
response += f"📊 Power Level Analysis
"
response += f"• Bot power level: {bot_level}
"
if bot_has_unlimited_power:
response += f"• Administrative privileges: ✅ Unlimited Power (Creator)
"
else:
response += f"• Administrative privileges: {'✅ Yes' if bot_level >= 100 else '❌ No'}
"
response += (
f"• Default user level: {power_levels.users_default}
"
)
response += f"• Invite level: {power_levels.invite}
"
response += f"• Kick level: {power_levels.kick}
"
response += f"• Ban level: {power_levels.ban}
"
response += f"• Redact level: {power_levels.redact}
"
# Check for users with equal or higher power level
users_higher = []
users_equal = []
for user, level in power_levels.users.items():
if user != self.client.mxid and level >= bot_level:
if level == bot_level:
users_equal.append({"user": user, "level": level})
else:
users_higher.append({"user": user, "level": level})
if bot_has_unlimited_power:
response += f"ℹ️ Creator Status
"
response += f"✅ No power level conflicts relevant: Bot has unlimited power as creator in room version {room_version}
"
else:
if users_higher:
response += f"⚠️ Users with Higher Power Level
"
for user_info in users_higher:
response += f"• {user_info['user']} (level: {user_info['level']})
"
response += "
"
if users_equal:
response += f"⚠️ Users with Equal Power Level
"
for user_info in users_equal:
response += f"• {user_info['user']} (level: {user_info['level']})
"
response += "
"
if not users_higher and not users_equal:
response += (
"✅ No power level conflicts detected
"
)
# Add note about creators in modern room versions
if self.is_modern_room_version(room_version):
response += f"ℹ️ Modern Room Version Note
"
response += f"This room uses version {room_version}, which means creators have unlimited power and cannot be restricted by power levels.
"
# Check specific permissions
response += f"🔐 Permission Analysis
"
# Get required levels for various actions
events_default = power_levels.events_default
events = power_levels.events
permissions = [
(
"Send messages",
events.get(str(EventType.ROOM_MESSAGE), events_default),
),
("Send state events", power_levels.state_default),
(
"Change power levels",
events.get(str(EventType.ROOM_POWER_LEVELS), events_default),
),
("Send tombstone", events.get("m.room.tombstone", events_default)),
("Invite users", power_levels.invite),
("Kick users", power_levels.kick),
("Ban users", power_levels.ban),
("Redact messages", power_levels.redact),
]
for perm_name, required_level in permissions:
has_perm = bot_level >= required_level or bot_has_unlimited_power
status = "✅" if has_perm else "❌"
response += f"• {status} {perm_name}: {'Yes' if has_perm else 'No'} (required: {required_level})
"
except Exception as e:
response += f"❌ Error getting power levels: {e}
"
# Check room state
try:
response += f"🏠 Room State
"
# Check join rules
try:
join_rules = await self.client.get_state_event(
room_id, EventType.ROOM_JOIN_RULES
)
response += f"• Join rule: {join_rules.join_rule}
"
except:
response += "• Join rule: Could not determine
"
# Check encryption
try:
encryption = await self.client.get_state_event(
room_id, EventType.ROOM_ENCRYPTION
)
response += f"• Encryption: ✅ Enabled ({encryption.algorithm})
"
except:
response += "• Encryption: ❌ Not enabled
"
# Check space parent
try:
space_parent = await self.client.get_state_event(
room_id, EventType.SPACE_PARENT
)
response += (
f"• Space parent: ✅ {space_parent.state_key}
"
)
except:
response += "• Space parent: ❌ Not set
"
except Exception as e:
response += f"❌ Error checking room state: {e}
"
await evt.respond(response, edits=msg, allow_html=True)
except Exception as e:
error_msg = f"Failed to analyze room {room}: {e}"
self.log.error(error_msg)
await evt.respond(error_msg, edits=msg)