fix reporting

This commit is contained in:
William Kray
2025-09-09 16:20:43 -07:00
parent 87e02b7ea6
commit 7b2a60d773
6 changed files with 344 additions and 119 deletions
+12 -2
View File
@@ -26,10 +26,15 @@ async def check_space_permissions(
)
bot_level = space_power_levels.get_user_level(client.mxid)
# Check if bot has unlimited power (creator in modern room versions)
from .room_utils import user_has_unlimited_power
bot_has_unlimited_power = await user_has_unlimited_power(client, client.mxid, parent_room)
space_info = {
"room_id": parent_room,
"bot_power_level": bot_level,
"has_admin": bot_level >= 100,
"has_admin": bot_level >= 100 or bot_has_unlimited_power,
"bot_has_unlimited_power": bot_has_unlimited_power,
"users_higher_or_equal": [],
"users_equal": [],
"users_higher": []
@@ -196,7 +201,12 @@ def generate_space_summary(
space_status = "" if space_data.get("has_admin", False) else ""
response = f"<h4>📋 Parent Space</h4><br />"
response += f"{space_status} <b>Administrative privileges:</b> {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})<br />"
# Show admin status with appropriate details
if space_data.get("bot_has_unlimited_power", False):
response += f"{space_status} <b>Administrative privileges:</b> Yes (unlimited power - creator)<br />"
else:
response += f"{space_status} <b>Administrative privileges:</b> {'Yes' if space_data['has_admin'] else 'No'} (level: {space_data['bot_power_level']})<br />"
if space_data.get("users_higher"):
response += f"⚠️ <b>Users with higher power:</b> {', '.join([f'{u['user']} ({u['level']})' for u in space_data['users_higher']])}<br />"
+7 -7
View File
@@ -8,20 +8,20 @@ def generate_activity_report(database_results: Dict[str, List[Dict]]) -> Dict[st
"""Generate an activity report from database results.
Args:
database_results: Dictionary containing 'active', 'inactive', 'ignored' results
database_results: Dictionary containing 'warn_inactive', 'kick_inactive', 'ignored' results
Returns:
dict: Formatted activity report
"""
report = {}
# Process active users
active_results = database_results.get("active", [])
report["active"] = [row["mxid"] for row in active_results] or ["none"]
# Process warn inactive users (between warn and kick thresholds)
warn_inactive_results = database_results.get("warn_inactive", [])
report["warn_inactive"] = [row["mxid"] for row in warn_inactive_results] or ["none"]
# Process inactive users
inactive_results = database_results.get("inactive", [])
report["inactive"] = [row["mxid"] for row in inactive_results] or ["none"]
# Process kick inactive users (beyond kick threshold)
kick_inactive_results = database_results.get("kick_inactive", [])
report["kick_inactive"] = [row["mxid"] for row in kick_inactive_results] or ["none"]
# Process ignored users
ignored_results = database_results.get("ignored", [])
+51 -33
View File
@@ -22,26 +22,29 @@ async def validate_room_creation_params(
Returns:
Tuple of (sanitized_name, force_encryption, force_unencryption, error_msg)
"""
# Check for encryption flags
encrypted_flag_regex = re.compile(r"(\s+|^)-+encrypt(ed)?\s?")
unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?\s?")
# Check for encryption flags (at beginning, middle, or end of string)
encrypted_flag_regex = re.compile(r"(\s+|^)-+encrypt(ed)?(\s+|$)")
unencrypted_flag_regex = re.compile(r"(\s+|^)-+unencrypt(ed)?(\s+|$)")
force_encryption = bool(encrypted_flag_regex.search(roomname))
force_unencryption = bool(unencrypted_flag_regex.search(roomname))
# Clean up room name
if force_encryption:
roomname = encrypted_flag_regex.sub("", roomname)
roomname = encrypted_flag_regex.sub("", roomname) # Remove encryption flag
if force_unencryption:
roomname = unencrypted_flag_regex.sub("", roomname)
roomname = unencrypted_flag_regex.sub("", roomname) # Remove unencryption flag
# Clean up any extra whitespace
roomname = re.sub(r"\s+", " ", roomname).strip()
sanitized_name = re.sub(r"[^a-zA-Z0-9]", "", roomname).lower()
# Check if community slug is configured
if not config.get("community_slug"):
if not config.get("community_slug", ""):
error_msg = "No community slug configured. Please run initialize command first."
return sanitized_name, force_encryption, force_unencryption, error_msg
return sanitized_name, force_encryption, force_unencryption, error_msg, roomname
return sanitized_name, force_encryption, force_unencryption, ""
return sanitized_name, force_encryption, force_unencryption, "", roomname
async def prepare_room_creation_data(
@@ -62,12 +65,12 @@ async def prepare_room_creation_data(
Tuple of (alias_localpart, server, room_invitees, parent_room)
"""
# Create alias with community slug
alias_localpart = f"{sanitized_name}-{config['community_slug']}"
alias_localpart = f"{sanitized_name}-{config.get('community_slug', '')}"
# Get server and invitees
server = client.parse_user_id(client.mxid)[1]
room_invitees = invitees if invitees is not None else config["invitees"]
parent_room = config["parent_room"]
room_invitees = invitees if invitees is not None else config.get("invitees", [])
parent_room = config.get("parent_room", "")
return alias_localpart, server, room_invitees, parent_room
@@ -93,36 +96,51 @@ async def prepare_power_levels(
return power_level_override
if parent_room:
# Get parent room power levels to extract user power levels
parent_power_levels = await client.get_state_event(
parent_room, EventType.ROOM_POWER_LEVELS
)
# Create new power levels with server defaults, not copying all permissions from space
power_levels = PowerLevelStateEventContent()
# Copy only user power levels from parent space, not the entire permission set
if parent_power_levels.users:
user_power_levels = parent_power_levels.users.copy()
# Ensure bot has highest power
user_power_levels[client.mxid] = 1000
power_levels.users = user_power_levels
else:
try:
# Get parent room power levels to extract user power levels
parent_power_levels = await client.get_state_event(
parent_room, EventType.ROOM_POWER_LEVELS
)
# Create new power levels with server defaults, not copying all permissions from space
power_levels = PowerLevelStateEventContent()
# Copy only user power levels from parent space, not the entire permission set
if parent_power_levels and hasattr(parent_power_levels, 'users') and parent_power_levels.users:
try:
user_power_levels = parent_power_levels.users.copy()
# Ensure bot has highest power
user_power_levels[client.mxid] = 1000
power_levels.users = user_power_levels
except Exception as e:
# If copying users fails, create default power levels
power_levels.users = {
client.mxid: 1000, # Bot gets highest power
}
else:
power_levels.users = {
client.mxid: 1000, # Bot gets highest power
}
# Set explicit config values
power_levels.invite = config.get("invite_power_level", 50)
return power_levels
except Exception as e:
# If we can't get parent power levels, create default ones
power_levels = PowerLevelStateEventContent()
power_levels.users = {
client.mxid: 1000, # Bot gets highest power
}
# Set explicit config values
power_levels.invite = config["invite_power_level"]
return power_levels
power_levels.invite = config.get("invite_power_level", 50)
return power_levels
else:
# If no parent room, create default power levels
power_levels = PowerLevelStateEventContent()
power_levels.users = {
client.mxid: 1000, # Bot gets highest power
}
power_levels.invite = config["invite_power_level"]
power_levels.invite = config.get("invite_power_level", 50)
return power_levels
@@ -186,7 +204,7 @@ def prepare_initial_state(
initial_state.append({
"type": str(EventType.ROOM_HISTORY_VISIBILITY),
"content": {
"history_visibility": creation_content["m.room.history_visibility"]
"history_visibility": creation_content.get("m.room.history_visibility", "joined")
}
})
+2 -2
View File
@@ -61,7 +61,7 @@ async def validate_room_aliases(client, room_names: list[str], community_slug: s
return len(conflicting_aliases) == 0, conflicting_aliases
async def get_room_version_and_creators(client, room_id: str) -> Tuple[str, List[str]]:
async def get_room_version_and_creators(client, room_id: str, logger=None) -> Tuple[str, List[str]]:
"""Get the room version and creators for a room.
Args:
@@ -134,7 +134,7 @@ async def user_has_unlimited_power(client, user_id: str, room_id: str) -> bool:
bool: True if user has unlimited power
"""
try:
room_version, creators = await get_room_version_and_creators(client, room_id)
room_version, creators = await get_room_version_and_creators(client, room_id, None)
# In modern room versions (12+), creators have unlimited power
if is_modern_room_version(room_version):