initial verification check dm logic

This commit is contained in:
William Kray
2025-04-06 21:07:01 -07:00
parent 49ae7bd66d
commit 5b973920ec
4 changed files with 177 additions and 6 deletions
+133 -4
View File
@@ -1,11 +1,12 @@
# kickbot - a maubot plugin to track user activity and remove inactive users from rooms/spaces.
from typing import Awaitable, Type, Optional, Tuple
from typing import Awaitable, Type, Optional, Tuple, Dict
import json
import time
import re
import fnmatch
import asyncio
import random
import asyncpg.exceptions
from mautrix.client import (
@@ -33,6 +34,7 @@ from mautrix.types import (
SpaceParentStateEventContent,
JoinRulesStateEventContent,
JoinRule,
RoomCreatePreset,
)
from mautrix.errors import MNotFound
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
@@ -71,11 +73,15 @@ class Config(BaseProxyConfig):
helper.copy("banlists")
helper.copy("proactive_banning")
helper.copy("redact_on_ban")
helper.copy("check_if_human")
helper.copy("verification_phrases")
helper.copy("verification_attempts")
class CommunityBot(Plugin):
_redaction_tasks: asyncio.Task = None
_verification_states: Dict[str, Dict] = {}
async def start(self) -> None:
await super().start()
@@ -754,8 +760,6 @@ class CommunityBot(Plugin):
else:
on_banlist = await self.check_if_banned(evt.sender)
if on_banlist:
# self.log.debug(f"DEBUG user is on banlist!")
# ban this account in managed rooms, don't bother with anything else
await self.ban_this_user(evt.sender)
return
# passive sync of tracking db
@@ -764,7 +768,6 @@ class CommunityBot(Plugin):
# greeting activities
room_id = str(evt.room_id)
if room_id in self.config["greeting_rooms"]:
# just in case we got here even if the person is on the banlists
if on_banlist:
return
greeting_map = self.config["greetings"]
@@ -791,6 +794,132 @@ class CommunityBot(Plugin):
self.config["notification_room"], html=notification_message
)
# Human verification logic
if self.config["check_if_human"] and self.config["verification_phrases"]:
try:
# Check if verification is enabled for this room
verification_enabled = False
if isinstance(self.config["check_if_human"], bool):
verification_enabled = self.config["check_if_human"]
elif isinstance(self.config["check_if_human"], list):
verification_enabled = evt.room_id in self.config["check_if_human"]
if not verification_enabled:
return
# Get room name for greeting
roomname = "this room"
try:
roomnamestate = await self.client.get_state_event(evt.room_id, "m.room.name")
roomname = roomnamestate["name"]
except:
pass
# Check if user already has sufficient power level
try:
power_levels = await self.client.get_state_event(
evt.room_id, EventType.ROOM_POWER_LEVELS
)
user_level = power_levels.get_user_level(evt.sender)
events_default = power_levels.events_default
events = power_levels.events
# Get the required power level for sending messages
required_level = events.get(str(EventType.ROOM_MESSAGE), events_default)
# If user already has sufficient power level, skip verification
if user_level >= required_level:
self.log.debug(f"User {evt.sender} already has sufficient power level ({user_level} >= {required_level})")
return
except Exception as e:
self.log.error(f"Failed to check user power level: {e}")
return
# Create DM room with name
dm_room = await self.client.create_room(
preset=RoomCreatePreset.PRIVATE,
invitees=[evt.sender],
is_direct=True,
initial_state=[
{
"type": str(EventType.ROOM_NAME),
"content": {"name": f"{roomname} join verification check"}
}
]
)
# Select random verification phrase
verification_phrase = random.choice(self.config["verification_phrases"])
# Store verification state
self._verification_states[dm_room] = {
"user": evt.sender,
"target_room": evt.room_id,
"phrase": verification_phrase,
"attempts": self.config["verification_attempts"],
"required_level": required_level
}
# Send greeting
greeting = f"""Thank you for joining {roomname}. As an anti-spam measure, you must demonstrate that you are a real person before you can send messages in its rooms.
Please send a message to this chat with the phrase: "{verification_phrase}" """
await self.client.send_notice(dm_room, greeting)
except Exception as e:
self.log.error(f"Failed to start verification process: {e}")
@event.on(EventType.ROOM_MESSAGE)
async def handle_verification(self, evt: MessageEvent) -> None:
# Ignore messages from the bot itself
if evt.sender == self.client.mxid:
return
if evt.room_id not in self._verification_states:
return
state = self._verification_states[evt.room_id]
user_phrase = evt.content.body.strip().lower()
expected_phrase = state["phrase"].lower()
# Remove punctuation and compare
user_phrase = re.sub(r'[^\w\s]', '', user_phrase)
expected_phrase = re.sub(r'[^\w\s]', '', expected_phrase)
if user_phrase == expected_phrase:
try:
# Update power levels in target room
power_levels = await self.client.get_state_event(
state["target_room"], EventType.ROOM_POWER_LEVELS
)
power_levels.users[state["user"]] = state["required_level"]
await self.client.send_state_event(
state["target_room"], EventType.ROOM_POWER_LEVELS, power_levels
)
await self.client.send_notice(evt.room_id, "Success! My work here is done. You can leave this room now.")
except Exception as e:
await self.client.send_notice(
evt.room_id,
f"Something went wrong: {str(e)}. Please report this to the room moderators."
)
finally:
await self.client.leave_room(evt.room_id)
del self._verification_states[evt.room_id]
else:
state["attempts"] -= 1
if state["attempts"] <= 0:
await self.client.send_notice(
evt.room_id,
"You have run out of attempts. Please contact a room moderator for assistance."
)
await self.client.leave_room(evt.room_id)
del self._verification_states[evt.room_id]
else:
await self.client.send_notice(
evt.room_id,
f"Phrase does not match, you have {state['attempts']} tries remaining."
)
@event.on(EventType.ROOM_MESSAGE)
async def update_message_timestamp(self, evt: MessageEvent) -> None:
power_levels = await self.client.get_state_event(