Add Telegram group keyword monitor with web editor

This commit is contained in:
InfernoXuaI
2026-05-22 03:04:09 +08:00
parent 97f91e94a8
commit 212b9bc878
4 changed files with 371 additions and 2 deletions
+18
View File
@@ -308,6 +308,24 @@ bot:
text: 你好,消息已收到,我稍后处理。
```
TG 群关键词监听(可选,默认关闭):
```yaml
group_monitors:
- name: TG 群关键词监听
enabled: false
chat_id: -1001234567890
keywords:
- VPS
- 优惠
exclude_keywords:
- 求带
notify_telegram: true
```
- 命中 `keywords` 且未命中 `exclude_keywords` 时,会给管理员发送摘要。
- 机器人想收到群里普通消息,需要在 `@BotFather` 执行 `/setprivacy` 关闭隐私模式。
监控数据自动清理示例:
```yaml
+236 -1
View File
@@ -65,6 +65,7 @@ config: dict[str, Any] = {}
rate_buckets: dict[int, list[float]] = {}
pending_sendpic: dict[int, dict[str, Any]] = {}
scheduler_ref: AsyncIOScheduler | None = None
GROUP_SUMMARY_MAX_CHARS = 800
def setup_logging(level: str = "INFO") -> None:
@@ -372,6 +373,102 @@ def spam_keyword_hits(text: str) -> list[str]:
return keyword_hits(text, settings["keywords"])
def group_monitors() -> list[dict[str, Any]]:
if not isinstance(config, dict):
return []
rows = config.get("group_monitors") or []
if not isinstance(rows, list):
return []
monitors: list[dict[str, Any]] = []
for row in rows:
if not isinstance(row, dict):
continue
if not row.get("enabled", True):
continue
try:
chat_id = int(row.get("chat_id"))
except (TypeError, ValueError):
continue
keywords = [str(k).strip() for k in (row.get("keywords") or []) if str(k).strip()]
exclude_keywords = [str(k).strip() for k in (row.get("exclude_keywords") or []) if str(k).strip()]
monitors.append(
{
"name": str(row.get("name") or str(chat_id)),
"chat_id": chat_id,
"keywords": keywords,
"exclude_keywords": exclude_keywords,
"notify_telegram": bool(row.get("notify_telegram", True)),
}
)
return monitors
def group_monitor_for_chat(chat_id: int) -> dict[str, Any] | None:
for monitor in group_monitors():
if int(monitor["chat_id"]) == int(chat_id):
return monitor
return None
def group_message_text(message: Message) -> str:
parts = [message.text or "", message.caption or ""]
if getattr(message, "reply_to_message", None):
reply = message.reply_to_message
parts.extend([getattr(reply, "text", "") or "", getattr(reply, "caption", "") or ""])
merged = " ".join([p.strip() for p in parts if p and str(p).strip()])
return merged.strip()
def telegram_message_link(chat_username: str | None, chat_id: int, message_id: int) -> str:
if chat_username:
return f"https://t.me/{chat_username}/{message_id}"
chat_num = str(chat_id)
if chat_num.startswith("-100") and len(chat_num) > 4:
return f"https://t.me/c/{chat_num[4:]}/{message_id}"
return f"chat_id={chat_id} message_id={message_id}"
def build_group_summary(message: Message, monitor: dict[str, Any], hits: list[str]) -> str:
chat_title = getattr(message.chat, "title", "") or str(message.chat.id)
username = getattr(message.from_user, "username", None)
user_full = " ".join(
x for x in [getattr(message.from_user, "first_name", ""), getattr(message.from_user, "last_name", "")] if x
).strip() or str(getattr(message.from_user, "id", "unknown"))
sender = f"{user_full} (@{username})" if username else user_full
text = group_message_text(message)
if len(text) > GROUP_SUMMARY_MAX_CHARS:
text = text[:GROUP_SUMMARY_MAX_CHARS] + "..."
link = telegram_message_link(getattr(message.chat, "username", None), int(message.chat.id), int(message.message_id))
return (
f"[群关键词命中] {html_escape(str(monitor.get('name') or chat_title))}\n"
f"群:{html_escape(chat_title)} ({message.chat.id})\n"
f"发送者:{html_escape(sender)}\n"
f"命中:{html_escape(', '.join(hits))}\n"
f"时间:{html_escape(now_iso())}\n"
f"链接:{html_escape(link)}\n"
f"内容:\n{html_escape(text or '(非文本消息)')}"
)
async def handle_group_keyword_message(message: Message) -> bool:
monitor = group_monitor_for_chat(int(message.chat.id))
if not monitor:
return False
text = group_message_text(message)
if not text:
return False
exclude_hits = keyword_hits(text, monitor.get("exclude_keywords") or [])
if exclude_hits:
return False
hits = keyword_hits(text, monitor.get("keywords") or [])
if not hits:
return False
if not monitor.get("notify_telegram", True):
return True
await admin_send(build_group_summary(message, monitor, hits))
return True
def update_spam_keywords(action: str, word: str) -> list[str]:
cfg = cfg_load_fresh()
bot_cfg = cfg.setdefault("bot", {})
@@ -747,6 +844,11 @@ async def user_message(message: Message) -> None:
logger.info("incoming message is admin plain message; ignored by user relay")
return
if message.chat.type != "private":
if message.chat.type in {"group", "supergroup"}:
try:
await handle_group_keyword_message(message)
except Exception:
logger.exception("group keyword handling failed chat_id=%s message_id=%s", message.chat.id, message.message_id)
logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type)
return
uid, full, username = user_display(message)
@@ -1311,6 +1413,18 @@ def cfg_save(new_cfg: dict[str, Any]) -> None:
raise ValueError("每个 monitor 必须是对象")
if int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) < MIN_INTERVAL_SECONDS:
m["interval_seconds"] = MIN_INTERVAL_SECONDS
group_monitor_rows = new_cfg.get("group_monitors") or []
if group_monitor_rows is not None and not isinstance(group_monitor_rows, list):
raise ValueError("group_monitors 必须是列表")
for gm in group_monitor_rows:
if not isinstance(gm, dict):
raise ValueError("每个 group_monitor 必须是对象")
if "chat_id" in gm:
gm["chat_id"] = int(gm["chat_id"])
gm.setdefault("enabled", True)
gm.setdefault("keywords", [])
gm.setdefault("exclude_keywords", [])
gm.setdefault("notify_telegram", True)
CONFIG_PATH.write_text(yaml.safe_dump(new_cfg, allow_unicode=True, sort_keys=False), encoding="utf-8")
global config
config = new_cfg
@@ -1392,7 +1506,7 @@ a{{color:var(--ink);text-decoration:none}}a:hover{{text-decoration:underline}}.s
input,select,textarea{{width:100%;box-sizing:border-box;background:#fff;color:var(--ink);border:3px solid var(--ink);border-radius:0;padding:10px 11px;outline:none;font-size:14px;font-weight:600}}input:focus,select:focus,textarea:focus{{box-shadow:4px 4px 0 var(--blue)}}textarea{{min-height:116px;font-family:'Cascadia Mono',Consolas,monospace}}label{{display:block;margin:10px 0 5px;color:var(--ink);font-weight:900;font-size:12px;text-transform:uppercase;letter-spacing:.06em}}.check-row{{display:flex;gap:10px;align-items:center;flex-wrap:wrap}}.check-row label{{display:flex;gap:7px;align-items:center;margin:0;padding:8px 10px;border:3px solid var(--ink);background:var(--gray)}}.check-row input{{width:auto}}
small,.muted{{color:var(--muted);line-height:1.5;font-weight:600}}table{{width:100%;border-collapse:collapse;border:3px solid var(--ink);background:white}}td,th{{border:3px solid var(--ink);padding:10px;text-align:left;vertical-align:top}}th{{color:var(--ink);font-size:12px;background:var(--yellow);text-transform:uppercase;letter-spacing:.06em}}tr:nth-child(even) td{{background:#fafafa}}.badge{{padding:4px 8px;border:3px solid var(--ink);border-radius:999px;background:var(--blue);color:white;font-size:12px;font-weight:900;text-transform:uppercase}}.msg{{padding:11px 12px;border:3px solid var(--ink);background:var(--yellow);color:var(--ink);margin:10px 0;font-weight:900;box-shadow:4px 4px 0 var(--ink)}}pre{{white-space:pre-wrap;background:#121212;color:#fff;padding:13px;border:4px solid var(--ink);max-height:420px;overflow:auto;box-shadow:5px 5px 0 var(--yellow)}}
@media(max-width:860px){{.shell{{grid-template-columns:1fr}}aside{{position:relative;height:auto}}main{{padding:18px}}nav{{grid-template-columns:repeat(2,minmax(0,1fr))}}.top{{align-items:flex-start;flex-direction:column}}.card{{box-shadow:5px 5px 0 var(--ink)}}}}
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><section><b>消息</b><a href='/inbox'>收件箱</a><a href='/users'>用户管理</a><a href='/send'>主动发消息</a><a href='/replies'>快捷回复</a><a href='/rules'>私聊广告拦截</a></section><section><b>监控</b><a href='/'>监控面板</a><a href='/monitor/new'>新增监控</a><a href='/monitor/events'>推送历史</a><a href='/run-once'>手动检查</a></section><section><b>配置</b><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/config/export'>导出配置</a></section><section><b>系统</b><a href='/logs'>运行日志</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></section></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><section><b>消息</b><a href='/inbox'>收件箱</a><a href='/users'>用户管理</a><a href='/send'>主动发消息</a><a href='/replies'>快捷回复</a><a href='/rules'>私聊广告拦截</a></section><section><b>监控</b><a href='/'>监控面板</a><a href='/monitor/new'>新增监控</a><a href='/group-monitors'>TG 群监听</a><a href='/monitor/events'>推送历史</a><a href='/run-once'>手动检查</a></section><section><b>配置</b><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/config/export'>导出配置</a></section><section><b>系统</b><a href='/logs'>运行日志</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></section></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
{body}</main></div></body></html>"""
@@ -1426,6 +1540,22 @@ def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -
<div class=form-actions><button class='btn primary' type=submit>保存</button> <a class=btn href='/'>取消</a></div></form>"""
def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str:
m = m or {"enabled": True, "keywords": [], "exclude_keywords": [], "notify_telegram": True}
action = "/group-monitors/save" if idx is not None else "/group-monitors/create"
hidden = f"<input type=hidden name=original_index value='{idx}'>" if idx is not None else ""
keywords = "\n".join(m.get("keywords") or [])
exclude_keywords = "\n".join(m.get("exclude_keywords") or [])
return f"""<form method=post action='{action}' class=card>{hidden}
<div class=check-row><label><input type=checkbox name=enabled {'checked' if m.get('enabled', True) else ''}> 启用监听</label>
<label><input type=checkbox name=notify_telegram {'checked' if m.get('notify_telegram', True) else ''}> 推送管理员</label></div>
<div class=grid><div><label>监听名称</label><input name=name value='{html_escape(m.get('name',''))}' placeholder='例如:业务群关键词'></div>
<div><label>群 chat_id</label><input name=chat_id value='{html_escape(m.get('chat_id',''))}' placeholder='例如 -1001234567890' required></div></div>
<label>关键词(一行一个)</label><textarea name=keywords>{html_escape(keywords)}</textarea>
<label>排除词(一行一个)</label><textarea name=exclude_keywords>{html_escape(exclude_keywords)}</textarea>
<div class=form-actions><button class='btn primary' type=submit>保存</button> <a class=btn href='/group-monitors'>返回列表</a></div></form>"""
def create_panel_app() -> FastAPI:
app = FastAPI(title="tg-watchbot Panel")
@@ -1472,6 +1602,111 @@ def create_panel_app() -> FastAPI:
async def new_monitor(_: str = Depends(panel_auth)) -> str:
return layout("新增监控", "<div class=card><p class=muted>这里是新增单个监控。要一次加多个网站,用左侧/首页的「批量新增」。</p></div>" + monitor_form_html())
@app.get("/group-monitors", response_class=HTMLResponse)
async def group_monitors_page(_: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
rows = cfg.get("group_monitors") or []
trs = []
for i, gm in enumerate(rows):
if not isinstance(gm, dict):
continue
enabled = "启用" if gm.get("enabled", True) else "关闭"
notify = "推送 TG" if gm.get("notify_telegram", True) else "仅记录"
kws = ", ".join([str(x) for x in (gm.get("keywords") or [])]) or "-"
exs = ", ".join([str(x) for x in (gm.get("exclude_keywords") or [])]) or "-"
trs.append(
f"""<tr><td>{i+1}</td><td><b>{html_escape(gm.get('name') or gm.get('chat_id') or '-')}</b><br><small>{html_escape(gm.get('chat_id') or '-')}</small></td><td>{enabled}<br><small>{notify}</small></td><td>{html_escape(kws)}</td><td>{html_escape(exs)}</td><td><a class=btn href='/group-monitors/{i}/edit'>编辑</a> <a class='btn danger' href='/group-monitors/{i}/delete' onclick='return confirm("确定删除?")'>删除</a></td></tr>"""
)
body = (
"<div class=card><div class=toolbar><div><h2 style='margin:0 0 6px'>TG 群关键词监听</h2>"
"<p class=muted style='margin:0'>监听选定群并在命中关键词时给管理员发送摘要。"
"需要在 @BotFather 关闭 /setprivacy 才能接收群普通消息。</p></div>"
"<div class=actions><a class='btn primary' href='/group-monitors/new'>新增监听</a></div></div>"
"<table style='margin-top:16px'><tr><th>#</th><th>监听</th><th>状态</th><th>关键词</th><th>排除词</th><th>操作</th></tr>"
+ "".join(trs) + "</table></div>"
)
return layout("TG 群监听", body)
@app.get("/group-monitors/new", response_class=HTMLResponse)
async def group_monitor_new(_: str = Depends(panel_auth)) -> str:
return layout("新增 TG 群监听", group_monitor_form_html())
@app.get("/group-monitors/{idx}/edit", response_class=HTMLResponse)
async def group_monitor_edit(idx: int, _: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
rows = cfg.get("group_monitors") or []
if idx < 0 or idx >= len(rows) or not isinstance(rows[idx], dict):
raise HTTPException(404, "group monitor not found")
return layout("编辑 TG 群监听", group_monitor_form_html(rows[idx], idx))
async def save_group_monitor_common(
original_index: int | None,
name: str,
chat_id: str,
keywords: str,
exclude_keywords: str,
enabled: str | None,
notify_telegram: str | None,
) -> RedirectResponse | HTMLResponse:
cfg = cfg_load_fresh()
rows = cfg.setdefault("group_monitors", [])
if not isinstance(rows, list):
rows = []
cfg["group_monitors"] = rows
try:
item = {
"name": name.strip() or chat_id.strip(),
"enabled": bool(enabled),
"chat_id": int(chat_id.strip()),
"keywords": parse_lines(keywords),
"exclude_keywords": parse_lines(exclude_keywords),
"notify_telegram": bool(notify_telegram),
}
except Exception as e:
return HTMLResponse(layout("保存失败", f"<div class=card><pre>{html_escape(e)}</pre></div><p><a class=btn href='/group-monitors'>返回</a></p>"), status_code=400)
if original_index is None:
rows.append(item)
else:
if original_index < 0 or original_index >= len(rows):
raise HTTPException(404, "group monitor not found")
rows[original_index] = item
cfg_save(cfg)
return RedirectResponse("/group-monitors", status_code=303)
@app.post("/group-monitors/create")
async def group_monitor_create(
_: str = Depends(panel_auth),
name: str = Form(""),
chat_id: str = Form(""),
keywords: str = Form(""),
exclude_keywords: str = Form(""),
enabled: str | None = Form(None),
notify_telegram: str | None = Form(None),
) -> RedirectResponse | HTMLResponse:
return await save_group_monitor_common(None, name, chat_id, keywords, exclude_keywords, enabled, notify_telegram)
@app.post("/group-monitors/save")
async def group_monitor_save(
_: str = Depends(panel_auth),
original_index: int = Form(...),
name: str = Form(""),
chat_id: str = Form(""),
keywords: str = Form(""),
exclude_keywords: str = Form(""),
enabled: str | None = Form(None),
notify_telegram: str | None = Form(None),
) -> RedirectResponse | HTMLResponse:
return await save_group_monitor_common(original_index, name, chat_id, keywords, exclude_keywords, enabled, notify_telegram)
@app.get("/group-monitors/{idx}/delete")
async def group_monitor_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
cfg = cfg_load_fresh()
rows = cfg.get("group_monitors") or []
if 0 <= idx < len(rows):
rows.pop(idx)
cfg_save(cfg)
return RedirectResponse("/group-monitors", status_code=303)
@app.get("/monitor/templates", response_class=HTMLResponse)
async def monitor_templates(_: str = Depends(panel_auth)) -> str:
body = """<div class=card><h2>论坛监控模板</h2><p class=muted>NodeSeek / Linux.do 建议用 RSS,不抓网页 HTML,抗 Cloudflare 更稳。</p><div class=grid><a class='btn primary' href='/monitor/template/nodeseek'>NodeSeek 新帖</a><a class='btn primary' href='/monitor/template/linuxdo'>Linux.do 最新</a><a class='btn primary' href='/monitor/template/linuxdo-resource'>Linux.do 资源荟萃</a></div></div>"""
+10
View File
@@ -53,6 +53,16 @@ monitors:
stock_change: false
notify_telegram: true
forum: true
group_monitors:
- name: TG 群关键词监听示例
enabled: false
chat_id: -1001234567890
keywords:
- VPS
- 优惠
exclude_keywords:
- 求带
notify_telegram: true
cleanup:
enabled: true
interval_minutes: 60
+107 -1
View File
@@ -301,7 +301,7 @@ class PanelHtmlContractTest(unittest.TestCase):
def test_layout_groups_navigation_by_domain(self) -> None:
html = app.layout("测试", "<p>ok</p>")
for expected in ["<b>消息</b>", "<b>监控</b>", "<b>配置</b>", "<b>系统</b>", "私聊广告拦截"]:
for expected in ["<b>消息</b>", "<b>监控</b>", "<b>配置</b>", "<b>系统</b>", "私聊广告拦截", "TG 群监听"]:
self.assertIn(expected, html)
def test_inbox_copy_describes_two_way_conversation(self) -> None:
@@ -314,6 +314,19 @@ class PanelHtmlContractTest(unittest.TestCase):
self.assertIn("action='/users/settings'", source)
self.assertIn("这里和“Bot / 面板设置”共用同一份 .env", source)
def test_group_monitor_form_keeps_backend_field_names(self) -> None:
html = app.group_monitor_form_html()
for expected in [
"action='/group-monitors/create'",
"name=name",
"name=chat_id",
"name=keywords",
"name=exclude_keywords",
"name=enabled",
"name=notify_telegram",
]:
self.assertIn(expected, html)
class SpamAndTemplateConfigTest(unittest.TestCase):
def test_spam_keyword_hits_follow_config(self) -> None:
@@ -347,5 +360,98 @@ class SpamAndTemplateConfigTest(unittest.TestCase):
app.config = old_config
class GroupMonitorTest(unittest.TestCase):
def test_group_monitor_for_chat_returns_enabled_target(self) -> None:
old_config = app.config
app.config = {
"group_monitors": [
{"enabled": True, "chat_id": -10001, "keywords": ["vps"], "exclude_keywords": []},
{"enabled": False, "chat_id": -10002, "keywords": ["api"]},
]
}
try:
monitor = app.group_monitor_for_chat(-10001)
self.assertIsNotNone(monitor)
self.assertEqual(-10001, monitor["chat_id"])
self.assertIsNone(app.group_monitor_for_chat(-10002))
finally:
app.config = old_config
def test_handle_group_keyword_message_sends_summary_to_admin(self) -> None:
old_config = app.config
old_bot = app.bot
old_admin_chat_ids = app.admin_chat_ids
fake_bot = FakeBot()
app.bot = fake_bot
app.admin_chat_ids = [9001]
app.config = {
"group_monitors": [
{
"enabled": True,
"name": "测试群",
"chat_id": -100100100,
"keywords": ["vps", "优惠"],
"exclude_keywords": ["求带"],
"notify_telegram": True,
}
]
}
msg = SimpleNamespace(
chat=SimpleNamespace(id=-100100100, username="groupdemo", title="测试群"),
from_user=SimpleNamespace(id=123, first_name="Alice", last_name="", username="alice"),
text="今晚 vps 有优惠",
caption=None,
reply_to_message=None,
message_id=777,
content_type="text",
)
try:
ok = asyncio.run(app.handle_group_keyword_message(msg))
self.assertTrue(ok)
self.assertEqual([9001], fake_bot.sent_chat_ids)
self.assertIn("[群关键词命中]", fake_bot.sent_texts[0])
self.assertIn("命中:vps, 优惠", fake_bot.sent_texts[0])
finally:
app.config = old_config
app.bot = old_bot
app.admin_chat_ids = old_admin_chat_ids
def test_handle_group_keyword_message_respects_exclude_keywords(self) -> None:
old_config = app.config
old_bot = app.bot
old_admin_chat_ids = app.admin_chat_ids
fake_bot = FakeBot()
app.bot = fake_bot
app.admin_chat_ids = [9001]
app.config = {
"group_monitors": [
{
"enabled": True,
"chat_id": -100100100,
"keywords": ["vps"],
"exclude_keywords": ["求带"],
"notify_telegram": True,
}
]
}
msg = SimpleNamespace(
chat=SimpleNamespace(id=-100100100, username="groupdemo", title="测试群"),
from_user=SimpleNamespace(id=123, first_name="Alice", last_name="", username="alice"),
text="vps 求带",
caption=None,
reply_to_message=None,
message_id=777,
content_type="text",
)
try:
ok = asyncio.run(app.handle_group_keyword_message(msg))
self.assertFalse(ok)
self.assertEqual([], fake_bot.sent_chat_ids)
finally:
app.config = old_config
app.bot = old_bot
app.admin_chat_ids = old_admin_chat_ids
if __name__ == "__main__":
unittest.main()