edd3eee178
This change introduces native `matrix:` URI-based rendering for `{user}` and `{room}` placeholders,
replacing previous plaintext and matrix.to-based links. Users and rooms are now rendered as clickable
pills in supporting clients, with a clean display using display names and room names (no @/# prefixes).
Reporting, moderation, and auto-redaction messages have been updated to use the same rendering logic.
Inspect and event links now also use native `matrix:` URIs for direct in-client navigation.
Internally, URI generation and rendering logic have been unified via central helper functions,
ensuring consistent handling of user IDs, room IDs, aliases, and event IDs.
This commit also includes a broader refactor of the codebase:
- decomposed complex flows (e.g. join handling) into smaller helpers
- moved mutable class-level state to instance-level
- reduced duplicate API calls and redundant logic
- improved overall structure and maintainability
Test coverage has been extended for URI helpers and rendering logic to prevent regressions.
No breaking changes to existing template parameters like `{user_link}` or `{room_link}`.
216 lines
6.7 KiB
Python
216 lines
6.7 KiB
Python
"""User management utility functions."""
|
|
|
|
import asyncio
import fnmatch
import re
import time
from typing import Dict, List, Tuple

from mautrix.errors import MNotFound
from mautrix.types import EventType, UserID
|
|
|
|
|
|
async def check_if_banned(client, userid: str, banlists: List[str], logger) -> bool:
    """Report whether any configured banlist recommends banning ``userid``.

    Each banlist is resolved to a room ID, the room's state is fetched, and
    every ``m.policy.rule.user`` event is compared against ``userid``.

    Args:
        client: Matrix client instance
        userid: The user ID to check
        banlists: List of banlist room IDs or aliases
        logger: Logger instance for error reporting

    Returns:
        bool: True as soon as one rule whose ``entity`` glob matches
        ``userid`` has a ``recommendation`` ending in ``ban``; False otherwise
    """
    joined_rooms = await client.get_joined_rooms()
    policy_rooms = await get_banlist_roomids(client, banlists, logger)

    for policy_room in policy_rooms:
        # A banlist can only be read from a room that appears in the joined list.
        if policy_room not in joined_rooms:
            logger.error(
                f"Bot must be in {policy_room} before attempting to use it as a banlist."
            )
            continue

        try:
            room_state = await client.get_state(policy_room)
            user_rules = [
                event for event in room_state if event.type.t == "m.policy.rule.user"
            ]
        except Exception as e:
            # An unreadable banlist is logged and skipped; the others are still checked.
            logger.error(e)
            continue

        for rule in user_rules:
            try:
                content = rule["content"]
                # The rule's entity is treated as a shell-style glob.
                if not fnmatch.fnmatch(userid, content["entity"]):
                    continue
                if re.search("ban$", content["recommendation"]):
                    return True
            except Exception:
                # Skip invalid rules
                continue

    return False
|
|
|
|
|
|
async def get_banlist_roomids(client, banlists: List[str], logger) -> List[str]:
    """Turn the configured banlist entries into a list of room IDs.

    Entries starting with ``#`` are treated as room aliases and resolved
    through the client; every other entry is passed through unchanged.

    Args:
        client: Matrix client instance
        banlists: List of banlist room IDs or aliases
        logger: Logger instance for error reporting

    Returns:
        list: Room IDs in input order; aliases that could not be resolved
        are logged and left out
    """
    resolved = []

    for entry in banlists:
        if not entry.startswith("#"):
            # Not an alias: use the configured value as the room ID.
            resolved.append(entry)
            continue

        try:
            alias_info = await client.resolve_room_alias(entry)
            resolved.append(alias_info["room_id"])
        except Exception as e:
            logger.error(f"Banlist fetching failed for {entry}: {e}")

    return resolved
|
|
|
|
|
|
async def ban_user_from_rooms(
    client,
    user: str,
    roomlist: List[str],
    reason: str = "banned",
    all_rooms: bool = False,
    redact_on_ban: bool = False,
    get_messages_to_redact_func=None,
    database=None,
    sleep_time: float = 0.1,
    logger=None,
) -> Dict:
    """Ban a user from a list of rooms.

    For every room the function looks up the room name (used only for
    reporting), optionally checks that a membership event exists for the
    user, issues the ban and then pauses for ``sleep_time`` seconds. The
    pause is awaited, so other tasks on the event loop keep running.

    Args:
        client: Matrix client instance
        user: User ID to ban
        roomlist: List of room IDs to ban from
        reason: Reason for the ban
        all_rooms: Whether to ban even if user is not in room
        redact_on_ban: Whether to queue messages for redaction
        get_messages_to_redact_func: Async function called as
            ``func(room, user)`` that returns the messages to redact
        database: Database instance for redaction tasks
        sleep_time: Seconds to wait after each successful ban
        logger: Logger instance (optional)

    Returns:
        dict: ``{"ban_list": {user: [...]}, "error_list": {user: [...]}}``.
        Entries are room names when one could be fetched, room IDs otherwise.
        ``error_list`` only contains ``user`` if at least one room failed, and
        then lists every failed room.
    """
    ban_event_map = {"ban_list": {user: []}, "error_list": {}}

    for room in roomlist:
        roomname = None
        try:
            try:
                roomnamestate = await client.get_state_event(room, "m.room.name")
                roomname = roomnamestate["name"]
            except Exception:
                # The name is cosmetic; fall back to the room ID. Only
                # Exception is caught so task cancellation still propagates.
                pass

            # ban user even if they're not in the room!
            if not all_rooms:
                # The result is unused; an MNotFound raised here skips the room.
                await client.get_state_event(room, EventType.ROOM_MEMBER, user)

            await client.ban_user(room, user, reason=reason)
            ban_event_map["ban_list"][user].append(roomname or room)
            # Awaited sleep: time.sleep() here would block the event loop.
            await asyncio.sleep(sleep_time)
        except MNotFound:
            pass
        except Exception as e:
            if logger:
                logger.warning(e)
            # Accumulate failures rather than resetting the list on each error.
            ban_event_map["error_list"].setdefault(user, []).append(roomname or room)

        # Runs for every room in roomlist, whether or not the ban above succeeded.
        if redact_on_ban and get_messages_to_redact_func and database:
            messages = await get_messages_to_redact_func(room, user)
            # Queue messages for redaction
            for msg in messages:
                await database.execute(
                    "INSERT INTO redaction_tasks (event_id, room_id) VALUES ($1, $2)",
                    msg.event_id,
                    room,
                )
            if logger:
                logger.info(
                    f"Queued {len(messages)} messages for redaction in {roomname or room}"
                )

    return ban_event_map
|
|
|
|
|
|
async def user_permitted(
    client,
    user_id: UserID,
    parent_room: str,
    min_level: int = 50,
    room_id: str = None,
    logger=None,
) -> bool:
    """Decide whether a user's power level in a room meets ``min_level``.

    Args:
        client: Matrix client instance
        user_id: The Matrix ID of the user to check
        parent_room: The parent room ID, used when ``room_id`` is not given
        min_level: Minimum required power level (default 50 for moderator)
        room_id: The room ID to check permissions in. If None, uses parent room.
        logger: Logger instance for error reporting (optional)

    Returns:
        bool: True if the user has unlimited power or a power level of at
        least ``min_level``; False otherwise, including when any step of the
        check raises
    """
    try:
        # Imported inside the function, as in the rest of this module.
        from .room_utils import user_has_unlimited_power as _has_unlimited_power

        room_to_check = room_id if room_id else parent_room

        # Unlimited power (creator in modern room versions) is checked first.
        if await _has_unlimited_power(client, user_id, room_to_check):
            return True

        # Otherwise compare the user's level from the power-levels state event.
        levels = await client.get_state_event(
            room_to_check, EventType.ROOM_POWER_LEVELS
        )
        return levels.get_user_level(user_id) >= min_level
    except Exception as err:
        if logger:
            logger.error(f"Failed to check user power level: {err}")
        return False
|
|
|
|
|
|
async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
    """Tell whether a user has unlimited power in a room.

    This is a thin wrapper: the decision is made entirely by the function of
    the same name in ``room_utils`` (creator in modern room versions).

    Args:
        client: Matrix client instance
        user_id: The user ID to check
        room_id: The room ID to check in

    Returns:
        bool: True if user has unlimited power
    """
    # Imported under an alias so it does not clash with this wrapper's own name.
    from .room_utils import user_has_unlimited_power as _delegate

    has_unlimited = await _delegate(client, user_id, room_id)
    return has_unlimited
|