From 212b9bc8781b465b0eb6dd742461cfd53baae8e5 Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Fri, 22 May 2026 03:04:09 +0800 Subject: [PATCH] Add Telegram group keyword monitor with web editor --- README.md | 18 ++ app.py | 237 +++++++++++++++++++++++++- config.example.yaml | 10 ++ tests/test_monitor_message_cleanup.py | 108 +++++++++++- 4 files changed, 371 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index adfb03d..d647d9e 100644 --- a/README.md +++ b/README.md @@ -308,6 +308,24 @@ bot: text: 你好,消息已收到,我稍后处理。 ``` +TG 群关键词监听(可选,默认关闭): + +```yaml +group_monitors: + - name: TG 群关键词监听 + enabled: false + chat_id: -1001234567890 + keywords: + - VPS + - 优惠 + exclude_keywords: + - 求带 + notify_telegram: true +``` + +- 命中 `keywords` 且未命中 `exclude_keywords` 时,会给管理员发送摘要。 +- 机器人想收到群里普通消息,需要在 `@BotFather` 执行 `/setprivacy` 关闭隐私模式。 + 监控数据自动清理示例: ```yaml diff --git a/app.py b/app.py index 5ae9076..9e320dd 100644 --- a/app.py +++ b/app.py @@ -65,6 +65,7 @@ config: dict[str, Any] = {} rate_buckets: dict[int, list[float]] = {} pending_sendpic: dict[int, dict[str, Any]] = {} scheduler_ref: AsyncIOScheduler | None = None +GROUP_SUMMARY_MAX_CHARS = 800 def setup_logging(level: str = "INFO") -> None: @@ -372,6 +373,102 @@ def spam_keyword_hits(text: str) -> list[str]: return keyword_hits(text, settings["keywords"]) +def group_monitors() -> list[dict[str, Any]]: + if not isinstance(config, dict): + return [] + rows = config.get("group_monitors") or [] + if not isinstance(rows, list): + return [] + monitors: list[dict[str, Any]] = [] + for row in rows: + if not isinstance(row, dict): + continue + if not row.get("enabled", True): + continue + try: + chat_id = int(row.get("chat_id")) + except (TypeError, ValueError): + continue + keywords = [str(k).strip() for k in (row.get("keywords") or []) if str(k).strip()] + exclude_keywords = [str(k).strip() for k in (row.get("exclude_keywords") or []) if str(k).strip()] + monitors.append( + { + "name": str(row.get("name") or str(chat_id)), + "chat_id": chat_id, + "keywords": keywords, + "exclude_keywords": exclude_keywords, + "notify_telegram": bool(row.get("notify_telegram", True)), + } + ) + return monitors + + +def group_monitor_for_chat(chat_id: int) -> dict[str, Any] | None: + for monitor in group_monitors(): + if int(monitor["chat_id"]) == int(chat_id): + return monitor + return None + + +def group_message_text(message: Message) -> str: + parts = [message.text or "", message.caption or ""] + if getattr(message, "reply_to_message", None): + reply = message.reply_to_message + parts.extend([getattr(reply, "text", "") or "", getattr(reply, "caption", "") or ""]) + merged = " ".join([p.strip() for p in parts if p and str(p).strip()]) + return merged.strip() + + +def telegram_message_link(chat_username: str | None, chat_id: int, message_id: int) -> str: + if chat_username: + return f"https://t.me/{chat_username}/{message_id}" + chat_num = str(chat_id) + if chat_num.startswith("-100") and len(chat_num) > 4: + return f"https://t.me/c/{chat_num[4:]}/{message_id}" + return f"chat_id={chat_id} message_id={message_id}" + + +def build_group_summary(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: + chat_title = getattr(message.chat, "title", "") or str(message.chat.id) + username = getattr(message.from_user, "username", None) + user_full = " ".join( + x for x in [getattr(message.from_user, "first_name", ""), getattr(message.from_user, "last_name", "")] if x + ).strip() or str(getattr(message.from_user, "id", "unknown")) + sender = f"{user_full} (@{username})" if username else user_full + text = group_message_text(message) + if len(text) > GROUP_SUMMARY_MAX_CHARS: + text = text[:GROUP_SUMMARY_MAX_CHARS] + "..." + link = telegram_message_link(getattr(message.chat, "username", None), int(message.chat.id), int(message.message_id)) + return ( + f"[群关键词命中] {html_escape(str(monitor.get('name') or chat_title))}\n" + f"群:{html_escape(chat_title)} ({message.chat.id})\n" + f"发送者:{html_escape(sender)}\n" + f"命中:{html_escape(', '.join(hits))}\n" + f"时间:{html_escape(now_iso())}\n" + f"链接:{html_escape(link)}\n" + f"内容:\n{html_escape(text or '(非文本消息)')}" + ) + + +async def handle_group_keyword_message(message: Message) -> bool: + monitor = group_monitor_for_chat(int(message.chat.id)) + if not monitor: + return False + text = group_message_text(message) + if not text: + return False + exclude_hits = keyword_hits(text, monitor.get("exclude_keywords") or []) + if exclude_hits: + return False + hits = keyword_hits(text, monitor.get("keywords") or []) + if not hits: + return False + if not monitor.get("notify_telegram", True): + return True + await admin_send(build_group_summary(message, monitor, hits)) + return True + + def update_spam_keywords(action: str, word: str) -> list[str]: cfg = cfg_load_fresh() bot_cfg = cfg.setdefault("bot", {}) @@ -747,6 +844,11 @@ async def user_message(message: Message) -> None: logger.info("incoming message is admin plain message; ignored by user relay") return if message.chat.type != "private": + if message.chat.type in {"group", "supergroup"}: + try: + await handle_group_keyword_message(message) + except Exception: + logger.exception("group keyword handling failed chat_id=%s message_id=%s", message.chat.id, message.message_id) logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type) return uid, full, username = user_display(message) @@ -1311,6 +1413,18 @@ def cfg_save(new_cfg: dict[str, Any]) -> None: raise ValueError("每个 monitor 必须是对象") if int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) < MIN_INTERVAL_SECONDS: m["interval_seconds"] = MIN_INTERVAL_SECONDS + group_monitor_rows = new_cfg.get("group_monitors") or [] + if group_monitor_rows is not None and not isinstance(group_monitor_rows, list): + raise ValueError("group_monitors 必须是列表") + for gm in group_monitor_rows: + if not isinstance(gm, dict): + raise ValueError("每个 group_monitor 必须是对象") + if "chat_id" in gm: + gm["chat_id"] = int(gm["chat_id"]) + gm.setdefault("enabled", True) + gm.setdefault("keywords", []) + gm.setdefault("exclude_keywords", []) + gm.setdefault("notify_telegram", True) CONFIG_PATH.write_text(yaml.safe_dump(new_cfg, allow_unicode=True, sort_keys=False), encoding="utf-8") global config config = new_cfg @@ -1392,7 +1506,7 @@ a{{color:var(--ink);text-decoration:none}}a:hover{{text-decoration:underline}}.s input,select,textarea{{width:100%;box-sizing:border-box;background:#fff;color:var(--ink);border:3px solid var(--ink);border-radius:0;padding:10px 11px;outline:none;font-size:14px;font-weight:600}}input:focus,select:focus,textarea:focus{{box-shadow:4px 4px 0 var(--blue)}}textarea{{min-height:116px;font-family:'Cascadia Mono',Consolas,monospace}}label{{display:block;margin:10px 0 5px;color:var(--ink);font-weight:900;font-size:12px;text-transform:uppercase;letter-spacing:.06em}}.check-row{{display:flex;gap:10px;align-items:center;flex-wrap:wrap}}.check-row label{{display:flex;gap:7px;align-items:center;margin:0;padding:8px 10px;border:3px solid var(--ink);background:var(--gray)}}.check-row input{{width:auto}} small,.muted{{color:var(--muted);line-height:1.5;font-weight:600}}table{{width:100%;border-collapse:collapse;border:3px solid var(--ink);background:white}}td,th{{border:3px solid var(--ink);padding:10px;text-align:left;vertical-align:top}}th{{color:var(--ink);font-size:12px;background:var(--yellow);text-transform:uppercase;letter-spacing:.06em}}tr:nth-child(even) td{{background:#fafafa}}.badge{{padding:4px 8px;border:3px solid var(--ink);border-radius:999px;background:var(--blue);color:white;font-size:12px;font-weight:900;text-transform:uppercase}}.msg{{padding:11px 12px;border:3px solid var(--ink);background:var(--yellow);color:var(--ink);margin:10px 0;font-weight:900;box-shadow:4px 4px 0 var(--ink)}}pre{{white-space:pre-wrap;background:#121212;color:#fff;padding:13px;border:4px solid var(--ink);max-height:420px;overflow:auto;box-shadow:5px 5px 0 var(--yellow)}} @media(max-width:860px){{.shell{{grid-template-columns:1fr}}aside{{position:relative;height:auto}}main{{padding:18px}}nav{{grid-template-columns:repeat(2,minmax(0,1fr))}}.top{{align-items:flex-start;flex-direction:column}}.card{{box-shadow:5px 5px 0 var(--ink)}}}} -

{html_escape(title)}

WatchBot Panel
+""" @@ -1426,6 +1540,22 @@ def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -
取消
""" +def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str: + m = m or {"enabled": True, "keywords": [], "exclude_keywords": [], "notify_telegram": True} + action = "/group-monitors/save" if idx is not None else "/group-monitors/create" + hidden = f"" if idx is not None else "" + keywords = "\n".join(m.get("keywords") or []) + exclude_keywords = "\n".join(m.get("exclude_keywords") or []) + return f"""
{hidden} +
+
+
+
+ + +
""" + + def create_panel_app() -> FastAPI: app = FastAPI(title="tg-watchbot Panel") @@ -1472,6 +1602,111 @@ def create_panel_app() -> FastAPI: async def new_monitor(_: str = Depends(panel_auth)) -> str: return layout("新增监控", "

这里是新增单个监控。要一次加多个网站,用左侧/首页的「批量新增」。

" + monitor_form_html()) + @app.get("/group-monitors", response_class=HTMLResponse) + async def group_monitors_page(_: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh() + rows = cfg.get("group_monitors") or [] + trs = [] + for i, gm in enumerate(rows): + if not isinstance(gm, dict): + continue + enabled = "启用" if gm.get("enabled", True) else "关闭" + notify = "推送 TG" if gm.get("notify_telegram", True) else "仅记录" + kws = ", ".join([str(x) for x in (gm.get("keywords") or [])]) or "-" + exs = ", ".join([str(x) for x in (gm.get("exclude_keywords") or [])]) or "-" + trs.append( + f"""{i+1}{html_escape(gm.get('name') or gm.get('chat_id') or '-')}
{html_escape(gm.get('chat_id') or '-')}{enabled}
{notify}{html_escape(kws)}{html_escape(exs)}编辑 删除""" + ) + body = ( + "

TG 群关键词监听

" + "

监听选定群并在命中关键词时给管理员发送摘要。" + "需要在 @BotFather 关闭 /setprivacy 才能接收群普通消息。

" + "
" + "" + + "".join(trs) + "
#监听状态关键词排除词操作
" + ) + return layout("TG 群监听", body) + + @app.get("/group-monitors/new", response_class=HTMLResponse) + async def group_monitor_new(_: str = Depends(panel_auth)) -> str: + return layout("新增 TG 群监听", group_monitor_form_html()) + + @app.get("/group-monitors/{idx}/edit", response_class=HTMLResponse) + async def group_monitor_edit(idx: int, _: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh() + rows = cfg.get("group_monitors") or [] + if idx < 0 or idx >= len(rows) or not isinstance(rows[idx], dict): + raise HTTPException(404, "group monitor not found") + return layout("编辑 TG 群监听", group_monitor_form_html(rows[idx], idx)) + + async def save_group_monitor_common( + original_index: int | None, + name: str, + chat_id: str, + keywords: str, + exclude_keywords: str, + enabled: str | None, + notify_telegram: str | None, + ) -> RedirectResponse | HTMLResponse: + cfg = cfg_load_fresh() + rows = cfg.setdefault("group_monitors", []) + if not isinstance(rows, list): + rows = [] + cfg["group_monitors"] = rows + try: + item = { + "name": name.strip() or chat_id.strip(), + "enabled": bool(enabled), + "chat_id": int(chat_id.strip()), + "keywords": parse_lines(keywords), + "exclude_keywords": parse_lines(exclude_keywords), + "notify_telegram": bool(notify_telegram), + } + except Exception as e: + return HTMLResponse(layout("保存失败", f"
{html_escape(e)}

返回

"), status_code=400) + if original_index is None: + rows.append(item) + else: + if original_index < 0 or original_index >= len(rows): + raise HTTPException(404, "group monitor not found") + rows[original_index] = item + cfg_save(cfg) + return RedirectResponse("/group-monitors", status_code=303) + + @app.post("/group-monitors/create") + async def group_monitor_create( + _: str = Depends(panel_auth), + name: str = Form(""), + chat_id: str = Form(""), + keywords: str = Form(""), + exclude_keywords: str = Form(""), + enabled: str | None = Form(None), + notify_telegram: str | None = Form(None), + ) -> RedirectResponse | HTMLResponse: + return await save_group_monitor_common(None, name, chat_id, keywords, exclude_keywords, enabled, notify_telegram) + + @app.post("/group-monitors/save") + async def group_monitor_save( + _: str = Depends(panel_auth), + original_index: int = Form(...), + name: str = Form(""), + chat_id: str = Form(""), + keywords: str = Form(""), + exclude_keywords: str = Form(""), + enabled: str | None = Form(None), + notify_telegram: str | None = Form(None), + ) -> RedirectResponse | HTMLResponse: + return await save_group_monitor_common(original_index, name, chat_id, keywords, exclude_keywords, enabled, notify_telegram) + + @app.get("/group-monitors/{idx}/delete") + async def group_monitor_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse: + cfg = cfg_load_fresh() + rows = cfg.get("group_monitors") or [] + if 0 <= idx < len(rows): + rows.pop(idx) + cfg_save(cfg) + return RedirectResponse("/group-monitors", status_code=303) + @app.get("/monitor/templates", response_class=HTMLResponse) async def monitor_templates(_: str = Depends(panel_auth)) -> str: body = """

论坛监控模板

NodeSeek / Linux.do 建议用 RSS,不抓网页 HTML,抗 Cloudflare 更稳。

""" diff --git a/config.example.yaml b/config.example.yaml index 1d12a47..460f43a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -53,6 +53,16 @@ monitors: stock_change: false notify_telegram: true forum: true +group_monitors: +- name: TG 群关键词监听示例 + enabled: false + chat_id: -1001234567890 + keywords: + - VPS + - 优惠 + exclude_keywords: + - 求带 + notify_telegram: true cleanup: enabled: true interval_minutes: 60 diff --git a/tests/test_monitor_message_cleanup.py b/tests/test_monitor_message_cleanup.py index a462efd..ae8338b 100644 --- a/tests/test_monitor_message_cleanup.py +++ b/tests/test_monitor_message_cleanup.py @@ -301,7 +301,7 @@ class PanelHtmlContractTest(unittest.TestCase): def test_layout_groups_navigation_by_domain(self) -> None: html = app.layout("测试", "

ok

") - for expected in ["消息", "监控", "配置", "系统", "私聊广告拦截"]: + for expected in ["消息", "监控", "配置", "系统", "私聊广告拦截", "TG 群监听"]: self.assertIn(expected, html) def test_inbox_copy_describes_two_way_conversation(self) -> None: @@ -314,6 +314,19 @@ class PanelHtmlContractTest(unittest.TestCase): self.assertIn("action='/users/settings'", source) self.assertIn("这里和“Bot / 面板设置”共用同一份 .env", source) + def test_group_monitor_form_keeps_backend_field_names(self) -> None: + html = app.group_monitor_form_html() + for expected in [ + "action='/group-monitors/create'", + "name=name", + "name=chat_id", + "name=keywords", + "name=exclude_keywords", + "name=enabled", + "name=notify_telegram", + ]: + self.assertIn(expected, html) + class SpamAndTemplateConfigTest(unittest.TestCase): def test_spam_keyword_hits_follow_config(self) -> None: @@ -347,5 +360,98 @@ class SpamAndTemplateConfigTest(unittest.TestCase): app.config = old_config +class GroupMonitorTest(unittest.TestCase): + def test_group_monitor_for_chat_returns_enabled_target(self) -> None: + old_config = app.config + app.config = { + "group_monitors": [ + {"enabled": True, "chat_id": -10001, "keywords": ["vps"], "exclude_keywords": []}, + {"enabled": False, "chat_id": -10002, "keywords": ["api"]}, + ] + } + try: + monitor = app.group_monitor_for_chat(-10001) + self.assertIsNotNone(monitor) + self.assertEqual(-10001, monitor["chat_id"]) + self.assertIsNone(app.group_monitor_for_chat(-10002)) + finally: + app.config = old_config + + def test_handle_group_keyword_message_sends_summary_to_admin(self) -> None: + old_config = app.config + old_bot = app.bot + old_admin_chat_ids = app.admin_chat_ids + fake_bot = FakeBot() + app.bot = fake_bot + app.admin_chat_ids = [9001] + app.config = { + "group_monitors": [ + { + "enabled": True, + "name": "测试群", + "chat_id": -100100100, + "keywords": ["vps", "优惠"], + "exclude_keywords": ["求带"], + "notify_telegram": True, + } + ] + } + msg = SimpleNamespace( + chat=SimpleNamespace(id=-100100100, username="groupdemo", title="测试群"), + from_user=SimpleNamespace(id=123, first_name="Alice", last_name="", username="alice"), + text="今晚 vps 有优惠", + caption=None, + reply_to_message=None, + message_id=777, + content_type="text", + ) + try: + ok = asyncio.run(app.handle_group_keyword_message(msg)) + self.assertTrue(ok) + self.assertEqual([9001], fake_bot.sent_chat_ids) + self.assertIn("[群关键词命中]", fake_bot.sent_texts[0]) + self.assertIn("命中:vps, 优惠", fake_bot.sent_texts[0]) + finally: + app.config = old_config + app.bot = old_bot + app.admin_chat_ids = old_admin_chat_ids + + def test_handle_group_keyword_message_respects_exclude_keywords(self) -> None: + old_config = app.config + old_bot = app.bot + old_admin_chat_ids = app.admin_chat_ids + fake_bot = FakeBot() + app.bot = fake_bot + app.admin_chat_ids = [9001] + app.config = { + "group_monitors": [ + { + "enabled": True, + "chat_id": -100100100, + "keywords": ["vps"], + "exclude_keywords": ["求带"], + "notify_telegram": True, + } + ] + } + msg = SimpleNamespace( + chat=SimpleNamespace(id=-100100100, username="groupdemo", title="测试群"), + from_user=SimpleNamespace(id=123, first_name="Alice", last_name="", username="alice"), + text="vps 求带", + caption=None, + reply_to_message=None, + message_id=777, + content_type="text", + ) + try: + ok = asyncio.run(app.handle_group_keyword_message(msg)) + self.assertFalse(ok) + self.assertEqual([], fake_bot.sent_chat_ids) + finally: + app.config = old_config + app.bot = old_bot + app.admin_chat_ids = old_admin_chat_ids + + if __name__ == "__main__": unittest.main()