new feature: archive or replace existing rooms
good for shutting off temporary rooms and keeping your space tidy, and migrating to new rooms that the bot has appropriate permissions for. probably doesn't solve every problem but should make it a little easier to fix permission issues and such.
This commit is contained in:
@@ -110,6 +110,17 @@ bot is still able to manage room admins. the bot will also invite other users to
|
||||
|
||||
rooms created by the bot will have join restriction limited to members of the space.
|
||||
|
||||
## room archival and replacement
|
||||
|
||||
use the `archive` subcommand to archive a room. this will remove the room from the parent space, remove all room aliases, and add a tombstone event to indicate the room is archived
|
||||
|
||||
use the `replaceroom` subcommand to replace an existing room with a new one. this is useful when:
|
||||
- room members have power levels that cannot be corrected, or room members you cannot kick out
|
||||
- you need to revert encryption settings
|
||||
- you want to start fresh with a new room while preserving the old room's name and aliases
|
||||
|
||||
the replacement process will create a new room with the same name and avatar, transfer all room aliases to the new room, and archive the old room with a pointer to the new room. the new room will have standard join rules that restrict membership to space members. this logic is a little clunky, but it seems to work.
|
||||
|
||||
## get room ID
|
||||
|
||||
sometimes you need to know a rooms identifier, but if the room has an alias associated with it not all clients make it
|
||||
|
||||
+398
-53
@@ -269,6 +269,185 @@ class CommunityBot(Plugin):
|
||||
await asyncio.sleep(sleep_time)
|
||||
return counters
|
||||
|
||||
async def check_bot_permissions(self, room_id: str, evt: MessageEvent = None, required_permissions: list[str] = None) -> tuple[bool, str, dict]:
|
||||
"""Check if the bot has necessary permissions in a room.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room to check permissions in
|
||||
evt: Optional MessageEvent for progress updates
|
||||
required_permissions: List of specific permissions to check. If None, checks basic room access.
|
||||
|
||||
Returns:
|
||||
tuple: (bool, str, dict) - (has_permissions, error_message, permission_details)
|
||||
"""
|
||||
try:
|
||||
# Check if bot is in the room
|
||||
try:
|
||||
await self.client.get_state_event(room_id, EventType.ROOM_MEMBER, self.client.mxid)
|
||||
except MNotFound:
|
||||
return False, "Bot is not a member of this room", {}
|
||||
|
||||
# Get power levels
|
||||
power_levels = await self.client.get_state_event(room_id, EventType.ROOM_POWER_LEVELS)
|
||||
bot_level = power_levels.users.get(self.client.mxid, power_levels.users_default)
|
||||
|
||||
# Define required power levels for different actions
|
||||
permission_requirements = {
|
||||
"redact": power_levels.redact,
|
||||
"kick": power_levels.kick,
|
||||
"ban": power_levels.ban,
|
||||
"invite": power_levels.invite,
|
||||
"tombstone": power_levels.events.get("m.room.tombstone", power_levels.events_default),
|
||||
"power_levels": power_levels.events.get("m.room.power_levels", power_levels.events_default),
|
||||
"state": power_levels.state_default
|
||||
}
|
||||
|
||||
# Check each required permission
|
||||
permission_status = {}
|
||||
if required_permissions:
|
||||
for perm in required_permissions:
|
||||
if perm in permission_requirements:
|
||||
required_level = permission_requirements[perm]
|
||||
permission_status[perm] = {
|
||||
"has_permission": bot_level >= required_level,
|
||||
"required_level": required_level,
|
||||
"bot_level": bot_level
|
||||
}
|
||||
|
||||
# If no specific permissions requested, just check basic access
|
||||
if not required_permissions:
|
||||
if bot_level < 50: # Basic moderator level
|
||||
return False, "Bot does not have sufficient power level (needs at least moderator level)", permission_status
|
||||
return True, "", permission_status
|
||||
|
||||
# Check if all requested permissions are granted
|
||||
missing_permissions = [perm for perm, status in permission_status.items()
|
||||
if not status["has_permission"]]
|
||||
|
||||
if missing_permissions:
|
||||
error_msg = "Bot is missing required permissions: " + ", ".join(missing_permissions)
|
||||
return False, error_msg, permission_status
|
||||
|
||||
return True, "", permission_status
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to check bot permissions: {e}"
|
||||
self.log.error(error_msg)
|
||||
if evt:
|
||||
await evt.respond(error_msg)
|
||||
return False, error_msg, {}
|
||||
|
||||
async def do_archive_room(self, room_id: str, evt: MessageEvent = None, replacement_room: str = "") -> bool:
|
||||
"""Handle common room archival activities like removing from space, removing aliases, and setting tombstone.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room to archive
|
||||
evt: Optional MessageEvent for progress updates
|
||||
replacement_room: Optional room ID to point to in the tombstone event
|
||||
|
||||
Returns:
|
||||
bool: True if all operations succeeded, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Check permissions for all required operations
|
||||
has_perms, error_msg, _ = await self.check_bot_permissions(
|
||||
room_id,
|
||||
evt,
|
||||
["state", "tombstone", "power_levels"]
|
||||
)
|
||||
if not has_perms:
|
||||
if evt:
|
||||
await evt.respond(f"Cannot archive room: {error_msg}")
|
||||
return False
|
||||
|
||||
# Try to remove the room from the space first
|
||||
self.log.debug(f"DEBUG removing space state reference from {room_id}")
|
||||
await self.client.send_state_event(
|
||||
room_id=room_id,
|
||||
event_type="m.space.parent",
|
||||
content={}, # Empty content removes the state
|
||||
state_key=self.config["parent_room"]
|
||||
)
|
||||
self.log.info(f"Removed parent space reference from room {room_id}")
|
||||
|
||||
# Remove the child reference from the space
|
||||
self.log.debug(f"DEBUG removing child state reference from {self.config['parent_room']}")
|
||||
await self.client.send_state_event(
|
||||
self.config["parent_room"],
|
||||
event_type="m.space.child",
|
||||
content={}, # Empty content removes the state
|
||||
state_key=room_id
|
||||
)
|
||||
self.log.info(f"Removed child room reference from space {self.config['parent_room']}")
|
||||
|
||||
# Remove room aliases to release them
|
||||
await self.remove_room_aliases(room_id, evt)
|
||||
|
||||
# Send the tombstone
|
||||
tombstone_content = {
|
||||
"body": "This room has been archived." if not replacement_room else "This room has been replaced. Please join the new room.",
|
||||
"replacement_room": replacement_room
|
||||
}
|
||||
await self.client.send_state_event(
|
||||
room_id=room_id,
|
||||
event_type=EventType.ROOM_TOMBSTONE,
|
||||
content=tombstone_content
|
||||
)
|
||||
self.log.info(f"Successfully added tombstone to room {room_id}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to archive room: {e}"
|
||||
self.log.error(error_msg)
|
||||
if evt:
|
||||
await evt.respond(error_msg)
|
||||
return False
|
||||
|
||||
async def remove_room_aliases(self, room_id: str, evt: MessageEvent = None) -> list:
|
||||
"""Remove all aliases from a room.
|
||||
|
||||
Args:
|
||||
room_id: The ID of the room whose aliases to remove
|
||||
evt: Optional MessageEvent for progress updates
|
||||
|
||||
Returns:
|
||||
list: List of aliases that were successfully removed
|
||||
"""
|
||||
removed_aliases = []
|
||||
|
||||
try:
|
||||
aliases = await self.client.get_state_event(
|
||||
room_id=room_id,
|
||||
event_type=EventType.ROOM_CANONICAL_ALIAS
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(f"Failed to get room alias state event, skipping: {e}")
|
||||
return removed_aliases
|
||||
|
||||
if aliases.alt_aliases:
|
||||
for alias in aliases.alt_aliases:
|
||||
try:
|
||||
await self.client.remove_room_alias(
|
||||
alias_localpart=alias.split(':')[0].lstrip('#'),
|
||||
)
|
||||
self.log.info(f"Removed alias {alias} from room {room_id}")
|
||||
removed_aliases.append(alias)
|
||||
except Exception as e:
|
||||
self.log.warning(f"Failed to remove alias {alias}: {e}")
|
||||
|
||||
if aliases.canonical_alias:
|
||||
try:
|
||||
await self.client.remove_room_alias(
|
||||
alias_localpart=aliases.canonical_alias.split(':')[0].lstrip('#'),
|
||||
)
|
||||
self.log.info(f"Removed canonical alias {aliases.canonical_alias} from room {room_id}")
|
||||
removed_aliases.append(aliases.canonical_alias)
|
||||
except Exception as e:
|
||||
self.log.warning(f"Failed to remove canonical alias: {e}")
|
||||
|
||||
return removed_aliases
|
||||
|
||||
async def ban_this_user(self, user, reason="banned", all_rooms=False):
|
||||
roomlist = await self.get_space_roomlist()
|
||||
# don't forget to kick from the space itself
|
||||
@@ -734,6 +913,80 @@ class CommunityBot(Plugin):
|
||||
else:
|
||||
await evt.reply("lol you don't have permission to do that")
|
||||
|
||||
async def create_room(self, roomname: str, evt: MessageEvent = None) -> None:
|
||||
"""Create a new room and add it to the parent space.
|
||||
|
||||
Args:
|
||||
roomname: The name for the new room
|
||||
evt: Optional MessageEvent for progress updates. If provided, will send status messages.
|
||||
|
||||
Returns:
|
||||
tuple: (room_id, room_alias) if successful, None if failed
|
||||
"""
|
||||
encrypted_flag_regex = re.compile(r'(\s+|^)-+encrypt(ed)?\s?')
|
||||
force_encryption = bool(encrypted_flag_regex.search(roomname))
|
||||
try:
|
||||
if force_encryption:
|
||||
roomname = encrypted_flag_regex.sub('', roomname)
|
||||
sanitized_name = re.sub(r"[^a-zA-Z0-9]", '', roomname).lower()
|
||||
invitees = self.config['invitees']
|
||||
parent_room = self.config['parent_room']
|
||||
server = self.client.parse_user_id(self.client.mxid)[1]
|
||||
|
||||
# Set bot PL higher than admin so we can kick old admins if needed
|
||||
pl_override = {"users": {self.client.mxid: 1000}}
|
||||
for u in self.config['admins']:
|
||||
pl_override["users"][u] = 100
|
||||
for u in self.config['moderators']:
|
||||
pl_override["users"][u] = 50
|
||||
|
||||
if evt:
|
||||
mymsg = await evt.respond(f"creating {sanitized_name}, give me a minute...")
|
||||
|
||||
room_id = await self.client.create_room(
|
||||
alias_localpart=sanitized_name,
|
||||
name=roomname,
|
||||
invitees=invitees,
|
||||
power_level_override=pl_override
|
||||
)
|
||||
await asyncio.sleep(self.config['sleep'])
|
||||
|
||||
if evt:
|
||||
await evt.respond(f"updating room states...", edits=mymsg)
|
||||
|
||||
# Set up room state events
|
||||
parent_event_content = json.dumps({'auto_join': False, 'suggested': False, 'via': [server]})
|
||||
child_event_content = json.dumps({'canonical': True, 'via': [server]})
|
||||
join_rules_content = json.dumps({
|
||||
'join_rule': 'restricted',
|
||||
'allow': [{'type': 'm.room_membership', 'room_id': parent_room}]
|
||||
})
|
||||
|
||||
await self.client.send_state_event(parent_room, 'm.space.child', parent_event_content, state_key=room_id)
|
||||
await asyncio.sleep(self.config['sleep'])
|
||||
await self.client.send_state_event(room_id, 'm.space.parent', child_event_content, state_key=parent_room)
|
||||
await asyncio.sleep(self.config['sleep'])
|
||||
await self.client.send_state_event(room_id, 'm.room.join_rules', join_rules_content, state_key="")
|
||||
await asyncio.sleep(self.config['sleep'])
|
||||
|
||||
if self.config["encrypt"] or force_encryption:
|
||||
encryption_content = json.dumps({"algorithm": "m.megolm.v1.aes-sha2"})
|
||||
await self.client.send_state_event(room_id, 'm.room.encryption', encryption_content, state_key="")
|
||||
if evt:
|
||||
await evt.respond(f"encrypting room...", edits=mymsg)
|
||||
await asyncio.sleep(self.config['sleep'])
|
||||
|
||||
if evt:
|
||||
await evt.respond(f"room created and updated, alias is #{sanitized_name}:{server}", edits=mymsg)
|
||||
|
||||
return room_id, f"#{sanitized_name}:{server}"
|
||||
|
||||
except Exception as e:
|
||||
if evt:
|
||||
await evt.respond(f"i tried, but something went wrong: \"{e}\"", edits=mymsg)
|
||||
self.log.error(f"Failed to create room: {e}")
|
||||
return None
|
||||
|
||||
@community.subcommand("createroom", help="create a new room titled <roomname> and add it to the parent space. \
|
||||
optionally include `--encrypt` to encrypt it regardless of the default settings.")
|
||||
@command.argument("roomname", pass_raw=True, required=True)
|
||||
@@ -744,64 +997,156 @@ class CommunityBot(Plugin):
|
||||
setting.')
|
||||
else:
|
||||
if evt.sender in self.config["admins"] or evt.sender in self.config["moderators"]:
|
||||
encrypted_flag_regex = re.compile(r'(\s+|^)-+encrypt(ed)?\s?')
|
||||
force_encryption = bool(encrypted_flag_regex.search(roomname))
|
||||
try:
|
||||
if force_encryption:
|
||||
roomname = encrypted_flag_regex.sub('', roomname)
|
||||
sanitized_name = re.sub(r"[^a-zA-Z0-9]", '', roomname).lower()
|
||||
invitees = self.config['invitees']
|
||||
parent_room = self.config['parent_room']
|
||||
## homeserver is derived from maubot's client instance since this is the user that will create the room
|
||||
server = self.client.parse_user_id(self.client.mxid)[1]
|
||||
# set bot PL higher than admin so we can kick old admins if needed
|
||||
pl_override = {"users": {self.client.mxid: 1000}}
|
||||
for u in self.config['admins']:
|
||||
pl_override["users"][u] = 100
|
||||
for u in self.config['moderators']:
|
||||
pl_override["users"][u] = 50
|
||||
pl_json = json.dumps(pl_override)
|
||||
|
||||
mymsg = await evt.respond(f"creating {sanitized_name}, give me a minute...")
|
||||
#self.log.info(mymsg)
|
||||
room_id = await self.client.create_room(alias_localpart=sanitized_name, name=roomname,
|
||||
invitees=invitees, power_level_override=pl_override)
|
||||
time.sleep(self.config['sleep'])
|
||||
|
||||
await evt.respond(f"updating room states...", edits=mymsg)
|
||||
parent_event_content = json.dumps({'auto_join': False, 'suggested': False, 'via': [server]})
|
||||
child_event_content = json.dumps({'canonical': True, 'via': [server]})
|
||||
join_rules_content = json.dumps({'join_rule': 'restricted', 'allow': [{'type': 'm.room_membership',
|
||||
'room_id': parent_room}]})
|
||||
|
||||
await self.client.send_state_event(parent_room, 'm.space.child', parent_event_content, state_key=room_id)
|
||||
time.sleep(self.config['sleep'])
|
||||
await self.client.send_state_event(room_id, 'm.space.parent', child_event_content, state_key=parent_room)
|
||||
time.sleep(self.config['sleep'])
|
||||
await self.client.send_state_event(room_id, 'm.room.join_rules', join_rules_content, state_key="")
|
||||
time.sleep(self.config['sleep'])
|
||||
|
||||
if self.config["encrypt"] or force_encryption:
|
||||
encryption_content = json.dumps({"algorithm": "m.megolm.v1.aes-sha2"})
|
||||
|
||||
await self.client.send_state_event(room_id, 'm.room.encryption', encryption_content,
|
||||
state_key="")
|
||||
await evt.respond(f"encrypting room...", edits=mymsg)
|
||||
time.sleep(self.config['sleep'])
|
||||
|
||||
await evt.respond(f"room created and updated, alias is #{sanitized_name}:{server}", edits=mymsg)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
await evt.respond(f"i tried, but something went wrong: \"{e}\"", edits=mymsg)
|
||||
await self.create_room(roomname, evt)
|
||||
else:
|
||||
await evt.reply("you're not the boss of me!")
|
||||
|
||||
@community.subcommand("archive", help="archive a room")
|
||||
@command.argument("room", required=False)
|
||||
async def archive_room(self, evt: MessageEvent, room: str) -> None:
|
||||
await evt.mark_read()
|
||||
|
||||
#need to somehow regularly fetch and update the list of room ids that are associated with a given space
|
||||
#to track events within so that we are actually only paying attention to those rooms
|
||||
if evt.sender in self.config["admins"] or evt.sender in self.config["moderators"]:
|
||||
if not room:
|
||||
room_id = evt.room_id
|
||||
self.log.debug(f"DEBUG room we are archiving is {room_id}")
|
||||
elif room and room.startswith('#'):
|
||||
try:
|
||||
self.log.debug(f"DEBUG trying to resolve alias {room}")
|
||||
room_id = await self.client.resolve_room_alias(room)
|
||||
room_id = room_id["room_id"]
|
||||
self.log.debug(f"DEBUG room we are archiving is {room_id}")
|
||||
except Exception as e:
|
||||
evt.reply("i couldn't resolve that alias, sorry")
|
||||
self.log.error(f"error resolving alias {room}: {e}")
|
||||
return
|
||||
elif room and room.startswith('!'):
|
||||
room_id = room
|
||||
self.log.debug(f"DEBUG room we are archiving is {room_id}")
|
||||
else:
|
||||
evt.reply("i don't recognize that room, sorry")
|
||||
return
|
||||
|
||||
success = await self.do_archive_room(room_id, evt)
|
||||
|
||||
# Only try to respond if we're not archiving the room we're in
|
||||
if success and room_id != evt.room_id:
|
||||
await evt.respond("Room has been archived.")
|
||||
|
||||
@community.subcommand("replaceroom", help="replace a room with a new one")
|
||||
@command.argument("room", required=False)
|
||||
async def replace_room(self, evt: MessageEvent, room: str) -> None:
|
||||
await evt.mark_read()
|
||||
|
||||
if evt.sender in self.config["admins"] or evt.sender in self.config["moderators"]:
|
||||
if not room:
|
||||
room = evt.room_id
|
||||
# first we need to get relevant room state of the room we want to replace
|
||||
# this includes the room name, alias, and join rules
|
||||
if room.startswith('#'):
|
||||
room_id = await self.client.resolve_room_alias(room)
|
||||
room_id = room_id["room_id"]
|
||||
else:
|
||||
room_id = room
|
||||
|
||||
# Check bot permissions in the old room
|
||||
has_perms, error_msg, _ = await self.check_bot_permissions(
|
||||
room_id,
|
||||
evt,
|
||||
["state", "tombstone", "power_levels"]
|
||||
)
|
||||
if not has_perms:
|
||||
await evt.respond(f"Cannot replace room: {error_msg}")
|
||||
return
|
||||
|
||||
# Get the room name from the state event
|
||||
try:
|
||||
room_name_event = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
|
||||
room_name = room_name_event.name
|
||||
except Exception as e:
|
||||
self.log.warning(f"Failed to get room name: {e}")
|
||||
#await evt.respond("Could not find room name in state events")
|
||||
pass
|
||||
|
||||
# get the room topic from the state event
|
||||
try:
|
||||
room_topic_event = await self.client.get_state_event(room_id, EventType.ROOM_TOPIC)
|
||||
room_topic = room_topic_event.topic
|
||||
except Exception as e:
|
||||
self.log.warning(f"Failed to get room topic: {e}")
|
||||
pass
|
||||
|
||||
# Get list of aliases to transfer while removing them from the old room
|
||||
aliases_to_transfer = await self.remove_room_aliases(room_id, evt)
|
||||
|
||||
# Now we can start the process of replacing the room
|
||||
# First we need to create the new room. this will create the initial alias,
|
||||
# as well as bot defaults such as power levels, initial invitations, encryption,
|
||||
# and space membership
|
||||
new_room_id, new_room_alias = await self.create_room(room_name, evt)
|
||||
if not new_room_id:
|
||||
await evt.respond("Failed to create new room")
|
||||
return
|
||||
|
||||
# Check bot permissions in the new room
|
||||
has_perms, error_msg, _ = await self.check_bot_permissions(
|
||||
new_room_id,
|
||||
evt,
|
||||
["state", "tombstone", "power_levels"]
|
||||
)
|
||||
if not has_perms:
|
||||
await evt.respond(f"Created new room but cannot complete replacement: {error_msg}")
|
||||
return
|
||||
|
||||
# Transfer the aliases to the new room
|
||||
for alias in aliases_to_transfer:
|
||||
localpart = alias.split(':')[0][1:] # Remove # and get localpart
|
||||
server = alias.split(':')[1]
|
||||
try:
|
||||
await self.client.add_room_alias(new_room_id, localpart)
|
||||
self.log.info(f"Successfully transferred alias {alias} to new room {new_room_id}")
|
||||
except Exception as e:
|
||||
# If transfer failed, try to create a modified alias
|
||||
modified_alias = f"{localpart}NEW"
|
||||
try:
|
||||
await self.client.add_room_alias(new_room_id, modified_alias)
|
||||
self.log.info(f"Successfully transferred modified alias {modified_alias} to new room {new_room_id}")
|
||||
except Exception as e2:
|
||||
self.log.error(f"Failed to transfer modified alias {modified_alias}: {e2}")
|
||||
|
||||
# Get the room avatar from the old room
|
||||
try:
|
||||
old_room_avatar = await self.client.get_state_event(room_id, EventType.ROOM_AVATAR)
|
||||
if old_room_avatar and old_room_avatar.url:
|
||||
# Set the same avatar in the new room
|
||||
await self.client.send_state_event(
|
||||
new_room_id,
|
||||
EventType.ROOM_AVATAR,
|
||||
{"url": old_room_avatar.url}
|
||||
)
|
||||
self.log.info(f"Successfully copied room avatar to new room {new_room_id}")
|
||||
except Exception as e:
|
||||
self.log.error(f"Failed to copy room avatar to new room: {e}")
|
||||
#await evt.respond(f"Failed to copy room avatar to new room: {e}")
|
||||
|
||||
# Set the room topic in the new room
|
||||
try:
|
||||
await self.client.send_state_event(
|
||||
new_room_id,
|
||||
EventType.ROOM_TOPIC,
|
||||
{"topic": room_topic}
|
||||
)
|
||||
self.log.info(f"Successfully copied room topic to new room {new_room_id}")
|
||||
except Exception as e:
|
||||
self.log.error(f"Failed to copy room topic to new room: {e}")
|
||||
#await evt.respond(f"Failed to copy room topic to new room: {e}")
|
||||
|
||||
|
||||
# Archive the old room with a pointer to the new room
|
||||
success = await self.do_archive_room(room_id, evt, new_room_id)
|
||||
if not success:
|
||||
await evt.respond("Failed to archive old room, but new room has been created")
|
||||
|
||||
## loop through each room and report people who are "guests" (in the room, but not members of the space)
|
||||
@community.subcommand("guests", help="generate a list of members in a room who are not members of the parent space")
|
||||
@command.argument("room", required=False)
|
||||
async def get_guestlist(self, evt: MessageEvent, room: str) -> None:
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
maubot: 0.1.0
|
||||
id: org.jobmachine.communitybot
|
||||
version: 0.1.18
|
||||
version: 0.1.19
|
||||
license: MIT
|
||||
modules:
|
||||
- community
|
||||
|
||||
Reference in New Issue
Block a user