Files
Tuwunel-Interceptor/app.py
T
2026-05-05 17:19:35 +02:00

345 lines
9.4 KiB
Python

import json
import time
import os
import logging
from collections import defaultdict
from datetime import datetime, timezone
import requests
from flask import Flask, request, Response
app = Flask(__name__)
# ============================================================
# CONFIG
# ============================================================
class Config:
REQUIRED_KEYS = ["TUWUNEL_URL", "LOCAL_DOMAIN"]
def __init__(self):
self.tuwunel_url = os.getenv("TUWUNEL_URL")
self.local_domain = os.getenv("LOCAL_DOMAIN")
self.admin_users = self._parse_list_env("ADMIN_USERS", [])
self.admin_token = os.getenv("ADMIN_TOKEN", "")
self.domain_whitelist = set(
self._parse_list_env("DOMAIN_WHITELIST", [])
)
self.block_external_dms = self._parse_bool("BLOCK_EXTERNAL_DMS", True)
self.allow_room_creation = self._parse_bool("ALLOW_ROOM_CREATION", False)
self.cache_ttl = int(os.getenv("CACHE_TTL_SECONDS", 604800))
self.rate_limit_per_minute = int(os.getenv("RATE_LIMIT_PER_MINUTE", 20))
self.http_timeout = int(os.getenv("HTTP_TIMEOUT", 5))
self.fail_open = self._parse_bool("FAIL_OPEN", True)
self.debug = self._parse_bool("DEBUG", False)
def validate(self):
return [k for k in self.REQUIRED_KEYS if not os.getenv(k)]
def _parse_list_env(self, key, default=None):
raw = os.getenv(key)
if not raw:
return default or []
return [x.strip() for x in raw.split(",") if x.strip()]
def _parse_bool(self, key, default):
val = os.getenv(key)
if val is None:
return default
return val.lower() in ("1", "true", "yes", "on")
# ============================================================
# LOGGING
# ============================================================
log_level = logging.DEBUG if os.getenv("DEBUG", "false").lower() == "true" else logging.INFO
logging.basicConfig(level=log_level)
# 🔥 remove HTTP noise
logging.getLogger("werkzeug").setLevel(logging.ERROR)
logger = logging.getLogger("matrix-interceptor")
def now_iso():
return datetime.now(timezone.utc).isoformat()
def log_event(event: str, **kwargs):
base = f"{now_iso()} EVENT={event}"
details = " ".join(f"{k}={v}" for k, v in kwargs.items())
logger.info(f"{base} {details}")
def debug_log(title, data):
if config.debug:
logger.debug(f"{title}: {json.dumps(data, indent=2, default=str)}")
# ============================================================
# INIT
# ============================================================
config = Config()
missing = config.validate()
if missing:
logger.error(f"Missing env vars: {missing}")
raise SystemExit(1)
# ============================================================
# STATE
# ============================================================
KNOWN_EXTERNAL_USERS = {}
RATE_LIMIT = defaultdict(list)
# ============================================================
# HELPERS
# ============================================================
def extract_domain(user_id):
try:
return user_id.split(":")[1].lower().rstrip(".")
except:
return "unknown"
def is_external(user_id):
return extract_domain(user_id) != config.local_domain
def is_known_user(user_id):
ts = KNOWN_EXTERNAL_USERS.get(user_id)
if not ts:
return False
if time.time() - ts > config.cache_ttl:
del KNOWN_EXTERNAL_USERS[user_id]
return False
return True
def remember_user(user_id):
KNOWN_EXTERNAL_USERS[user_id] = time.time()
def get_role(user_id):
return "admin" if user_id in config.admin_users else "user"
def is_local_room(room_id):
try:
return room_id.split(":")[1] == config.local_domain
except:
return False
# ============================================================
# SEED (Membership Cache)
# ============================================================
def seed_known_users():
if not config.admin_token:
logger.warning("No ADMIN_TOKEN → skipping seed")
return
logger.info("Seeding known external users...")
headers = {"Authorization": f"Bearer {config.admin_token}"}
seeded = 0
try:
rooms_res = requests.get(
f"{config.tuwunel_url}/_matrix/client/v3/joined_rooms",
headers=headers,
timeout=10
)
if rooms_res.status_code != 200:
return
for room_id in rooms_res.json().get("joined_rooms", []):
if not is_local_room(room_id):
continue
members_res = requests.get(
f"{config.tuwunel_url}/_matrix/client/v3/rooms/{room_id}/joined_members",
headers=headers,
timeout=10
)
if members_res.status_code != 200:
continue
members = members_res.json().get("joined", {})
for user_id in members.keys():
if is_external(user_id):
remember_user(user_id)
seeded += 1
logger.info(f"Seeded {seeded} external users (local membership)")
except Exception as e:
logger.error(f"Seed failed: {e}")
# ============================================================
# DM DETECTION
# ============================================================
def is_likely_dm_create(payload):
if payload.get("is_direct"):
return True
invite = payload.get("invite", [])
if isinstance(invite, list) and len(invite) == 1:
return True
if payload.get("visibility") == "private" and invite:
return True
if len(invite) == 1 and not payload.get("name"):
return True
return False
def is_likely_dm_event(event):
unsigned = event.get("unsigned", {})
state = unsigned.get("invite_room_state", [])
member_events = [
e for e in state if e.get("type") == "m.room.member"
]
return len(member_events) <= 2
# ============================================================
# FORWARD
# ============================================================
def forward_request(method, url, headers, body):
try:
proxy_headers = {"Content-Type": "application/json"}
if "Authorization" in headers:
proxy_headers["Authorization"] = headers["Authorization"]
res = requests.request(
method=method,
url=url,
headers=proxy_headers,
json=body,
timeout=config.http_timeout
)
return Response(res.content, res.status_code)
except Exception as e:
logger.error(f"proxy error: {e}")
if config.fail_open:
return Response(status=200)
return Response(status=502)
# ============================================================
# ROUTES
# ============================================================
@app.route("/healthz")
def health():
return {
"status": "ok",
"known_users": len(KNOWN_EXTERNAL_USERS)
}
@app.route('/_matrix/client/v3/createRoom', methods=['POST'])
def create_room():
payload = request.get_json(silent=True) or {}
user_id = request.headers.get("Authorization", "unknown")
is_dm = is_likely_dm_create(payload)
domain = extract_domain(user_id)
role = get_role(user_id)
allowed = (
user_id in config.admin_users
or config.allow_room_creation
or is_dm
)
if not allowed:
log_event(
"create_room_blocked",
actor=user_id,
domain=domain,
role=role,
reason="room_creation_disabled"
)
return Response(json.dumps({"errcode": "M_FORBIDDEN"}), status=403)
if is_dm:
log_event(
"create_room_allowed",
actor=user_id,
domain=domain,
role=role,
room_type="dm"
)
return forward_request(
"POST",
f"{config.tuwunel_url}/_matrix/client/v3/createRoom",
request.headers,
payload
)
@app.route('/_matrix/federation/v2/invite/<room_id>/<event_id>', methods=['PUT'])
def invite(room_id, event_id):
payload = request.get_json(force=True)
event = payload.get("event", {})
sender = event.get("sender", "")
if not sender:
return Response(status=400)
domain = extract_domain(sender)
is_dm = is_likely_dm_event(event)
if len(RATE_LIMIT[domain]) > config.rate_limit_per_minute:
return Response(status=429)
if domain in config.domain_whitelist:
remember_user(sender)
return forward_request(
"PUT",
f"{config.tuwunel_url}/_matrix/federation/v2/invite/{room_id}/{event_id}",
request.headers,
payload
)
if config.block_external_dms and is_dm and is_external(sender):
if not is_known_user(sender):
log_event(
"invite_blocked",
actor=sender,
domain=domain,
reason="unknown_external_user"
)
return Response(status=403)
remember_user(sender)
return forward_request(
"PUT",
f"{config.tuwunel_url}/_matrix/federation/v2/invite/{room_id}/{event_id}",
request.headers,
payload
)
# ============================================================
# START
# ============================================================
if __name__ == '__main__':
seed_known_users()
app.run(host='0.0.0.0', port=5000)