tg-watchbot
登录后管理 Telegram 机器人、关键词监控和提醒。
{err}From 9095229218cf0ab574792a3f000f5e3e90e3381e Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Fri, 22 May 2026 14:45:50 +0800 Subject: [PATCH] Add safer web update flow, group-monitor AI controls, and monitor observability --- README.md | 38 +- app.py | 795 +++++++++++++++++++++++++- config.example.yaml | 10 + tests/test_monitor_message_cleanup.py | 199 +++++++ 4 files changed, 1015 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index d647d9e..84eae81 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,16 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 ``` ## 更新日志 +### 2026-05-22 更新 + +- TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。 +- 新增 `/update` 安全更新流程:显示本地/远端 commit、ahead/behind、工作区状态;仅允许 `ff-only` 更新。 +- 更新前若检测到本地未提交改动,会拒绝更新;避免覆盖本地代码。 +- 新增“回滚上次更新”按钮:更新前自动记录回滚点,可一键回滚并重启。 +- TG 群监听 AI 总结新增可视化高级控制:`ai_prompt`、`ai_min_interval_seconds`、`ai_dedupe_window_seconds`。 +- TG 群监听增加限频和去重窗口,降低重复推送与 AI 调用成本;AI 失败时仍会回退模板摘要。 +- 监控面板新增可观测状态:最近成功/失败时间、最近错误、耗时、推送数、连续失败次数。 + ### 2026-05-21 第二次更新 - Web 面板新增收件箱直接回复、用户管理、快捷回复、私聊广告拦截、监控推送历史、配置导入/导出。 @@ -127,7 +137,6 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - 如果要把面板暴露到公网,建议使用 Cloudflare Access / 反代鉴权,并使用强密码。 - Bot 只能给“已经主动私聊过 Bot 的用户”发消息,这是 Telegram Bot API 的限制。 - ## 快速开始 @@ -157,6 +166,9 @@ docker compose logs -f docker compose restart ``` + +## 手动安装(Python) + ```bash git clone https://github.com/GongyiChuren/tg-watchbot.git tg-watchbot cd tg-watchbot @@ -321,11 +333,35 @@ group_monitors: exclude_keywords: - 求带 notify_telegram: true + summary_mode: template + ai_base_url: "" + ai_api_key: "" + ai_model: gpt-4o-mini + ai_interface: responses + ai_temperature: 0.2 + ai_timeout_seconds: 30 + ai_prompt: "" + ai_min_interval_seconds: 30 + ai_dedupe_window_seconds: 300 ``` - 命中 `keywords` 且未命中 `exclude_keywords` 时,会给管理员发送摘要。 +- `summary_mode` 支持: + - `template`:固定模板摘要(默认) + - `ai`:调用 AI 生成摘要(在 TG 群监听页面可视化配置) +- `ai_prompt` 可填自定义总结提示词;留空使用内置默认提示词。 +- `ai_interface` 支持: + - `responses`:`/v1/responses` + - `chat`:`/v1/chat/completions` +- `ai_min_interval_seconds`:同一个群监听最小推送间隔(防刷屏) +- `ai_dedupe_window_seconds`:相同内容摘要去重窗口(防重复) - 机器人想收到群里普通消息,需要在 `@BotFather` 执行 `/setprivacy` 关闭隐私模式。 +更新代码(`/update`)已支持安全检查: +- 显示本地/远端 commit、ahead/behind、工作区是否干净 +- 只允许 `ff-only` 更新,工作区有未提交改动会拒绝更新 +- 自动记录上次更新前的回滚点,并支持一键回滚 + 监控数据自动清理示例: ```yaml diff --git a/app.py b/app.py index 9e320dd..f1562e6 100644 --- a/app.py +++ b/app.py @@ -17,6 +17,7 @@ import os import re import secrets import signal +import subprocess import sqlite3 import time from contextlib import closing @@ -50,6 +51,8 @@ ENV_PATH = BASE_DIR / ".env" LOG_PATH = BASE_DIR / "tg-watchbot.log" MIN_INTERVAL_SECONDS = 60 DEFAULT_MONITOR_MESSAGE_DELETE_AFTER_MINUTES = 60 +DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS = 30 +DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS = 300 DEFAULT_UA = ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " @@ -183,6 +186,32 @@ def init_db() -> None: pushed INTEGER DEFAULT 0, created_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS monitor_runtime_status ( + monitor_name TEXT PRIMARY KEY, + last_run_at TEXT, + last_success_at TEXT, + last_error_at TEXT, + last_error TEXT, + last_duration_ms INTEGER DEFAULT 0, + last_sent_count INTEGER DEFAULT 0, + consecutive_failures INTEGER DEFAULT 0, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS group_monitor_recent ( + monitor_name TEXT NOT NULL, + fingerprint TEXT NOT NULL, + sent_at_ts REAL NOT NULL, + PRIMARY KEY (monitor_name, fingerprint) + ); + CREATE TABLE IF NOT EXISTS group_monitor_last_send ( + monitor_name TEXT PRIMARY KEY, + sent_at_ts REAL NOT NULL + ); + CREATE TABLE IF NOT EXISTS app_meta ( + meta_key TEXT PRIMARY KEY, + meta_value TEXT, + updated_at TEXT NOT NULL + ); """ ) for sql in [ @@ -373,6 +402,17 @@ def spam_keyword_hits(text: str) -> list[str]: return keyword_hits(text, settings["keywords"]) +def ai_api_url(base_url: str, path: str) -> str: + base = base_url.strip().rstrip("/") + if not base: + return path + if base.endswith("/v1"): + return base + path + if path.startswith("/v1"): + return base + path + return base + "/v1" + path + + def group_monitors() -> list[dict[str, Any]]: if not isinstance(config, dict): return [] @@ -389,15 +429,31 @@ def group_monitors() -> list[dict[str, Any]]: chat_id = int(row.get("chat_id")) except (TypeError, ValueError): continue + summary_mode = str(row.get("summary_mode") or "template").strip().lower() + if summary_mode not in {"template", "ai"}: + summary_mode = "template" + ai_interface = str(row.get("ai_interface") or "responses").strip().lower() or "responses" + if ai_interface not in {"responses", "chat"}: + ai_interface = "responses" keywords = [str(k).strip() for k in (row.get("keywords") or []) if str(k).strip()] exclude_keywords = [str(k).strip() for k in (row.get("exclude_keywords") or []) if str(k).strip()] monitors.append( { "name": str(row.get("name") or str(chat_id)), "chat_id": chat_id, + "summary_mode": summary_mode, "keywords": keywords, "exclude_keywords": exclude_keywords, "notify_telegram": bool(row.get("notify_telegram", True)), + "ai_base_url": str(row.get("ai_base_url") or "").strip(), + "ai_api_key": str(row.get("ai_api_key") or "").strip(), + "ai_model": str(row.get("ai_model") or "gpt-4o-mini").strip(), + "ai_interface": ai_interface, + "ai_temperature": safe_float(row.get("ai_temperature", 0.2), 0.2), + "ai_timeout_seconds": max(1, safe_int(row.get("ai_timeout_seconds", 30), 30)), + "ai_prompt": str(row.get("ai_prompt") or "").strip(), + "ai_min_interval_seconds": max(0, safe_int(row.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)), + "ai_dedupe_window_seconds": max(0, safe_int(row.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)), } ) return monitors @@ -419,6 +475,37 @@ def group_message_text(message: Message) -> str: return merged.strip() +def group_message_context(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: + chat_title = getattr(message.chat, "title", "") or str(message.chat.id) + username = getattr(message.from_user, "username", None) + user_full = " ".join( + x for x in [getattr(message.from_user, "first_name", ""), getattr(message.from_user, "last_name", "")] if x + ).strip() or str(getattr(message.from_user, "id", "unknown")) + sender = f"{user_full} (@{username})" if username else user_full + reply_text = "" + if getattr(message, "reply_to_message", None): + reply = message.reply_to_message + reply_text = group_message_text(reply)[:GROUP_SUMMARY_MAX_CHARS] + text = group_message_text(message) + if len(text) > GROUP_SUMMARY_MAX_CHARS: + text = text[:GROUP_SUMMARY_MAX_CHARS] + "..." + return ( + f"群名: {chat_title}\n" + f"群ID: {message.chat.id}\n" + f"群用户名: @{getattr(message.chat, 'username', '') or ''}\n" + f"发送者: {sender}\n" + f"发送者ID: {getattr(message.from_user, 'id', 'unknown')}\n" + f"消息ID: {message.message_id}\n" + f"时间: {now_iso()}\n" + f"命中关键词: {', '.join(hits) or '-'}\n" + f"消息类型: {message.content_type}\n" + f"正文:\n{text or '(非文本消息)'}\n" + f"回复引用:\n{reply_text or '-'}\n" + f"链接: {telegram_message_link(getattr(message.chat, 'username', None), int(message.chat.id), int(message.message_id))}\n" + f"监听名称: {monitor.get('name') or chat_title}" + ) + + def telegram_message_link(chat_username: str | None, chat_id: int, message_id: int) -> str: if chat_username: return f"https://t.me/{chat_username}/{message_id}" @@ -428,7 +515,99 @@ def telegram_message_link(chat_username: str | None, chat_id: int, message_id: i return f"chat_id={chat_id} message_id={message_id}" -def build_group_summary(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: +def extract_responses_text(data: dict[str, Any]) -> str: + text = data.get("output_text") + if isinstance(text, str) and text.strip(): + return text.strip() + chunks: list[str] = [] + for item in data.get("output") or []: + if not isinstance(item, dict): + continue + for part in item.get("content") or []: + if not isinstance(part, dict): + continue + value = part.get("text") or part.get("content") + if isinstance(value, str) and value.strip(): + chunks.append(value.strip()) + return "\n".join(chunks).strip() + + +def extract_chat_text(data: dict[str, Any]) -> str: + choices = data.get("choices") or [] + if not choices: + return "" + message = choices[0].get("message") or {} + content = message.get("content") + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + chunks: list[str] = [] + for part in content: + if not isinstance(part, dict): + continue + value = part.get("text") or part.get("content") + if isinstance(value, str) and value.strip(): + chunks.append(value.strip()) + return "\n".join(chunks).strip() + return "" + + +def build_group_ai_system_prompt(custom_prompt: str) -> str: + base = ( + "你是 Telegram 群消息摘要助手。" + "请用简体中文输出一段给管理员看的消息摘要,尽量完整,保留群名、发送者、命中词、正文关键内容、时间和链接。" + "不要编造,不要添加未出现的信息。" + "如果正文很长,请压缩成可快速扫读的摘要,但不要丢掉数字、价格、链接、联系方式和结论。" + ) + custom = (custom_prompt or "").strip() + if not custom: + return base + return base + "\n补充要求:\n" + custom + + +def build_group_ai_prompt(message: Message, monitor: dict[str, Any], hits: list[str]) -> tuple[str, str]: + system = build_group_ai_system_prompt(str(monitor.get("ai_prompt") or "")) + user = group_message_context(message, monitor, hits) + return system, user + + +async def summarize_group_message_ai(message: Message, monitor: dict[str, Any], hits: list[str]) -> str | None: + ai_base_url = str(monitor.get("ai_base_url") or "").strip() + ai_api_key = str(monitor.get("ai_api_key") or "").strip() + ai_model = str(monitor.get("ai_model") or "gpt-4o-mini").strip() + ai_interface = str(monitor.get("ai_interface") or "responses").strip().lower() + ai_timeout = max(1, int(monitor.get("ai_timeout_seconds") or 30)) + ai_temperature = float(monitor.get("ai_temperature") or 0.2) + if not ai_base_url or not ai_api_key or not ai_model: + return None + system_prompt, user_prompt = build_group_ai_prompt(message, monitor, hits) + headers = {"Authorization": f"Bearer {ai_api_key}", "Content-Type": "application/json"} + async with httpx.AsyncClient(timeout=ai_timeout, headers=headers) as client: + if ai_interface == "chat": + payload = { + "model": ai_model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + "temperature": ai_temperature, + } + resp = await client.post(ai_api_url(ai_base_url, "/chat/completions"), json=payload) + resp.raise_for_status() + return extract_chat_text(resp.json()) + payload = { + "model": ai_model, + "instructions": system_prompt, + "input": user_prompt, + "temperature": ai_temperature, + "max_output_tokens": 300, + } + resp = await client.post(ai_api_url(ai_base_url, "/responses"), json=payload) + resp.raise_for_status() + return extract_responses_text(resp.json()) + + +def summarize_group_message_template(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: chat_title = getattr(message.chat, "title", "") or str(message.chat.id) username = getattr(message.from_user, "username", None) user_full = " ".join( @@ -438,18 +617,100 @@ def build_group_summary(message: Message, monitor: dict[str, Any], hits: list[st text = group_message_text(message) if len(text) > GROUP_SUMMARY_MAX_CHARS: text = text[:GROUP_SUMMARY_MAX_CHARS] + "..." - link = telegram_message_link(getattr(message.chat, "username", None), int(message.chat.id), int(message.message_id)) return ( f"[群关键词命中] {html_escape(str(monitor.get('name') or chat_title))}\n" f"群:{html_escape(chat_title)} ({message.chat.id})\n" f"发送者:{html_escape(sender)}\n" f"命中:{html_escape(', '.join(hits))}\n" f"时间:{html_escape(now_iso())}\n" - f"链接:{html_escape(link)}\n" + f"链接:{html_escape(telegram_message_link(getattr(message.chat, 'username', None), int(message.chat.id), int(message.message_id)))}\n" f"内容:\n{html_escape(text or '(非文本消息)')}" ) +async def summarize_group_message(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: + if str(monitor.get("summary_mode") or "template").strip().lower() != "ai": + return summarize_group_message_template(message, monitor, hits) + try: + text = await summarize_group_message_ai(message, monitor, hits) + if text: + return f"[群AI总结] {html_escape(str(monitor.get('name') or message.chat.id))}\n{html_escape(text.strip())}" + logger.warning("group ai summary returned empty result chat_id=%s message_id=%s", message.chat.id, message.message_id) + except Exception: + logger.exception("group ai summary failed chat_id=%s message_id=%s", message.chat.id, message.message_id) + return "[群AI总结失败,已使用模板]\n" + summarize_group_message_template(message, monitor, hits) + + +def build_group_summary(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: + return summarize_group_message_template(message, monitor, hits) + + +def group_monitor_interval_seconds(monitor: dict[str, Any]) -> int: + return max(0, safe_int(monitor.get("ai_min_interval_seconds"), DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)) + + +def group_monitor_dedupe_window_seconds(monitor: dict[str, Any]) -> int: + return max(0, safe_int(monitor.get("ai_dedupe_window_seconds"), DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)) + + +def group_monitor_fingerprint(message: Message, monitor: dict[str, Any], hits: list[str]) -> str: + payload = "|".join( + [ + str(monitor.get("name") or ""), + str(message.chat.id), + str(getattr(message.from_user, "id", "")), + ",".join(sorted(hits)), + group_message_text(message)[:300], + ] + ) + return hashlib.sha256(payload.encode("utf-8", errors="ignore")).hexdigest() + + +def group_monitor_allow_send(monitor: dict[str, Any], fingerprint: str, now_ts: float | None = None) -> tuple[bool, str]: + ts = time.time() if now_ts is None else float(now_ts) + name = str(monitor.get("name") or monitor.get("chat_id") or "group-monitor") + min_interval = group_monitor_interval_seconds(monitor) + dedupe_window = group_monitor_dedupe_window_seconds(monitor) + with closing(db()) as conn: + if min_interval > 0: + row = conn.execute( + "SELECT sent_at_ts FROM group_monitor_last_send WHERE monitor_name=?", + (name,), + ).fetchone() + if row and (ts - float(row["sent_at_ts"]) < min_interval): + return False, f"min-interval({min_interval}s)" + if dedupe_window > 0: + row = conn.execute( + "SELECT sent_at_ts FROM group_monitor_recent WHERE monitor_name=? AND fingerprint=?", + (name, fingerprint), + ).fetchone() + if row and (ts - float(row["sent_at_ts"]) < dedupe_window): + return False, f"dedupe({dedupe_window}s)" + if dedupe_window > 0: + conn.execute( + "DELETE FROM group_monitor_recent WHERE sent_at_ts < ?", + (ts - dedupe_window,), + ) + conn.execute( + """ + INSERT INTO group_monitor_recent(monitor_name, fingerprint, sent_at_ts) + VALUES(?,?,?) + ON CONFLICT(monitor_name, fingerprint) DO UPDATE SET sent_at_ts=excluded.sent_at_ts + """, + (name, fingerprint, ts), + ) + conn.execute( + """ + INSERT INTO group_monitor_last_send(monitor_name, sent_at_ts) + VALUES(?,?) + ON CONFLICT(monitor_name) DO UPDATE SET sent_at_ts=excluded.sent_at_ts + """, + (name, ts), + ) + conn.commit() + return True, "" + + async def handle_group_keyword_message(message: Message) -> bool: monitor = group_monitor_for_chat(int(message.chat.id)) if not monitor: @@ -465,7 +726,18 @@ async def handle_group_keyword_message(message: Message) -> bool: return False if not monitor.get("notify_telegram", True): return True - await admin_send(build_group_summary(message, monitor, hits)) + fp = group_monitor_fingerprint(message, monitor, hits) + allow, reason = group_monitor_allow_send(monitor, fp) + if not allow: + logger.info( + "group monitor skipped by limiter monitor=%s chat_id=%s message_id=%s reason=%s", + monitor.get("name"), + message.chat.id, + message.message_id, + reason, + ) + return False + await admin_send(await summarize_group_message(message, monitor, hits)) return True @@ -497,6 +769,77 @@ def record_monitor_event(monitor_name: str, title: str, link: str, reasons: list conn.commit() +def record_monitor_runtime( + monitor_name: str, + ok: bool, + duration_ms: int, + sent_count: int, + error: str = "", +) -> None: + now = now_iso() + with closing(db()) as conn: + row = conn.execute( + "SELECT consecutive_failures FROM monitor_runtime_status WHERE monitor_name=?", + (monitor_name,), + ).fetchone() + prev_failures = int(row["consecutive_failures"]) if row else 0 + failures = 0 if ok else (prev_failures + 1) + conn.execute( + """ + INSERT INTO monitor_runtime_status( + monitor_name, last_run_at, last_success_at, last_error_at, last_error, + last_duration_ms, last_sent_count, consecutive_failures, updated_at + ) VALUES(?,?,?,?,?,?,?,?,?) + ON CONFLICT(monitor_name) DO UPDATE SET + last_run_at=excluded.last_run_at, + last_success_at=CASE WHEN excluded.last_success_at IS NOT NULL THEN excluded.last_success_at ELSE monitor_runtime_status.last_success_at END, + last_error_at=CASE WHEN excluded.last_error_at IS NOT NULL THEN excluded.last_error_at ELSE monitor_runtime_status.last_error_at END, + last_error=CASE WHEN excluded.last_error != '' THEN excluded.last_error ELSE monitor_runtime_status.last_error END, + last_duration_ms=excluded.last_duration_ms, + last_sent_count=excluded.last_sent_count, + consecutive_failures=excluded.consecutive_failures, + updated_at=excluded.updated_at + """, + ( + monitor_name, + now, + now if ok else None, + now if not ok else None, + "" if ok else error[:1000], + max(0, int(duration_ms)), + max(0, int(sent_count)), + failures, + now, + ), + ) + conn.commit() + + +def list_monitor_runtime_status() -> dict[str, dict[str, Any]]: + with closing(db()) as conn: + rows = conn.execute("SELECT * FROM monitor_runtime_status").fetchall() + output: dict[str, dict[str, Any]] = {} + for row in rows: + output[str(row["monitor_name"])] = { + "last_run_at": row["last_run_at"], + "last_success_at": row["last_success_at"], + "last_error_at": row["last_error_at"], + "last_error": row["last_error"] or "", + "last_duration_ms": int(row["last_duration_ms"] or 0), + "last_sent_count": int(row["last_sent_count"] or 0), + "consecutive_failures": int(row["consecutive_failures"] or 0), + } + return output + + +def get_monitor_status_badge(status: dict[str, Any] | None) -> str: + if not status: + return "未运行" + if int(status.get("consecutive_failures", 0)) > 0: + return f"异常 x{int(status.get('consecutive_failures', 0))}" + return "正常" + + def lookup_reply_target(admin_chat: int, admin_message_id: int) -> int | None: with closing(db()) as conn: row = conn.execute( @@ -1139,8 +1482,10 @@ async def run_monitor(monitor: dict[str, Any]) -> int: name = monitor.get("name", "unnamed") mtype = monitor.get("type", "web") url = monitor.get("url") + started = time.time() if not url: logger.error("monitor %s missing url", name) + record_monitor_runtime(name, ok=False, duration_ms=int((time.time() - started) * 1000), sent_count=0, error="missing url") return 0 keywords = monitor.get("keywords") or [] timeout = int((config.get("http") or {}).get("timeout_seconds", 20)) @@ -1198,8 +1543,10 @@ async def run_monitor(monitor: dict[str, Any]) -> int: continue if await admin_send_monitor(text, name): sent_count += 1 - except Exception: + record_monitor_runtime(name, ok=True, duration_ms=int((time.time() - started) * 1000), sent_count=sent_count) + except Exception as e: logger.exception("monitor failed: %s %s", name, url) + record_monitor_runtime(name, ok=False, duration_ms=int((time.time() - started) * 1000), sent_count=sent_count, error=str(e)) return sent_count @@ -1345,13 +1692,34 @@ def login_page(error: str = "") -> str:
登录后管理 Telegram 机器人、关键词监控和提醒。
{err}