diff --git a/.env.example b/.env.example index 39601e9..a2a8ec7 100644 --- a/.env.example +++ b/.env.example @@ -14,3 +14,8 @@ WEB_PANEL_PORT=8765 WEB_PANEL_USER=admin WEB_PANEL_PASSWORD=change-me WEB_PANEL_SESSION_SECRET= + +# Optional: user-session listener for TG groups where bot cannot be added. +TG_API_ID= +TG_API_HASH= +TG_API_SESSION= diff --git a/README.md b/README.md index 62cbfb9..99dc741 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。 - TG 群监听新增“已发现群聊”:自动显示 Bot 收到过消息的群聊 `chat_id`,可一键创建监听。 +- TG 群监听新增“监听来源”选项:`Bot` / `用户会话`(可用于 Bot 无法加入的群)。 +- 设置页新增 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 可视化配置;用于用户会话监听。 - 新增 `/update` 安全更新流程:显示本地/远端 commit、ahead/behind、工作区状态;仅允许 `ff-only` 更新。 - 更新前若检测到本地未提交改动,会拒绝更新;避免覆盖本地代码。 - 新增“回滚上次更新”按钮:更新前自动记录回滚点,可一键回滚并重启。 @@ -301,6 +303,9 @@ curl http://127.0.0.1:8765/health | `WEB_PANEL_USER` | 面板用户名 | | `WEB_PANEL_PASSWORD` | 面板密码 | | `WEB_PANEL_SESSION_SECRET` | Session Secret,留空会自动生成并写回 `.env` | +| `TG_API_ID` | (可选)Telegram API ID,用于“TG 群监听=用户会话” | +| `TG_API_HASH` | (可选)Telegram API Hash,用于“TG 群监听=用户会话” | +| `TG_API_SESSION` | (可选)Telethon StringSession,用于“TG 群监听=用户会话” | ### `config.yaml` @@ -329,6 +334,7 @@ TG 群关键词监听(可选,默认关闭): group_monitors: - name: TG 群关键词监听 enabled: false + listen_source: bot chat_id: -1001234567890 keywords: - VPS @@ -350,6 +356,9 @@ group_monitors: - 命中 `keywords` 且未命中 `exclude_keywords` 时,会给管理员发送摘要。 - TG 群监听页面会展示“已发现群聊”(Bot 收到过消息的群),可直接点“用此群创建监听”自动填入 `chat_id`。 +- `listen_source` 支持: + - `bot`:默认,使用 Bot 接收群消息(需把 Bot 拉进群) + - `user_session`:使用用户会话接收群消息(适合 Bot 无法入群) - `summary_mode` 支持: - `template`:固定模板摘要(默认) - `ai`:调用 AI 生成摘要(在 TG 群监听页面可视化配置) @@ -360,6 +369,7 @@ group_monitors: - `ai_min_interval_seconds`:同一个群监听最小推送间隔(防刷屏) - `ai_dedupe_window_seconds`:相同内容摘要去重窗口(防重复) - 机器人想收到群里普通消息,需要在 `@BotFather` 执行 `/setprivacy` 关闭隐私模式。 +- 若使用 `listen_source=user_session`,需在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 后重启。 更新代码(`/update`)已支持安全检查: - 显示本地/远端 commit、ahead/behind、工作区是否干净 diff --git a/app.py b/app.py index 64d166c..2cabde0 100644 --- a/app.py +++ b/app.py @@ -24,6 +24,7 @@ from contextlib import closing from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path +from types import SimpleNamespace from typing import Any from urllib.parse import quote_plus, urljoin @@ -44,6 +45,14 @@ from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response, st from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse import uvicorn +try: + from telethon import TelegramClient, events + from telethon.sessions import StringSession +except Exception: # pragma: no cover - optional dependency + TelegramClient = None + events = None + StringSession = None + BASE_DIR = Path(__file__).resolve().parent DB_PATH = BASE_DIR / "tg-watchbot.sqlite3" CONFIG_PATH = BASE_DIR / "config.yaml" @@ -68,6 +77,8 @@ config: dict[str, Any] = {} rate_buckets: dict[int, list[float]] = {} pending_sendpic: dict[int, dict[str, Any]] = {} scheduler_ref: AsyncIOScheduler | None = None +user_session_listener_task: asyncio.Task | None = None +user_session_client: Any = None GROUP_SUMMARY_MAX_CHARS = 800 @@ -442,25 +453,41 @@ def group_monitors() -> list[dict[str, Any]]: ai_interface = str(row.get("ai_interface") or "responses").strip().lower() or "responses" if ai_interface not in {"responses", "chat"}: ai_interface = "responses" + listen_source = str(row.get("listen_source") or "bot").strip().lower() or "bot" + if listen_source not in {"bot", "user_session"}: + listen_source = "bot" keywords = [str(k).strip() for k in (row.get("keywords") or []) if str(k).strip()] exclude_keywords = [str(k).strip() for k in (row.get("exclude_keywords") or []) if str(k).strip()] monitors.append( { "name": str(row.get("name") or str(chat_id)), "chat_id": chat_id, + "listen_source": listen_source, "summary_mode": summary_mode, "keywords": keywords, "exclude_keywords": exclude_keywords, "notify_telegram": bool(row.get("notify_telegram", True)), "ai_base_url": str(row.get("ai_base_url") or "").strip(), "ai_api_key": str(row.get("ai_api_key") or "").strip(), - "ai_model": str(row.get("ai_model") or "gpt-4o-mini").strip(), - "ai_interface": ai_interface, - "ai_temperature": safe_float(row.get("ai_temperature", 0.2), 0.2), - "ai_timeout_seconds": max(1, safe_int(row.get("ai_timeout_seconds", 30), 30)), - "ai_prompt": str(row.get("ai_prompt") or "").strip(), - "ai_min_interval_seconds": max(0, safe_int(row.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)), - "ai_dedupe_window_seconds": max(0, safe_int(row.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)), + "ai_model": str(row.get("ai_model") or "gpt-4o-mini").strip(), + "ai_interface": ai_interface, + "ai_temperature": safe_float(row.get("ai_temperature", 0.2), 0.2), + "ai_timeout_seconds": max(1, safe_int(row.get("ai_timeout_seconds", 30), 30)), + "ai_prompt": str(row.get("ai_prompt") or "").strip(), + "ai_min_interval_seconds": max( + 0, + safe_int( + row.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), + DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS, + ), + ), + "ai_dedupe_window_seconds": max( + 0, + safe_int( + row.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), + DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS, + ), + ), } ) return monitors @@ -473,6 +500,44 @@ def group_monitor_for_chat(chat_id: int) -> dict[str, Any] | None: return None +def group_monitor_for_chat_and_source(chat_id: int, listen_source: str) -> dict[str, Any] | None: + source = (listen_source or "bot").strip().lower() or "bot" + for monitor in group_monitors(): + if int(monitor["chat_id"]) != int(chat_id): + continue + if str(monitor.get("listen_source") or "bot") == source: + return monitor + return None + + +def group_monitors_need_user_session() -> bool: + for monitor in group_monitors(): + if str(monitor.get("listen_source") or "bot") == "user_session": + return True + return False + + +def user_session_config() -> tuple[str, str, str]: + load_dotenv(ENV_PATH, override=True) + api_id = os.getenv("TG_API_ID", "").strip() + api_hash = os.getenv("TG_API_HASH", "").strip() + session = os.getenv("TG_API_SESSION", "").strip() + return api_id, api_hash, session + + +def user_session_ready() -> bool: + api_id, api_hash, session = user_session_config() + if not api_id or not api_hash: + return False + if not session: + return False + try: + int(api_id) + except Exception: + return False + return True + + def group_message_text(message: Message) -> str: parts = [message.text or "", message.caption or ""] if getattr(message, "reply_to_message", None): @@ -718,8 +783,8 @@ def group_monitor_allow_send(monitor: dict[str, Any], fingerprint: str, now_ts: return True, "" -async def handle_group_keyword_message(message: Message) -> bool: - monitor = group_monitor_for_chat(int(message.chat.id)) +async def handle_group_keyword_message(message: Message, listen_source: str = "bot") -> bool: + monitor = group_monitor_for_chat_and_source(int(message.chat.id), listen_source) if not monitor: return False text = group_message_text(message) @@ -748,6 +813,93 @@ async def handle_group_keyword_message(message: Message) -> bool: return True +def build_pseudo_message_from_user_session_event(event: Any) -> Any | None: + msg = getattr(event, "message", None) + if msg is None: + return None + try: + chat_id = int(getattr(event, "chat_id")) + except Exception: + return None + text = str(getattr(msg, "text", "") or getattr(msg, "message", "") or "").strip() + caption = str(getattr(msg, "caption", "") or "").strip() + content = text or caption + if not content: + return None + chat_title = str(getattr(getattr(event, "chat", None), "title", "") or str(chat_id)) + chat_username = str(getattr(getattr(event, "chat", None), "username", "") or "") + sender_id = getattr(event, "sender_id", None) + from_user = SimpleNamespace( + id=int(sender_id) if sender_id is not None else 0, + first_name="", + last_name="", + username="", + ) + return SimpleNamespace( + chat=SimpleNamespace( + id=chat_id, + type="supergroup" if str(chat_id).startswith("-100") else "group", + title=chat_title, + username=chat_username, + ), + from_user=from_user, + text=text or content, + caption=caption or None, + reply_to_message=None, + message_id=int(getattr(msg, "id", 0) or 0), + content_type="text", + ) + + +async def run_user_session_group_listener() -> None: + global user_session_client + if TelegramClient is None or StringSession is None: + logger.warning("user-session group listener skipped: telethon is not installed") + return + api_id_raw, api_hash, session = user_session_config() + if not api_id_raw or not api_hash or not session: + logger.warning("user-session group listener skipped: TG_API_ID/TG_API_HASH/TG_API_SESSION not complete") + return + try: + api_id = int(api_id_raw) + except Exception: + logger.warning("user-session group listener skipped: TG_API_ID must be integer") + return + try: + client = TelegramClient(StringSession(session), api_id, api_hash) + user_session_client = client + + @client.on(events.NewMessage(incoming=True)) # type: ignore[misc] + async def on_new_group_message(event: Any) -> None: + if not (getattr(event, "is_group", False) or getattr(event, "is_channel", False)): + return + pseudo = build_pseudo_message_from_user_session_event(event) + if pseudo is None: + return + record_discovered_group_chat_data( + int(pseudo.chat.id), + str(getattr(pseudo.chat, "title", "") or pseudo.chat.id), + str(getattr(pseudo.chat, "username", "") or ""), + ) + await handle_group_keyword_message(pseudo, listen_source="user_session") + + await client.start() + logger.info("user-session group listener started") + await client.run_until_disconnected() + except asyncio.CancelledError: + logger.info("user-session group listener cancelled") + raise + except Exception: + logger.exception("user-session group listener crashed") + finally: + try: + if user_session_client is not None: + await user_session_client.disconnect() + except Exception: + logger.exception("user-session listener disconnect failed") + user_session_client = None + + def update_spam_keywords(action: str, word: str) -> list[str]: cfg = cfg_load_fresh() bot_cfg = cfg.setdefault("bot", {}) @@ -847,12 +999,7 @@ def get_monitor_status_badge(status: dict[str, Any] | None) -> str: return "正常" -def record_discovered_group_chat(message: Message) -> None: - if message.chat.type not in {"group", "supergroup"}: - return - chat_id = int(message.chat.id) - title = str(getattr(message.chat, "title", "") or str(chat_id)) - username = str(getattr(message.chat, "username", "") or "") +def record_discovered_group_chat_data(chat_id: int, title: str, username: str = "") -> None: with closing(db()) as conn: conn.execute( """ @@ -864,11 +1011,20 @@ def record_discovered_group_chat(message: Message) -> None: last_seen_at=excluded.last_seen_at, active=1 """, - (chat_id, title, username, now_iso()), + (int(chat_id), str(title or chat_id), str(username or ""), now_iso()), ) conn.commit() +def record_discovered_group_chat(message: Message) -> None: + if message.chat.type not in {"group", "supergroup"}: + return + chat_id = int(message.chat.id) + title = str(getattr(message.chat, "title", "") or str(chat_id)) + username = str(getattr(message.chat, "username", "") or "") + record_discovered_group_chat_data(chat_id, title, username) + + def list_discovered_group_chats(limit: int = 200) -> list[dict[str, Any]]: with closing(db()) as conn: rows = conn.execute( @@ -1237,7 +1393,7 @@ async def user_message(message: Message) -> None: if message.chat.type in {"group", "supergroup"}: try: record_discovered_group_chat(message) - await handle_group_keyword_message(message) + await handle_group_keyword_message(message, listen_source="bot") except Exception: logger.exception("group keyword handling failed chat_id=%s message_id=%s", message.chat.id, message.message_id) logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type) @@ -1783,6 +1939,9 @@ def env_values() -> dict[str, str]: "WEB_PANEL_USER": os.getenv("WEB_PANEL_USER", "admin"), "WEB_PANEL_PASSWORD": os.getenv("WEB_PANEL_PASSWORD", "admin"), "WEB_PANEL_SESSION_SECRET": os.getenv("WEB_PANEL_SESSION_SECRET", ""), + "TG_API_ID": os.getenv("TG_API_ID", ""), + "TG_API_HASH": os.getenv("TG_API_HASH", ""), + "TG_API_SESSION": os.getenv("TG_API_SESSION", ""), } @@ -1807,6 +1966,9 @@ def write_env_values(values: dict[str, str]) -> None: f"WEB_PANEL_USER={values.get('WEB_PANEL_USER','admin')}", f"WEB_PANEL_PASSWORD={values.get('WEB_PANEL_PASSWORD','admin')}", f"WEB_PANEL_SESSION_SECRET={session_value}", + f"TG_API_ID={values.get('TG_API_ID','')}", + f"TG_API_HASH={values.get('TG_API_HASH','')}", + f"TG_API_SESSION={values.get('TG_API_SESSION','')}", "", ] ENV_PATH.write_text("\n".join(lines), encoding="utf-8") @@ -1841,6 +2003,10 @@ def cfg_save(new_cfg: dict[str, Any]) -> None: gm.setdefault("keywords", []) gm.setdefault("exclude_keywords", []) gm.setdefault("notify_telegram", True) + listen_source = str(gm.get("listen_source") or "bot").strip().lower() or "bot" + if listen_source not in {"bot", "user_session"}: + listen_source = "bot" + gm["listen_source"] = listen_source summary_mode = str(gm.get("summary_mode") or "template").strip().lower() or "template" if summary_mode not in {"template", "ai"}: summary_mode = "template" @@ -2140,6 +2306,7 @@ def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = N "ai_prompt": "", "ai_min_interval_seconds": DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS, "ai_dedupe_window_seconds": DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS, + "listen_source": "bot", } action = "/group-monitors/save" if idx is not None else "/group-monitors/create" hidden = f"" if idx is not None else "" @@ -2150,9 +2317,11 @@ def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = N
+
+

监听来源选“用户会话”时,需要在“Bot / 面板设置”填写 TG_API_ID、TG_API_HASH、TG_API_SESSION,并重启。

@@ -2230,10 +2399,11 @@ def create_panel_app() -> FastAPI: continue enabled = "启用" if gm.get("enabled", True) else "关闭" notify = "推送 TG" if gm.get("notify_telegram", True) else "仅记录" + source = "Bot" if str(gm.get("listen_source") or "bot") == "bot" else "用户会话" kws = ", ".join([str(x) for x in (gm.get("keywords") or [])]) or "-" exs = ", ".join([str(x) for x in (gm.get("exclude_keywords") or [])]) or "-" trs.append( - f"""{i+1}{html_escape(gm.get('name') or gm.get('chat_id') or '-')}
{html_escape(gm.get('chat_id') or '-')}{enabled}
{notify}{html_escape(kws)}{html_escape(exs)}编辑 删除""" + f"""{i+1}{html_escape(gm.get('name') or gm.get('chat_id') or '-')}
{html_escape(gm.get('chat_id') or '-')}{enabled}
{notify} · {source}{html_escape(kws)}{html_escape(exs)}编辑 删除""" ) discovered_rows = [] for row in discovered: @@ -2244,11 +2414,23 @@ def create_panel_app() -> FastAPI: discovered_rows.append( f"""{html_escape(title)}
{html_escape(username)}{chat_id}{html_escape(row['last_seen_at'])}用此群创建监听""" ) + use_user_session = any( + isinstance(gm, dict) and str(gm.get("listen_source") or "bot") == "user_session" + for gm in rows + ) + user_session_notice = "" + if use_user_session: + if TelegramClient is None: + user_session_notice = "
检测到“用户会话”监听,但未安装 telethon;该来源不会生效。
" + elif not user_session_ready(): + user_session_notice = "
检测到“用户会话”监听,但 TG_API_ID / TG_API_HASH / TG_API_SESSION 未完整填写;该来源不会生效。
" body = ( "

TG 群关键词监听

" "

监听选定群并在命中关键词时给管理员发送摘要。" "需要在 @BotFather 关闭 /setprivacy 才能接收群普通消息。

" "
" + + user_session_notice + + "" + "".join(trs) + "
#监听状态关键词排除词操作
" + "

已发现群聊

Bot 在群里收到消息后会自动记录群信息。可直接选择群聊创建监听。

" @@ -2302,6 +2484,7 @@ def create_panel_app() -> FastAPI: exclude_keywords: str, enabled: str | None, notify_telegram: str | None, + listen_source: str, summary_mode: str, ai_base_url: str, ai_api_key: str, @@ -2322,6 +2505,9 @@ def create_panel_app() -> FastAPI: parsed_summary_mode = (summary_mode or "template").strip().lower() or "template" if parsed_summary_mode not in {"template", "ai"}: parsed_summary_mode = "template" + parsed_listen_source = (listen_source or "bot").strip().lower() or "bot" + if parsed_listen_source not in {"bot", "user_session"}: + parsed_listen_source = "bot" parsed_ai_interface = (ai_interface or "responses").strip().lower() or "responses" if parsed_ai_interface not in {"responses", "chat"}: parsed_ai_interface = "responses" @@ -2332,6 +2518,7 @@ def create_panel_app() -> FastAPI: "keywords": parse_lines(keywords), "exclude_keywords": parse_lines(exclude_keywords), "notify_telegram": bool(notify_telegram), + "listen_source": parsed_listen_source, "summary_mode": parsed_summary_mode, "ai_base_url": ai_base_url.strip(), "ai_api_key": ai_api_key.strip(), @@ -2364,6 +2551,7 @@ def create_panel_app() -> FastAPI: enabled: str | None = Form(None), notify_telegram: str | None = Form(None), summary_mode: str = Form("template"), + listen_source: str = Form("bot"), ai_base_url: str = Form(""), ai_api_key: str = Form(""), ai_model: str = Form("gpt-4o-mini"), @@ -2382,6 +2570,7 @@ def create_panel_app() -> FastAPI: exclude_keywords, enabled, notify_telegram, + listen_source, summary_mode, ai_base_url, ai_api_key, @@ -2405,6 +2594,7 @@ def create_panel_app() -> FastAPI: enabled: str | None = Form(None), notify_telegram: str | None = Form(None), summary_mode: str = Form("template"), + listen_source: str = Form("bot"), ai_base_url: str = Form(""), ai_api_key: str = Form(""), ai_model: str = Form("gpt-4o-mini"), @@ -2423,6 +2613,7 @@ def create_panel_app() -> FastAPI: exclude_keywords, enabled, notify_telegram, + listen_source, summary_mode, ai_base_url, ai_api_key, @@ -2613,6 +2804,9 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" body = f"""

Bot / 面板设置

{status}
+

TG 用户会话(可选)

仅用于“TG 群监听 -> 监听来源=用户会话”,适合 Bot 无法加入的群。填写后需重启。

+
+

监控数据自动清理

删除过期监控通知消息,并清理 RSS/网站监控状态和去重记录;不会删除用户、收件箱、双向对话消息。

改 Token、管理员 ID 或端口后需要重启。
""" @@ -2635,7 +2829,7 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" cfg_save(cfg) @app.post("/settings", response_class=HTMLResponse) - async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: + async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), TG_API_ID: str = Form(""), TG_API_HASH: str = Form(""), TG_API_SESSION: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: save_panel_settings(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, CLEANUP_INTERVAL_MINUTES, CLEANUP_MESSAGE_DELETE_AFTER_MINUTES, CLEANUP_RETENTION_MINUTES) return layout("已保存", "
已保存,不会自动重启;修改 Token/管理员 ID 后请重启。

返回 重启机器人

") @@ -2757,13 +2951,16 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" settings_card = f"""

Bot / 面板配置

这里和“Bot / 面板设置”共用同一份 .env。修改 Token、管理员 ID、端口、账号或密码后不会自动重启,需要手动重启服务。

+

TG 用户会话(可选)

仅用于 TG 群监听来源=用户会话。修改后需重启。

+
+
重启机器人
""" body = settings_card + "

用户管理

" + "".join(trs) + "
用户状态备注操作
" return layout("用户管理", body) @app.post("/users/settings", response_class=HTMLResponse) - async def users_settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin")) -> str: + async def users_settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), TG_API_ID: str = Form(""), TG_API_HASH: str = Form(""), TG_API_SESSION: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin")) -> str: cleanup = (cfg_load_fresh().get("cleanup") or {}) save_panel_settings( locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, @@ -3013,7 +3210,7 @@ def bot_env_configured() -> bool: async def main_async(run_once: bool = False, panel_only: bool = False) -> None: - global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref + global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task load_dotenv(ENV_PATH, override=True) config = load_config() setup_logging(os.getenv("LOG_LEVEL", "INFO")) @@ -3053,6 +3250,15 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: scheduler.start() asyncio.create_task(flush_pending_loop()) asyncio.create_task(cleanup_monitor_loop()) + if group_monitors_need_user_session(): + if TelegramClient is None: + logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed") + elif not user_session_ready(): + logger.warning( + "group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete" + ) + else: + user_session_listener_task = asyncio.create_task(run_user_session_group_listener()) await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") logger.info("bot polling start") await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) diff --git a/config.example.yaml b/config.example.yaml index c391c68..73fdf5d 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -56,6 +56,7 @@ monitors: group_monitors: - name: TG 群关键词监听示例 enabled: false + listen_source: bot chat_id: -1001234567890 keywords: - VPS diff --git a/requirements.txt b/requirements.txt index 17855f9..122a39e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ python-dotenv==1.2.1 python-multipart==0.0.20 PyYAML==6.0.3 uvicorn[standard]==0.38.0 +telethon==1.42.0 diff --git a/tests/test_monitor_message_cleanup.py b/tests/test_monitor_message_cleanup.py index b344997..80cab47 100644 --- a/tests/test_monitor_message_cleanup.py +++ b/tests/test_monitor_message_cleanup.py @@ -324,6 +324,7 @@ class PanelHtmlContractTest(unittest.TestCase): "name=exclude_keywords", "name=enabled", "name=notify_telegram", + "name=listen_source", "name=summary_mode", "name=ai_interface", "name=ai_base_url", @@ -419,6 +420,7 @@ class GroupMonitorTest(unittest.TestCase): self.assertEqual(-10099, saved["chat_id"]) self.assertEqual("template", saved["summary_mode"]) self.assertEqual("responses", saved["ai_interface"]) + self.assertEqual("bot", saved["listen_source"]) self.assertEqual(0.2, saved["ai_temperature"]) self.assertEqual(1, saved["ai_timeout_seconds"]) self.assertEqual(app.DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS, saved["ai_min_interval_seconds"]) @@ -525,6 +527,24 @@ class MonitorRuntimeAndUpdateTest(unittest.TestCase): finally: app.config = old_config + def test_group_monitor_for_chat_and_source_returns_matched_monitor(self) -> None: + old_config = app.config + app.config = { + "group_monitors": [ + {"enabled": True, "chat_id": -10001, "listen_source": "bot", "keywords": ["vps"]}, + {"enabled": True, "chat_id": -10001, "listen_source": "user_session", "keywords": ["api"]}, + ] + } + try: + monitor_bot = app.group_monitor_for_chat_and_source(-10001, "bot") + monitor_session = app.group_monitor_for_chat_and_source(-10001, "user_session") + self.assertIsNotNone(monitor_bot) + self.assertIsNotNone(monitor_session) + self.assertEqual("bot", monitor_bot["listen_source"]) + self.assertEqual("user_session", monitor_session["listen_source"]) + finally: + app.config = old_config + def test_handle_group_keyword_message_sends_summary_to_admin(self) -> None: old_config = app.config old_bot = app.bot