From 6ea6c1cfdacdca1a9fbb4f97ac9d4d93b9a4899f Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Fri, 22 May 2026 19:30:27 +0800 Subject: [PATCH] feat(group-monitor): add bot/user-session source option and tg api session settings --- .env.example | 5 + README.md | 10 ++ app.py | 248 +++++++++++++++++++++++--- config.example.yaml | 1 + requirements.txt | 1 + tests/test_monitor_message_cleanup.py | 20 +++ 6 files changed, 264 insertions(+), 21 deletions(-) diff --git a/.env.example b/.env.example index 39601e9..a2a8ec7 100644 --- a/.env.example +++ b/.env.example @@ -14,3 +14,8 @@ WEB_PANEL_PORT=8765 WEB_PANEL_USER=admin WEB_PANEL_PASSWORD=change-me WEB_PANEL_SESSION_SECRET= + +# Optional: user-session listener for TG groups where bot cannot be added. +TG_API_ID= +TG_API_HASH= +TG_API_SESSION= diff --git a/README.md b/README.md index 62cbfb9..99dc741 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。 - TG 群监听新增“已发现群聊”:自动显示 Bot 收到过消息的群聊 `chat_id`,可一键创建监听。 +- TG 群监听新增“监听来源”选项:`Bot` / `用户会话`(可用于 Bot 无法加入的群)。 +- 设置页新增 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 可视化配置;用于用户会话监听。 - 新增 `/update` 安全更新流程:显示本地/远端 commit、ahead/behind、工作区状态;仅允许 `ff-only` 更新。 - 更新前若检测到本地未提交改动,会拒绝更新;避免覆盖本地代码。 - 新增“回滚上次更新”按钮:更新前自动记录回滚点,可一键回滚并重启。 @@ -301,6 +303,9 @@ curl http://127.0.0.1:8765/health | `WEB_PANEL_USER` | 面板用户名 | | `WEB_PANEL_PASSWORD` | 面板密码 | | `WEB_PANEL_SESSION_SECRET` | Session Secret,留空会自动生成并写回 `.env` | +| `TG_API_ID` | (可选)Telegram API ID,用于“TG 群监听=用户会话” | +| `TG_API_HASH` | (可选)Telegram API Hash,用于“TG 群监听=用户会话” | +| `TG_API_SESSION` | (可选)Telethon StringSession,用于“TG 群监听=用户会话” | ### `config.yaml` @@ -329,6 +334,7 @@ TG 群关键词监听(可选,默认关闭): group_monitors: - name: TG 群关键词监听 enabled: false + listen_source: bot chat_id: -1001234567890 keywords: - VPS @@ -350,6 +356,9 @@ group_monitors: - 命中 `keywords` 且未命中 `exclude_keywords` 时,会给管理员发送摘要。 - TG 群监听页面会展示“已发现群聊”(Bot 收到过消息的群),可直接点“用此群创建监听”自动填入 `chat_id`。 +- `listen_source` 支持: + - `bot`:默认,使用 Bot 接收群消息(需把 Bot 拉进群) + - `user_session`:使用用户会话接收群消息(适合 Bot 无法入群) - `summary_mode` 支持: - `template`:固定模板摘要(默认) - `ai`:调用 AI 生成摘要(在 TG 群监听页面可视化配置) @@ -360,6 +369,7 @@ group_monitors: - `ai_min_interval_seconds`:同一个群监听最小推送间隔(防刷屏) - `ai_dedupe_window_seconds`:相同内容摘要去重窗口(防重复) - 机器人想收到群里普通消息,需要在 `@BotFather` 执行 `/setprivacy` 关闭隐私模式。 +- 若使用 `listen_source=user_session`,需在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 后重启。 更新代码(`/update`)已支持安全检查: - 显示本地/远端 commit、ahead/behind、工作区是否干净 diff --git a/app.py b/app.py index 64d166c..2cabde0 100644 --- a/app.py +++ b/app.py @@ -24,6 +24,7 @@ from contextlib import closing from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path +from types import SimpleNamespace from typing import Any from urllib.parse import quote_plus, urljoin @@ -44,6 +45,14 @@ from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response, st from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse import uvicorn +try: + from telethon import TelegramClient, events + from telethon.sessions import StringSession +except Exception: # pragma: no cover - optional dependency + TelegramClient = None + events = None + StringSession = None + BASE_DIR = Path(__file__).resolve().parent DB_PATH = BASE_DIR / "tg-watchbot.sqlite3" CONFIG_PATH = BASE_DIR / "config.yaml" @@ -68,6 +77,8 @@ config: dict[str, Any] = {} rate_buckets: dict[int, list[float]] = {} pending_sendpic: dict[int, dict[str, Any]] = {} scheduler_ref: AsyncIOScheduler | None = None +user_session_listener_task: asyncio.Task | None = None +user_session_client: Any = None GROUP_SUMMARY_MAX_CHARS = 800 @@ -442,25 +453,41 @@ def group_monitors() -> list[dict[str, Any]]: ai_interface = str(row.get("ai_interface") or "responses").strip().lower() or "responses" if ai_interface not in {"responses", "chat"}: ai_interface = "responses" + listen_source = str(row.get("listen_source") or "bot").strip().lower() or "bot" + if listen_source not in {"bot", "user_session"}: + listen_source = "bot" keywords = [str(k).strip() for k in (row.get("keywords") or []) if str(k).strip()] exclude_keywords = [str(k).strip() for k in (row.get("exclude_keywords") or []) if str(k).strip()] monitors.append( { "name": str(row.get("name") or str(chat_id)), "chat_id": chat_id, + "listen_source": listen_source, "summary_mode": summary_mode, "keywords": keywords, "exclude_keywords": exclude_keywords, "notify_telegram": bool(row.get("notify_telegram", True)), "ai_base_url": str(row.get("ai_base_url") or "").strip(), "ai_api_key": str(row.get("ai_api_key") or "").strip(), - "ai_model": str(row.get("ai_model") or "gpt-4o-mini").strip(), - "ai_interface": ai_interface, - "ai_temperature": safe_float(row.get("ai_temperature", 0.2), 0.2), - "ai_timeout_seconds": max(1, safe_int(row.get("ai_timeout_seconds", 30), 30)), - "ai_prompt": str(row.get("ai_prompt") or "").strip(), - "ai_min_interval_seconds": max(0, safe_int(row.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)), - "ai_dedupe_window_seconds": max(0, safe_int(row.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)), + "ai_model": str(row.get("ai_model") or "gpt-4o-mini").strip(), + "ai_interface": ai_interface, + "ai_temperature": safe_float(row.get("ai_temperature", 0.2), 0.2), + "ai_timeout_seconds": max(1, safe_int(row.get("ai_timeout_seconds", 30), 30)), + "ai_prompt": str(row.get("ai_prompt") or "").strip(), + "ai_min_interval_seconds": max( + 0, + safe_int( + row.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), + DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS, + ), + ), + "ai_dedupe_window_seconds": max( + 0, + safe_int( + row.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), + DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS, + ), + ), } ) return monitors @@ -473,6 +500,44 @@ def group_monitor_for_chat(chat_id: int) -> dict[str, Any] | None: return None +def group_monitor_for_chat_and_source(chat_id: int, listen_source: str) -> dict[str, Any] | None: + source = (listen_source or "bot").strip().lower() or "bot" + for monitor in group_monitors(): + if int(monitor["chat_id"]) != int(chat_id): + continue + if str(monitor.get("listen_source") or "bot") == source: + return monitor + return None + + +def group_monitors_need_user_session() -> bool: + for monitor in group_monitors(): + if str(monitor.get("listen_source") or "bot") == "user_session": + return True + return False + + +def user_session_config() -> tuple[str, str, str]: + load_dotenv(ENV_PATH, override=True) + api_id = os.getenv("TG_API_ID", "").strip() + api_hash = os.getenv("TG_API_HASH", "").strip() + session = os.getenv("TG_API_SESSION", "").strip() + return api_id, api_hash, session + + +def user_session_ready() -> bool: + api_id, api_hash, session = user_session_config() + if not api_id or not api_hash: + return False + if not session: + return False + try: + int(api_id) + except Exception: + return False + return True + + def group_message_text(message: Message) -> str: parts = [message.text or "", message.caption or ""] if getattr(message, "reply_to_message", None): @@ -718,8 +783,8 @@ def group_monitor_allow_send(monitor: dict[str, Any], fingerprint: str, now_ts: return True, "" -async def handle_group_keyword_message(message: Message) -> bool: - monitor = group_monitor_for_chat(int(message.chat.id)) +async def handle_group_keyword_message(message: Message, listen_source: str = "bot") -> bool: + monitor = group_monitor_for_chat_and_source(int(message.chat.id), listen_source) if not monitor: return False text = group_message_text(message) @@ -748,6 +813,93 @@ async def handle_group_keyword_message(message: Message) -> bool: return True +def build_pseudo_message_from_user_session_event(event: Any) -> Any | None: + msg = getattr(event, "message", None) + if msg is None: + return None + try: + chat_id = int(getattr(event, "chat_id")) + except Exception: + return None + text = str(getattr(msg, "text", "") or getattr(msg, "message", "") or "").strip() + caption = str(getattr(msg, "caption", "") or "").strip() + content = text or caption + if not content: + return None + chat_title = str(getattr(getattr(event, "chat", None), "title", "") or str(chat_id)) + chat_username = str(getattr(getattr(event, "chat", None), "username", "") or "") + sender_id = getattr(event, "sender_id", None) + from_user = SimpleNamespace( + id=int(sender_id) if sender_id is not None else 0, + first_name="", + last_name="", + username="", + ) + return SimpleNamespace( + chat=SimpleNamespace( + id=chat_id, + type="supergroup" if str(chat_id).startswith("-100") else "group", + title=chat_title, + username=chat_username, + ), + from_user=from_user, + text=text or content, + caption=caption or None, + reply_to_message=None, + message_id=int(getattr(msg, "id", 0) or 0), + content_type="text", + ) + + +async def run_user_session_group_listener() -> None: + global user_session_client + if TelegramClient is None or StringSession is None: + logger.warning("user-session group listener skipped: telethon is not installed") + return + api_id_raw, api_hash, session = user_session_config() + if not api_id_raw or not api_hash or not session: + logger.warning("user-session group listener skipped: TG_API_ID/TG_API_HASH/TG_API_SESSION not complete") + return + try: + api_id = int(api_id_raw) + except Exception: + logger.warning("user-session group listener skipped: TG_API_ID must be integer") + return + try: + client = TelegramClient(StringSession(session), api_id, api_hash) + user_session_client = client + + @client.on(events.NewMessage(incoming=True)) # type: ignore[misc] + async def on_new_group_message(event: Any) -> None: + if not (getattr(event, "is_group", False) or getattr(event, "is_channel", False)): + return + pseudo = build_pseudo_message_from_user_session_event(event) + if pseudo is None: + return + record_discovered_group_chat_data( + int(pseudo.chat.id), + str(getattr(pseudo.chat, "title", "") or pseudo.chat.id), + str(getattr(pseudo.chat, "username", "") or ""), + ) + await handle_group_keyword_message(pseudo, listen_source="user_session") + + await client.start() + logger.info("user-session group listener started") + await client.run_until_disconnected() + except asyncio.CancelledError: + logger.info("user-session group listener cancelled") + raise + except Exception: + logger.exception("user-session group listener crashed") + finally: + try: + if user_session_client is not None: + await user_session_client.disconnect() + except Exception: + logger.exception("user-session listener disconnect failed") + user_session_client = None + + def update_spam_keywords(action: str, word: str) -> list[str]: cfg = cfg_load_fresh() bot_cfg = cfg.setdefault("bot", {}) @@ -847,12 +999,7 @@ def get_monitor_status_badge(status: dict[str, Any] | None) -> str: return "正常" -def record_discovered_group_chat(message: Message) -> None: - if message.chat.type not in {"group", "supergroup"}: - return - chat_id = int(message.chat.id) - title = str(getattr(message.chat, "title", "") or str(chat_id)) - username = str(getattr(message.chat, "username", "") or "") +def record_discovered_group_chat_data(chat_id: int, title: str, username: str = "") -> None: with closing(db()) as conn: conn.execute( """ @@ -864,11 +1011,20 @@ def record_discovered_group_chat(message: Message) -> None: last_seen_at=excluded.last_seen_at, active=1 """, - (chat_id, title, username, now_iso()), + (int(chat_id), str(title or chat_id), str(username or ""), now_iso()), ) conn.commit() +def record_discovered_group_chat(message: Message) -> None: + if message.chat.type not in {"group", "supergroup"}: + return + chat_id = int(message.chat.id) + title = str(getattr(message.chat, "title", "") or str(chat_id)) + username = str(getattr(message.chat, "username", "") or "") + record_discovered_group_chat_data(chat_id, title, username) + + def list_discovered_group_chats(limit: int = 200) -> list[dict[str, Any]]: with closing(db()) as conn: rows = conn.execute( @@ -1237,7 +1393,7 @@ async def user_message(message: Message) -> None: if message.chat.type in {"group", "supergroup"}: try: record_discovered_group_chat(message) - await handle_group_keyword_message(message) + await handle_group_keyword_message(message, listen_source="bot") except Exception: logger.exception("group keyword handling failed chat_id=%s message_id=%s", message.chat.id, message.message_id) logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type) @@ -1783,6 +1939,9 @@ def env_values() -> dict[str, str]: "WEB_PANEL_USER": os.getenv("WEB_PANEL_USER", "admin"), "WEB_PANEL_PASSWORD": os.getenv("WEB_PANEL_PASSWORD", "admin"), "WEB_PANEL_SESSION_SECRET": os.getenv("WEB_PANEL_SESSION_SECRET", ""), + "TG_API_ID": os.getenv("TG_API_ID", ""), + "TG_API_HASH": os.getenv("TG_API_HASH", ""), + "TG_API_SESSION": os.getenv("TG_API_SESSION", ""), } @@ -1807,6 +1966,9 @@ def write_env_values(values: dict[str, str]) -> None: f"WEB_PANEL_USER={values.get('WEB_PANEL_USER','admin')}", f"WEB_PANEL_PASSWORD={values.get('WEB_PANEL_PASSWORD','admin')}", f"WEB_PANEL_SESSION_SECRET={session_value}", + f"TG_API_ID={values.get('TG_API_ID','')}", + f"TG_API_HASH={values.get('TG_API_HASH','')}", + f"TG_API_SESSION={values.get('TG_API_SESSION','')}", "", ] ENV_PATH.write_text("\n".join(lines), encoding="utf-8") @@ -1841,6 +2003,10 @@ def cfg_save(new_cfg: dict[str, Any]) -> None: gm.setdefault("keywords", []) gm.setdefault("exclude_keywords", []) gm.setdefault("notify_telegram", True) + listen_source = str(gm.get("listen_source") or "bot").strip().lower() or "bot" + if listen_source not in {"bot", "user_session"}: + listen_source = "bot" + gm["listen_source"] = listen_source summary_mode = str(gm.get("summary_mode") or "template").strip().lower() or "template" if summary_mode not in {"template", "ai"}: summary_mode = "template" @@ -2140,6 +2306,7 @@ def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = N "ai_prompt": "", "ai_min_interval_seconds": DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS, "ai_dedupe_window_seconds": DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS, + "listen_source": "bot", } action = "/group-monitors/save" if idx is not None else "/group-monitors/create" hidden = f"" if idx is not None else "" @@ -2150,9 +2317,11 @@ def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = N
监听来源选“用户会话”时,需要在“Bot / 面板设置”填写 TG_API_ID、TG_API_HASH、TG_API_SESSION,并重启。
{chat_id}| # | 监听 | 状态 | 关键词 | 排除词 | 操作 |
|---|
Bot 在群里收到消息后会自动记录群信息。可直接选择群聊创建监听。
" @@ -2302,6 +2484,7 @@ def create_panel_app() -> FastAPI: exclude_keywords: str, enabled: str | None, notify_telegram: str | None, + listen_source: str, summary_mode: str, ai_base_url: str, ai_api_key: str, @@ -2322,6 +2505,9 @@ def create_panel_app() -> FastAPI: parsed_summary_mode = (summary_mode or "template").strip().lower() or "template" if parsed_summary_mode not in {"template", "ai"}: parsed_summary_mode = "template" + parsed_listen_source = (listen_source or "bot").strip().lower() or "bot" + if parsed_listen_source not in {"bot", "user_session"}: + parsed_listen_source = "bot" parsed_ai_interface = (ai_interface or "responses").strip().lower() or "responses" if parsed_ai_interface not in {"responses", "chat"}: parsed_ai_interface = "responses" @@ -2332,6 +2518,7 @@ def create_panel_app() -> FastAPI: "keywords": parse_lines(keywords), "exclude_keywords": parse_lines(exclude_keywords), "notify_telegram": bool(notify_telegram), + "listen_source": parsed_listen_source, "summary_mode": parsed_summary_mode, "ai_base_url": ai_base_url.strip(), "ai_api_key": ai_api_key.strip(), @@ -2364,6 +2551,7 @@ def create_panel_app() -> FastAPI: enabled: str | None = Form(None), notify_telegram: str | None = Form(None), summary_mode: str = Form("template"), + listen_source: str = Form("bot"), ai_base_url: str = Form(""), ai_api_key: str = Form(""), ai_model: str = Form("gpt-4o-mini"), @@ -2382,6 +2570,7 @@ def create_panel_app() -> FastAPI: exclude_keywords, enabled, notify_telegram, + listen_source, summary_mode, ai_base_url, ai_api_key, @@ -2405,6 +2594,7 @@ def create_panel_app() -> FastAPI: enabled: str | None = Form(None), notify_telegram: str | None = Form(None), summary_mode: str = Form("template"), + listen_source: str = Form("bot"), ai_base_url: str = Form(""), ai_api_key: str = Form(""), ai_model: str = Form("gpt-4o-mini"), @@ -2423,6 +2613,7 @@ def create_panel_app() -> FastAPI: exclude_keywords, enabled, notify_telegram, + listen_source, summary_mode, ai_base_url, ai_api_key, @@ -2613,6 +2804,9 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" body = f"""这里和“Bot / 面板设置”共用同一份 .env。修改 Token、管理员 ID、端口、账号或密码后不会自动重启,需要手动重启服务。
| 用户 | 状态 | 备注 | 操作 |
|---|