diff --git a/README.md b/README.md index 9206c99..d7d5b89 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,15 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 ## 更新日志 -### 2026-05-21 +### 2026-05-21 第二次更新 + +- Web 面板新增收件箱直接回复、用户管理、快捷回复、私聊广告拦截、监控推送历史、配置导入/导出。 +- 收件箱改为完整双向对话记录:用户消息、Web 回复、TG 管理员回复都会显示。 +- 用户管理页新增 Bot / 面板配置卡片,和设置页共用同一份配置;修改 Token、管理员 ID、端口、账号或密码后需要重启。 +- `ADMIN_CHAT_ID` 支持最多 3 个管理员,用逗号分隔。 +- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。 + +### 2026-05-21 第一次更新 - 默认启动改为先启动 Web 面板:未填写 `TELEGRAM_BOT_TOKEN` / `ADMIN_CHAT_ID` 时,面板仍可打开,同时 Telegram 收发、监控推送不可用。 - 面板配置页可填写 Bot Token、管理员 ID、面板账号和清理策略;保存后需要重启服务让 Bot 配置生效。 @@ -34,8 +42,13 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - `/unblock `:解封用户; - `/note <备注>`:给用户加备注; - `/who `:查看用户信息; + - `/spamwords`:查看广告关键词; + - `/spamadd <关键词>`:添加广告关键词; + - `/spamdel <关键词>`:删除广告关键词; - `/cancel`:取消待发送图片。 - 普通用户有简单限流,防止刷屏。 +- 支持最多 3 个管理员 chat id,用逗号分隔配置。 +- 支持私聊广告关键词自动拦截和自动拉黑,不影响 RSS/Web 监控。 ![示例图片](https://pic.gongyichuren.de/file/1779287173835_8521cab29a9635743a603582ceb7ba02.png) @@ -52,6 +65,7 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - 支持论坛 RSS 增强字段:作者、分类、tags、摘要。 - 支持去重,避免同一条反复推送。 - 支持屏蔽词、作者、分类过滤(YAML 高级配置)。 +- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。 - 默认最低监控间隔为 60 秒。 ![示例图片](https://pic.gongyichuren.de/file/1779287170665_17b7c8b4040d6334ea62a108d08db644.png) @@ -64,7 +78,11 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - 批量新增监控。 - YAML 高级编辑。 - Bot Token / 管理员 ID / 面板账号配置页。 -- 收件箱页面,可查看用户消息和重试转发。 +- 收件箱页面,可查看完整双向对话记录、重试转发、直接回复。 +- 用户管理页,可备注、封禁、解封、主动发消息,并可编辑 Bot / 面板配置。 +- 私聊广告拦截规则和快捷回复模板可在 Web 面板编辑。 +- 监控推送历史页,可查看 Telegram 推送和仅 Web 记录。 +- `config.yaml` 导入/导出页面,方便迁移。 - 主动发消息页面 `/send`,发送成功后会在页面显示结果,并给管理员聊天发送确认提醒。 - 自动清理监控/RSS/网站状态数据;支持定时删除 Telegram 监控通知消息;不会删除用户、收件箱、双向对话消息。 - 日志页面和健康检查 `/health`。 @@ -213,7 +231,7 @@ curl http://127.0.0.1:8765/health | 变量 | 说明 | |---|---| | `TELEGRAM_BOT_TOKEN` | BotFather 创建的 Telegram Bot Token | -| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id,用于接收用户消息和监控通知 | +| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id;最多 3 个,用逗号分隔 | | `LOG_LEVEL` | 日志级别,默认 `INFO` | | `WEB_PANEL_ENABLED` | 是否启用 Web 面板,默认 `true` | | `WEB_PANEL_HOST` | 面板监听地址,默认 `127.0.0.1` | @@ -224,6 +242,25 @@ curl http://127.0.0.1:8765/health ### `config.yaml` +Bot 扩展配置示例: + +```yaml +bot: + rate_limit: + window_seconds: 10 + max_messages: 3 + spam_filter: + enabled: true + auto_block: true + keywords: + - 投资 + - 博彩 + - 空投 + quick_replies: + - title: 已收到 + text: 你好,消息已收到,我稍后处理。 +``` + 监控数据自动清理示例: ```yaml @@ -267,6 +304,7 @@ monitors: new_item: true price_change: false stock_change: false + notify_telegram: true forum: true ``` @@ -291,6 +329,7 @@ monitors: new_item: true price_change: true stock_change: true + notify_telegram: true ``` ## 管理命令 @@ -304,6 +343,9 @@ monitors: /unblock /note <备注> /who +/spamwords +/spamadd <关键词> +/spamdel <关键词> /cancel ``` @@ -324,6 +366,11 @@ monitors: | `/settings` | `.env` 设置和监控清理策略 | | `/send` | 主动发消息给已私聊过 Bot 的用户 | | `/inbox` | 收件箱 | +| `/users` | 用户管理 | +| `/rules` | 私聊广告拦截规则 | +| `/replies` | 快捷回复模板 | +| `/monitor/events` | 监控推送历史 | +| `/config/export` | 导出 / 导入 `config.yaml` | | `/logs` | 日志 | | `/health` | 健康检查 | diff --git a/app.py b/app.py index 1aa2ec6..b228a82 100644 --- a/app.py +++ b/app.py @@ -60,6 +60,7 @@ logger = logging.getLogger("tg-watchbot") router = Router() bot: Bot | None = None admin_chat_id: int | None = None +admin_chat_ids: list[int] = [] config: dict[str, Any] = {} rate_buckets: dict[int, list[float]] = {} pending_sendpic: dict[int, dict[str, Any]] = {} @@ -161,6 +162,8 @@ def init_db() -> None: username TEXT, full_name TEXT, user_message_id INTEGER, + direction TEXT DEFAULT 'in', + source TEXT DEFAULT 'user', message_type TEXT, text TEXT, forwarded INTEGER DEFAULT 0, @@ -170,8 +173,25 @@ def init_db() -> None: forwarded_at TEXT, error TEXT ); + CREATE TABLE IF NOT EXISTS monitor_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + monitor_name TEXT NOT NULL, + title TEXT, + link TEXT, + reasons TEXT, + pushed INTEGER DEFAULT 0, + created_at TEXT NOT NULL + ); """ ) + for sql in [ + "ALTER TABLE inbox_messages ADD COLUMN direction TEXT DEFAULT 'in'", + "ALTER TABLE inbox_messages ADD COLUMN source TEXT DEFAULT 'user'", + ]: + try: + conn.execute(sql) + except sqlite3.OperationalError: + pass conn.commit() @@ -232,6 +252,21 @@ def set_block(user_id: int, blocked: bool) -> None: conn.commit() +def all_admin_chat_ids() -> list[int]: + if admin_chat_ids: + return list(dict.fromkeys(admin_chat_ids[:3])) + return [admin_chat_id] if admin_chat_id is not None else [] + + +def parse_admin_chat_ids(raw: str) -> list[int]: + ids: list[int] = [] + for part in re.split(r"[\s,;]+", raw.strip()): + if not part: + continue + ids.append(int(part)) + return list(dict.fromkeys(ids))[:3] + + def set_note(user_id: int, note: str) -> None: with closing(db()) as conn: conn.execute("UPDATE users SET note=?, updated_at=? WHERE user_id=?", (note, now_iso(), user_id)) @@ -275,6 +310,22 @@ def create_inbox_message(message: Message, user_id: int, full_name: str, usernam return int(cur.lastrowid) +def create_outbox_message(user_id: int, text: str, source: str, user_message_id: int | None = None) -> int: + row = get_user(user_id) + username = row["username"] if row else None + full_name = row["full_name"] if row else str(user_id) + with closing(db()) as conn: + cur = conn.execute( + """ + INSERT INTO inbox_messages(user_id, username, full_name, user_message_id, direction, source, message_type, text, forwarded, created_at, forwarded_at) + VALUES(?,?,?,?,?,?,?,?,?,?,?) + """, + (user_id, username, full_name, user_message_id, "out", source, "text", text, 1, now_iso(), now_iso()), + ) + conn.commit() + return int(cur.lastrowid) + + def mark_inbox_forwarded(inbox_id: int, header_id: int | None = None, copy_id: int | None = None) -> None: with closing(db()) as conn: conn.execute( @@ -295,6 +346,60 @@ def pending_inbox(limit: int = 50) -> list[sqlite3.Row]: return list(conn.execute("SELECT * FROM inbox_messages WHERE forwarded=0 ORDER BY id ASC LIMIT ?", (limit,)).fetchall()) +def get_inbox_message(inbox_id: int) -> sqlite3.Row | None: + with closing(db()) as conn: + return conn.execute("SELECT * FROM inbox_messages WHERE id=?", (inbox_id,)).fetchone() + + +def list_quick_replies() -> list[dict[str, str]]: + replies = (config.get("bot") or {}).get("quick_replies") or [] + return [r for r in replies if isinstance(r, dict)] + + +def spam_filter_settings() -> dict[str, Any]: + spam = (config.get("bot") or {}).get("spam_filter") or {} + return { + "enabled": bool(spam.get("enabled", False)), + "auto_block": bool(spam.get("auto_block", True)), + "keywords": [str(k) for k in spam.get("keywords") or [] if str(k).strip()], + } + + +def spam_keyword_hits(text: str) -> list[str]: + settings = spam_filter_settings() + if not settings["enabled"]: + return [] + return keyword_hits(text, settings["keywords"]) + + +def update_spam_keywords(action: str, word: str) -> list[str]: + cfg = cfg_load_fresh() + bot_cfg = cfg.setdefault("bot", {}) + spam = bot_cfg.setdefault("spam_filter", {"enabled": True, "auto_block": True, "keywords": []}) + words = [str(k).strip() for k in spam.get("keywords") or [] if str(k).strip()] + if action == "add" and word and word not in words: + words.append(word) + if action == "delete": + words = [k for k in words if k != word] + spam["keywords"] = words + spam.setdefault("enabled", True) + spam.setdefault("auto_block", True) + cfg_save(cfg) + return words + + +def record_monitor_event(monitor_name: str, title: str, link: str, reasons: list[str], pushed: bool) -> None: + with closing(db()) as conn: + conn.execute( + """ + INSERT INTO monitor_events(monitor_name, title, link, reasons, pushed, created_at) + VALUES(?,?,?,?,?,?) + """, + (monitor_name, title, link, "; ".join(reasons), 1 if pushed else 0, now_iso()), + ) + conn.commit() + + def lookup_reply_target(admin_chat: int, admin_message_id: int) -> int | None: with closing(db()) as conn: row = conn.execute( @@ -338,36 +443,50 @@ def describe_sendpic_target(user_id: int) -> str: async def admin_send(text: str) -> None: - if not bot or admin_chat_id is None: + if not bot or not all_admin_chat_ids(): logger.error("admin_send called before bot/admin init: %s", text) return - try: - await bot.send_message(admin_chat_id, text, disable_web_page_preview=False) - except Exception: - logger.exception("failed to send admin notification") + for chat_id in all_admin_chat_ids(): + try: + await bot.send_message(chat_id, text, disable_web_page_preview=False) + except Exception: + logger.exception("failed to send admin notification chat_id=%s", chat_id) async def admin_send_monitor(text: str, monitor_name: str) -> bool: - if not bot or admin_chat_id is None: + if not bot or not all_admin_chat_ids(): logger.error("admin_send_monitor called before bot/admin init: %s", text) return False - try: - sent = await bot.send_message(admin_chat_id, text, disable_web_page_preview=False) - settings = monitor_cleanup_settings() - record_monitor_message( - sent, - monitor_name, - int(settings["message_delete_after_minutes"]) * 60, - ) - return True - except Exception: - logger.exception("failed to send monitor notification") - return False + sent_any = False + for chat_id in all_admin_chat_ids(): + try: + sent = await bot.send_message(chat_id, text, disable_web_page_preview=False) + settings = monitor_cleanup_settings() + record_monitor_message( + sent, + monitor_name, + int(settings["message_delete_after_minutes"]) * 60, + ) + sent_any = True + except Exception: + logger.exception("failed to send monitor notification chat_id=%s", chat_id) + return sent_any + + +async def send_text_to_user(user_id: int, text: str, source: str = "web") -> int: + if is_blocked(user_id): + raise ValueError(f"用户 {user_id} 已被封禁") + if not bot: + raise RuntimeError("Bot 尚未初始化") + sent = await bot.send_message(user_id, text.strip()) + create_outbox_message(user_id, text.strip(), source, sent.message_id) + logger.info("sent message to user_id=%s message_id=%s", user_id, sent.message_id) + return int(sent.message_id) def is_admin_chat(message: Message) -> bool: """Dynamic admin-chat filter.""" - return admin_chat_id is not None and message.chat.id == admin_chat_id + return message.chat.id in all_admin_chat_ids() def is_admin_action_message(message: Message) -> bool: @@ -396,7 +515,7 @@ async def start(message: Message) -> None: async def send_text_to_user_from_admin(message: Message, args: str | None, command_name: str) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid, text = parse_user_id_and_text(args) @@ -409,8 +528,8 @@ async def send_text_to_user_from_admin(message: Message, args: str | None, comma if not bot: await message.reply("错误:Bot 尚未初始化") return - sent = await bot.send_message(uid, text) # type: ignore[union-attr] - await message.reply(f"{command_name} 成功:已发送给用户 {uid},message_id={sent.message_id}") + message_id = await send_text_to_user(uid, text, f"tg:{command_name}") + await message.reply(f"{command_name} 成功:已发送给用户 {uid},message_id={message_id}") except Exception as e: logger.exception("/%s failed", command_name) await message.reply(f"/{command_name} 失败:{e}\n用法:/{command_name} <内容>") @@ -428,7 +547,7 @@ async def cmd_send(message: Message, command: CommandObject) -> None: @router.message(Command("sendpic")) async def cmd_sendpic(message: Message, command: CommandObject) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid, caption = parse_user_id_and_optional_text(command.args) @@ -451,13 +570,13 @@ async def cmd_sendpic(message: Message, command: CommandObject) -> None: @router.message(Command("cancel")) async def cmd_cancel(message: Message) -> None: - if message.chat.id == admin_chat_id and pending_sendpic.pop(message.chat.id, None): + if is_admin_chat(message) and pending_sendpic.pop(message.chat.id, None): await message.reply("已取消待发送图片。") @router.message(Command("block")) async def cmd_block(message: Message, command: CommandObject) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid = parse_user_id(command.args) @@ -473,7 +592,7 @@ async def cmd_block(message: Message, command: CommandObject) -> None: @router.message(Command("unblock")) async def cmd_unblock(message: Message, command: CommandObject) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid = parse_user_id(command.args) @@ -489,7 +608,7 @@ async def cmd_unblock(message: Message, command: CommandObject) -> None: @router.message(Command("note")) async def cmd_note(message: Message, command: CommandObject) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid, note = parse_user_id_and_text(command.args) @@ -505,7 +624,7 @@ async def cmd_note(message: Message, command: CommandObject) -> None: @router.message(Command("who")) async def cmd_who(message: Message, command: CommandObject) -> None: - if message.chat.id != admin_chat_id: + if not is_admin_chat(message): return try: uid = parse_user_id(command.args) @@ -528,6 +647,39 @@ async def cmd_who(message: Message, command: CommandObject) -> None: await message.reply(f"/who 失败:{e}") +@router.message(Command("spamwords")) +async def cmd_spamwords(message: Message) -> None: + if not is_admin_chat(message): + return + words = spam_filter_settings()["keywords"] + text = "\n".join(f"- {html_escape(w)}" for w in words) or "暂无广告关键词" + await message.reply(f"广告关键词:\n{text}") + + +@router.message(Command("spamadd")) +async def cmd_spamadd(message: Message, command: CommandObject) -> None: + if not is_admin_chat(message): + return + word = (command.args or "").strip() + if not word: + await message.reply("用法:/spamadd 关键词") + return + words = update_spam_keywords("add", word) + await message.reply(f"已添加广告关键词:{html_escape(word)}\n当前共 {len(words)} 个。") + + +@router.message(Command("spamdel")) +async def cmd_spamdel(message: Message, command: CommandObject) -> None: + if not is_admin_chat(message): + return + word = (command.args or "").strip() + if not word: + await message.reply("用法:/spamdel 关键词") + return + words = update_spam_keywords("delete", word) + await message.reply(f"已删除广告关键词:{html_escape(word)}\n当前共 {len(words)} 个。") + + @router.message(is_admin_action_message) async def admin_reply_by_message(message: Message) -> None: # Pending /sendpic flow: after /sendpic , the next admin photo is copied to target. @@ -567,8 +719,8 @@ async def admin_reply_by_message(message: Message) -> None: if is_blocked(target): await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}") return - sent = await bot.send_message(target, message.text) # type: ignore[union-attr] - await message.reply(f"已发送给用户 {target},message_id={sent.message_id}") + message_id = await send_text_to_user(target, message.text, "tg:reply") + await message.reply(f"已发送给用户 {target},message_id={message_id}") except TelegramAPIError as e: logger.exception("admin reply forwarding failed") await message.reply(f"发送失败:{e}") @@ -590,7 +742,7 @@ async def admin_plain_message(message: Message) -> None: async def user_message(message: Message) -> None: # Only relay private user chats to admin. logger.info("incoming message chat_id=%s chat_type=%s from_user=%s content_type=%s text=%r", message.chat.id, message.chat.type, getattr(message.from_user, 'id', None), message.content_type, (message.text or '')[:80]) - if message.chat.id == admin_chat_id: + if is_admin_chat(message): logger.info("incoming message is admin plain message; ignored by user relay") return if message.chat.type != "private": @@ -607,6 +759,15 @@ async def user_message(message: Message) -> None: await message.answer("发送太快了,请稍后再试。") return inbox_id = create_inbox_message(message, uid, full, username) + spam_hits = spam_keyword_hits(message.text or message.caption or "") + if spam_hits and spam_filter_settings()["auto_block"]: + set_block(uid, True) + mark_inbox_error(inbox_id, "spam: " + ", ".join(spam_hits)) + await admin_send( + f"[垃圾消息已拉黑]\nuser_id: {uid}\n命中:{html_escape(', '.join(spam_hits))}\n内容:{html_escape((message.text or message.caption or '')[:300])}" + ) + await message.answer("消息已被系统拦截。") + return user_row = get_user(uid) note = user_row["note"] if user_row and "note" in user_row.keys() else "" header = ( @@ -618,11 +779,16 @@ async def user_message(message: Message) -> None: f"time: {html_escape(now_iso())}" ) try: - sent = await bot.send_message(admin_chat_id, header) # type: ignore[union-attr] - save_message_map(sent, uid, message.message_id) - copied = await message.copy_to(admin_chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type] - save_message_map(copied, uid, message.message_id) - mark_inbox_forwarded(inbox_id, sent.message_id, copied.message_id) + first_header_id = None + first_copy_id = None + for chat_id in all_admin_chat_ids(): + sent = await bot.send_message(chat_id, header) # type: ignore[union-attr] + save_message_map(sent, uid, message.message_id) + copied = await message.copy_to(chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type] + save_message_map(copied, uid, message.message_id) + first_header_id = first_header_id or sent.message_id + first_copy_id = first_copy_id or copied.message_id + mark_inbox_forwarded(inbox_id, first_header_id, first_copy_id) await message.answer("已转交管理员。") except Exception as e: mark_inbox_error(inbox_id, repr(e)) @@ -900,6 +1066,7 @@ async def run_monitor(monitor: dict[str, Any]) -> int: event_key = stable_key(name, item.key) if is_forum else stable_key(name, item.key, "|".join(reasons), item.price or "", item.stock or "") if not event_not_sent(event_key, name, item.title, item.link): continue + notify_on_tg = bool(monitor.get("notify_telegram", True)) if is_forum: text = ( f"[新帖命中] {html_escape(name)}\n" @@ -921,6 +1088,10 @@ async def run_monitor(monitor: dict[str, Any]) -> int: f"库存:{html_escape(item.stock or '-')}\n" f"时间:{html_escape(now_iso())}" ) + record_monitor_event(name, item.title, item.link, reasons, notify_on_tg) + if not notify_on_tg: + sent_count += 1 + continue if await admin_send_monitor(text, name): sent_count += 1 except Exception: @@ -969,7 +1140,7 @@ async def cleanup_monitor_loop() -> None: async def flush_pending_inbox() -> None: - if not bot or admin_chat_id is None: + if not bot or not all_admin_chat_ids(): return rows = pending_inbox(50) if not rows: @@ -987,9 +1158,12 @@ async def flush_pending_inbox() -> None: f"时间: {html_escape(row['created_at'])}\n\n" f"内容:{html_escape(row['text'] or '(非文本/媒体消息,原始媒体无法补发,仅保留记录)')}" ) - sent = await bot.send_message(admin_chat_id, text) - save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None) - mark_inbox_forwarded(int(row['id']), sent.message_id, None) + first_id = None + for chat_id in all_admin_chat_ids(): + sent = await bot.send_message(chat_id, text) + save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None) + first_id = first_id or sent.message_id + mark_inbox_forwarded(int(row['id']), first_id, None) except Exception as e: mark_inbox_error(int(row['id']), repr(e)) logger.exception("failed to flush inbox message id=%s", row['id']) @@ -1169,6 +1343,7 @@ def monitor_from_form( new_item: bool, price_change: bool, stock_change: bool, + notify_telegram: bool = True, ) -> dict[str, Any]: m: dict[str, Any] = { "name": name.strip(), @@ -1176,6 +1351,7 @@ def monitor_from_form( "url": url.strip(), "interval_seconds": max(int(interval_seconds or MIN_INTERVAL_SECONDS), MIN_INTERVAL_SECONDS), "keywords": parse_lines(keywords), + "notify_telegram": notify_telegram, "notify_on": { "keyword_match": keyword_match, "new_item": new_item, @@ -1208,13 +1384,13 @@ def layout(title: str, body: str) -> str:

{html_escape(title)}

WatchBot Panel
+""" @@ -1243,7 +1419,8 @@ def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -
-
+ +
取消
""" @@ -1284,8 +1461,9 @@ def create_panel_app() -> FastAPI: cfg = cfg_load_fresh() rows = [] for i, m in enumerate(cfg.get("monitors") or []): - rows.append(f"""{html_escape(m.get('type','web'))}{html_escape(m.get('name',''))}
{html_escape(m.get('url',''))}{html_escape(m.get('interval_seconds',60))}s{html_escape(', '.join(m.get('keywords') or []))}编辑 预览 检查 删除""") - body = f"""

监控目标

当前 {len(cfg.get('monitors') or [])} 个;保存后自动重载定时任务。

""" + "".join(rows) + "
类型目标间隔关键词操作
" + tg = "TG" if m.get("notify_telegram", True) else "仅 Web" + rows.append(f"""{html_escape(m.get('type','web'))}{html_escape(m.get('name',''))}
{html_escape(m.get('url',''))}{html_escape(m.get('interval_seconds',60))}s
{tg}{html_escape(', '.join(m.get('keywords') or []))}编辑 预览 检查 删除""") + body = f"""

监控目标

当前 {len(cfg.get('monitors') or [])} 个;保存后自动重载定时任务。

""" + "".join(rows) + "
类型目标间隔/通知关键词操作
" return layout("监控", body) @app.get("/monitor/new", response_class=HTMLResponse) @@ -1318,7 +1496,7 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" return layout("批量新增", body) @app.post("/monitor/bulk") - async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)): + async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form("on")): cfg = cfg_load_fresh() monitors = cfg.setdefault("monitors", []) added = 0 @@ -1334,7 +1512,7 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" name, url = parts[0], parts[1] keywords = parts[2] if len(parts) >= 3 else "" try: - monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change))) + monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram))) added += 1 except Exception as e: errors.append(f"第 {line_no} 行失败:{html_escape(e)}") @@ -1370,10 +1548,11 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" new_item: str | None, price_change: str | None, stock_change: str | None, + notify_telegram: str | None, ) -> RedirectResponse: cfg = cfg_load_fresh() monitors = cfg.setdefault("monitors", []) - m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change)) + m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram)) if original_index is None: monitors.append(m) else: @@ -1386,12 +1565,12 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" return RedirectResponse("/", status_code=303) @app.post("/monitor/create") - async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: - return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change, notify_telegram) @app.post("/monitor/save") - async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: - return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change, notify_telegram) @app.get("/monitor/{idx}/delete") async def delete_monitor(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse: @@ -1459,28 +1638,37 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" status = "" if bot_ready else "
未填写 Token 或管理员 ID;网页可用,但 Bot 和监控推送不可用。
" body = f"""

Bot / 面板设置

{status}
- +

监控数据自动清理

删除过期监控通知消息,并清理 RSS/网站监控状态和去重记录;不会删除用户、收件箱、双向对话消息。

改 Token、管理员 ID 或端口后需要重启。
""" return layout("设置", body) - @app.post("/settings", response_class=HTMLResponse) - async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: - write_env_values(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}) + def save_panel_settings( + values: dict[str, Any], + cleanup_interval_minutes: int, + cleanup_message_delete_after_minutes: int, + cleanup_retention_minutes: int, + ) -> None: + write_env_values(values) cfg = cfg_load_fresh() cfg["cleanup"] = { "enabled": True, - "interval_minutes": max(1, int(CLEANUP_INTERVAL_MINUTES)), - "monitor_message_delete_after_minutes": max(1, int(CLEANUP_MESSAGE_DELETE_AFTER_MINUTES)), - "monitor_retention_minutes": max(1, int(CLEANUP_RETENTION_MINUTES)), + "interval_minutes": max(1, int(cleanup_interval_minutes)), + "monitor_message_delete_after_minutes": max(1, int(cleanup_message_delete_after_minutes)), + "monitor_retention_minutes": max(1, int(cleanup_retention_minutes)), } cfg_save(cfg) + + @app.post("/settings", response_class=HTMLResponse) + async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: + save_panel_settings(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, CLEANUP_INTERVAL_MINUTES, CLEANUP_MESSAGE_DELETE_AFTER_MINUTES, CLEANUP_RETENTION_MINUTES) return layout("已保存", "
已保存,不会自动重启;修改 Token/管理员 ID 后请重启。

返回 重启机器人

") @app.get("/send", response_class=HTMLResponse) - async def send_page(_: str = Depends(panel_auth)) -> str: + async def send_page(request: Request, _: str = Depends(panel_auth)) -> str: + selected_user_id = request.query_params.get("user_id", "") with closing(db()) as conn: users = conn.execute( "SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 200" @@ -1490,7 +1678,8 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" blocked = "(已封禁)" if u["blocked"] else "" username = f"@{u['username']}" if u["username"] else "" label = f"{u['full_name'] or u['user_id']} {username} · {u['user_id']} {blocked}" - options.append(f"") + selected = "selected" if str(u["user_id"]) == selected_user_id else "" + options.append(f"") body = f"""

主动发消息

只能发送给已经私聊过 Bot 的用户。

@@ -1511,12 +1700,9 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" return layout("发送失败", f"
找不到用户 {uid},对方需要先私聊 Bot。

返回

") if is_blocked(uid): return layout("发送失败", f"
用户 {uid} 已被封禁,请先 /unblock。

返回

") - if not bot: - return layout("发送失败", "
Bot 尚未初始化;请确认服务以正常模式运行,而不是 --panel-only。

返回

") - sent = await bot.send_message(uid, text.strip()) - logger.info("panel sent message to user_id=%s message_id=%s", uid, sent.message_id) - await admin_send(f"[主动发送成功]\nuser_id: {uid}\nmessage_id: {sent.message_id}\n时间:{html_escape(now_iso())}") - return layout("发送成功", f"
已发送给用户 {uid},message_id={sent.message_id}。Bot 也已给管理员发送确认提醒。

继续发送 收件箱

") + message_id = await send_text_to_user(uid, text.strip(), "web:send") + await admin_send(f"[主动发送成功]\nuser_id: {uid}\nmessage_id: {message_id}\n时间:{html_escape(now_iso())}") + return layout("发送成功", f"
已发送给用户 {uid},message_id={message_id}。Bot 也已给管理员发送确认提醒。

继续发送 收件箱

") except TelegramAPIError as e: logger.exception("panel send failed") return layout("发送失败", f"
{html_escape(e)}

返回

") @@ -1530,11 +1716,22 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" rows = conn.execute("SELECT * FROM inbox_messages ORDER BY id DESC LIMIT 200").fetchall() trs = [] for r in rows: - status_txt = "已转发" if r["forwarded"] else "未转发" - status_cls = "ok" if r["forwarded"] else "danger" + direction = r["direction"] if "direction" in r.keys() else "in" + source = r["source"] if "source" in r.keys() else "user" + if direction == "out": + status_txt, status_cls = "已回复", "ok" + elif (r["error"] or "").startswith("spam:"): + status_txt, status_cls = "已拦截", "danger" + else: + status_txt = "已转发" if r["forwarded"] else "未转发" + status_cls = "ok" if r["forwarded"] else "danger" content = html_escape(r["text"] or "(非文本/媒体消息)") - trs.append(f"""#{r['id']}
{status_txt}{html_escape(r['full_name'])}
{r['user_id']} @{html_escape(r['username'] or '')}{html_escape(r['message_type'])}
{html_escape(r['created_at'])}{content}
{html_escape(r['error'] or '')}重试转发""") - body = "

收件箱

用户消息会先写入 SQLite,再转发给管理员;转发失败的消息可以在这里重试。

" + "".join(trs) + "
ID/状态用户类型/时间内容/错误操作
" + flow = "用户 -> 管理员" if direction == "in" else "管理员 -> 用户" + actions = f"回复" + if direction == "in" and not r["forwarded"]: + actions += f" 重试转发" + trs.append(f"""#{r['id']}
{status_txt}{html_escape(r['full_name'])}
{r['user_id']} @{html_escape(r['username'] or '')}{html_escape(flow)}
{html_escape(source)} · {html_escape(r['created_at'])}{content}
{html_escape(r['error'] or '')}{actions}""") + body = "

收件箱

这里显示双向机器人对话记录:用户发来的消息、Web 回复、TG 管理员回复都会记录。转发失败的入站消息可重试。

" + "".join(trs) + "
ID/状态用户方向/来源内容/错误操作
" return layout("收件箱", body) @app.get("/inbox/{msg_id}/retry") @@ -1545,6 +1742,154 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" await flush_pending_inbox() return RedirectResponse("/inbox", status_code=303) + @app.get("/inbox/{msg_id}/reply", response_class=HTMLResponse) + async def inbox_reply_page(msg_id: int, _: str = Depends(panel_auth)) -> str: + row = get_inbox_message(msg_id) + if not row: + raise HTTPException(404, "message not found") + options = "".join( + f"" + for r in list_quick_replies() + ) + body = f"""

回复用户

#{row['id']} · {html_escape(row['full_name'])} · {row['user_id']}

{html_escape(row['text'] or '(非文本/媒体消息)')}
+ + +
返回
""" + return layout("回复用户", body) + + @app.post("/inbox/{msg_id}/reply", response_class=HTMLResponse) + async def inbox_reply_save(msg_id: int, _: str = Depends(panel_auth), text: str = Form("")) -> str: + row = get_inbox_message(msg_id) + if not row: + raise HTTPException(404, "message not found") + try: + message_id = await send_text_to_user(int(row["user_id"]), text, "web:inbox") + await admin_send(f"[Web 回复成功]\nuser_id: {row['user_id']}\nmessage_id: {message_id}") + return layout("回复成功", f"
已回复用户 {row['user_id']}。

返回收件箱

") + except Exception as e: + return layout("回复失败", f"
{html_escape(e)}

返回

") + + @app.get("/users", response_class=HTMLResponse) + async def users_page(_: str = Depends(panel_auth)) -> str: + v = env_values() + with closing(db()) as conn: + rows = conn.execute("SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 300").fetchall() + trs = [] + for u in rows: + status_txt = "封禁" if u["blocked"] else "正常" + action = "unblock" if u["blocked"] else "block" + action_txt = "解封" if u["blocked"] else "封禁" + trs.append(f"""{html_escape(u['full_name'] or u['user_id'])}
{u['user_id']} @{html_escape(u['username'] or '')}{status_txt}
{html_escape(u['updated_at'])}{html_escape(u['note'] or '')}
""") + settings_card = f"""

Bot / 面板配置

这里和“Bot / 面板设置”共用同一份 .env。修改 Token、管理员 ID、端口、账号或密码后不会自动重启,需要手动重启服务。

+ + +
+
重启机器人
""" + body = settings_card + "

用户管理

" + "".join(trs) + "
用户状态备注操作
" + return layout("用户管理", body) + + @app.post("/users/settings", response_class=HTMLResponse) + async def users_settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin")) -> str: + cleanup = (cfg_load_fresh().get("cleanup") or {}) + save_panel_settings( + locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, + int(cleanup.get("interval_minutes", 60)), + int(cleanup.get("monitor_message_delete_after_minutes", 60)), + int(cleanup.get("monitor_retention_minutes", 1440)), + ) + return layout("已保存", "
已保存,不会自动重启;修改 Token、管理员 ID、端口、账号或密码后请重启。

返回用户管理 重启机器人

") + + @app.post("/users/{user_id}/note") + async def user_note_save(user_id: int, _: str = Depends(panel_auth), note: str = Form("")) -> RedirectResponse: + set_note(user_id, note.strip()) + return RedirectResponse("/users", status_code=303) + + @app.get("/users/{user_id}/block") + async def user_block(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + set_block(user_id, True) + return RedirectResponse("/users", status_code=303) + + @app.get("/users/{user_id}/unblock") + async def user_unblock(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + set_block(user_id, False) + return RedirectResponse("/users", status_code=303) + + @app.get("/rules", response_class=HTMLResponse) + async def rules_page(_: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh() + spam = (cfg.get("bot") or {}).get("spam_filter") or {} + keywords = "\n".join(spam.get("keywords") or []) + body = f"""

私聊广告拦截

只拦截用户私聊 Bot 的双向对话消息,不影响 RSS/Web 监控关键词。监控内容过滤请使用监控配置里的屏蔽词。

+
+ +
""" + return layout("拦截规则", body) + + @app.post("/rules") + async def rules_save(_: str = Depends(panel_auth), enabled: str | None = Form(None), auto_block: str | None = Form(None), keywords: str = Form("")) -> RedirectResponse: + cfg = cfg_load_fresh() + bot_cfg = cfg.setdefault("bot", {}) + bot_cfg["spam_filter"] = { + "enabled": bool(enabled), + "auto_block": bool(auto_block), + "keywords": parse_lines(keywords), + } + cfg_save(cfg) + return RedirectResponse("/rules", status_code=303) + + @app.get("/replies", response_class=HTMLResponse) + async def replies_page(_: str = Depends(panel_auth)) -> str: + replies = list_quick_replies() + rows = [] + for i, r in enumerate(replies): + rows.append(f"""{i + 1}{html_escape(r.get('title',''))}{html_escape(r.get('text',''))}删除""") + body = """

快捷回复

""" + "".join(rows) + """
#标题内容操作

新增模板

""" + return layout("快捷回复", body) + + @app.post("/replies") + async def replies_save(_: str = Depends(panel_auth), title: str = Form(""), text: str = Form("")) -> RedirectResponse: + cfg = cfg_load_fresh() + bot_cfg = cfg.setdefault("bot", {}) + replies = bot_cfg.setdefault("quick_replies", []) + replies.append({"title": title.strip(), "text": text.strip()}) + cfg_save(cfg) + return RedirectResponse("/replies", status_code=303) + + @app.get("/replies/{idx}/delete") + async def replies_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse: + cfg = cfg_load_fresh() + replies = (cfg.get("bot") or {}).get("quick_replies") or [] + if 0 <= idx < len(replies): + replies.pop(idx) + cfg_save(cfg) + return RedirectResponse("/replies", status_code=303) + + @app.get("/monitor/events", response_class=HTMLResponse) + async def monitor_events(_: str = Depends(panel_auth)) -> str: + with closing(db()) as conn: + rows = conn.execute("SELECT * FROM monitor_events ORDER BY id DESC LIMIT 300").fetchall() + trs = [] + for r in rows: + status_txt = "已推 TG" if r["pushed"] else "仅 Web" + trs.append(f"""#{r['id']}
{status_txt}{html_escape(r['monitor_name'])}
{html_escape(r['created_at'])}{html_escape(r['title'])}
{html_escape(r['link'])}{html_escape(r['reasons'])}""") + body = "

监控推送历史

" + "".join(trs) + "
ID/状态监控条目原因
" + return layout("推送历史", body) + + @app.get("/config/export", response_class=HTMLResponse) + async def config_export(_: str = Depends(panel_auth)) -> str: + content = CONFIG_PATH.read_text(encoding="utf-8") if CONFIG_PATH.exists() else "" + body = f"""

导出 / 导入配置

只处理 config.yaml,不包含 Token、密码和 Session Secret。

YAML 高级编辑
""" + return layout("导出配置", body) + + @app.post("/config/import", response_class=HTMLResponse) + async def config_import(_: str = Depends(panel_auth), content: str = Form("")) -> str: + try: + data = yaml.safe_load(content) or {} + cfg_save(data) + return layout("导入完成", "
配置已导入并重载。

返回

") + except Exception as e: + return layout("导入失败", f"
{html_escape(e)}

返回

") + @app.get("/restart", response_class=HTMLResponse) async def restart_page(_: str = Depends(panel_auth)) -> str: body = """

重启机器人

用于修改 Token、管理员 ID、面板设置等需要重启生效的配置。

""" @@ -1590,7 +1935,10 @@ def validate_env() -> tuple[str, int]: raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}") if not admin: raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}") - return token, int(admin) + ids = parse_admin_chat_ids(admin) + if not ids: + raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}") + return token, ids[0] def bot_env_configured() -> bool: @@ -1599,7 +1947,7 @@ def bot_env_configured() -> bool: async def main_async(run_once: bool = False, panel_only: bool = False) -> None: - global bot, admin_chat_id, config, scheduler_ref + global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref load_dotenv(ENV_PATH, override=True) config = load_config() setup_logging(os.getenv("LOG_LEVEL", "INFO")) @@ -1613,6 +1961,7 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: # If .env is filled, send notifications during manual test; otherwise just log. try: token, admin_chat_id = validate_env() + admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) except Exception as e: logger.warning("run-once without Telegram notification: %s", e) @@ -1628,6 +1977,7 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: while True: await asyncio.sleep(3600) token, admin_chat_id = validate_env() + admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) dp = Dispatcher() dp.include_router(router) diff --git a/config.example.yaml b/config.example.yaml index a61606b..1d12a47 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -6,6 +6,16 @@ bot: rate_limit: window_seconds: 10 max_messages: 3 + spam_filter: + enabled: false + auto_block: true + keywords: + - 投资 + - 博彩 + - 空投 + quick_replies: + - title: 已收到 + text: 你好,消息已收到,我稍后处理。 monitors: - name: NodeSeek 新帖 type: rss @@ -23,6 +33,7 @@ monitors: new_item: true price_change: false stock_change: false + notify_telegram: true forum: true - name: Linux.do 最新 type: rss @@ -40,6 +51,7 @@ monitors: new_item: true price_change: false stock_change: false + notify_telegram: true forum: true cleanup: enabled: true diff --git a/tests/test_monitor_message_cleanup.py b/tests/test_monitor_message_cleanup.py index 9f52db2..8801121 100644 --- a/tests/test_monitor_message_cleanup.py +++ b/tests/test_monitor_message_cleanup.py @@ -43,6 +43,8 @@ def install_import_stubs() -> None: modules["apscheduler.schedulers.asyncio"].AsyncIOScheduler = object modules["bs4"].BeautifulSoup = object modules["dotenv"].load_dotenv = lambda *args, **kwargs: None + modules["yaml"].safe_load = lambda stream: {"bot": {"spam_filter": {"enabled": True, "keywords": []}}} + modules["yaml"].safe_dump = lambda data, **kwargs: str(data) modules["aiogram"].Bot = object modules["aiogram"].Dispatcher = object modules["aiogram"].F = object() @@ -76,11 +78,16 @@ class FakeBot: def __init__(self) -> None: self.deleted: list[tuple[int, int]] = [] self.sent_texts: list[str] = [] + self.sent_chat_ids: list[int] = [] + self.fail_chat_ids: set[int] = set() async def delete_message(self, chat_id: int, message_id: int) -> None: self.deleted.append((chat_id, message_id)) async def send_message(self, chat_id: int, text: str, disable_web_page_preview: bool = False): + if chat_id in self.fail_chat_ids: + raise RuntimeError("send failed") + self.sent_chat_ids.append(chat_id) self.sent_texts.append(text) return SimpleNamespace(chat=SimpleNamespace(id=chat_id), message_id=3003) @@ -99,10 +106,12 @@ class MonitorMessageCleanupTest(unittest.TestCase): def test_monitor_notification_send_is_recorded_for_later_deletion(self) -> None: old_bot = app.bot old_admin_chat_id = app.admin_chat_id + old_admin_chat_ids = app.admin_chat_ids old_config = app.config fake_bot = FakeBot() app.bot = fake_bot app.admin_chat_id = 1001 + app.admin_chat_ids = [] app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}} try: sent = asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖")) @@ -116,8 +125,57 @@ class MonitorMessageCleanupTest(unittest.TestCase): finally: app.bot = old_bot app.admin_chat_id = old_admin_chat_id + app.admin_chat_ids = old_admin_chat_ids app.config = old_config + def test_monitor_event_history_is_recorded(self) -> None: + app.record_monitor_event("NodeSeek 新帖", "title", "https://example.com", ["关键词"], False) + with closing(sqlite3.connect(app.DB_PATH)) as conn: + row = conn.execute("SELECT monitor_name, title, pushed FROM monitor_events").fetchone() + self.assertEqual(("NodeSeek 新帖", "title", 0), row) + + def test_monitor_notification_is_sent_to_all_admins(self) -> None: + old_bot = app.bot + old_admin_chat_ids = app.admin_chat_ids + old_config = app.config + fake_bot = FakeBot() + app.bot = fake_bot + app.admin_chat_ids = [1001, 1002, 1003] + app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}} + try: + self.assertTrue(asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖"))) + self.assertEqual([1001, 1002, 1003], fake_bot.sent_chat_ids) + finally: + app.bot = old_bot + app.admin_chat_ids = old_admin_chat_ids + app.config = old_config + + def test_monitor_notification_continues_when_one_admin_fails(self) -> None: + old_bot = app.bot + old_admin_chat_ids = app.admin_chat_ids + old_config = app.config + fake_bot = FakeBot() + fake_bot.fail_chat_ids.add(1002) + app.bot = fake_bot + app.admin_chat_ids = [1001, 1002, 1003] + app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}} + try: + with self.assertLogs("tg-watchbot", level="ERROR"): + self.assertTrue(asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖"))) + self.assertEqual([1001, 1003], fake_bot.sent_chat_ids) + finally: + app.bot = old_bot + app.admin_chat_ids = old_admin_chat_ids + app.config = old_config + + def test_outbound_message_is_recorded_in_conversation_log(self) -> None: + app.upsert_user(2001, "User", "user") + outbox_id = app.create_outbox_message(2001, "reply text", "web:inbox", 4004) + with closing(sqlite3.connect(app.DB_PATH)) as conn: + conn.row_factory = sqlite3.Row + row = conn.execute("SELECT direction, source, text, forwarded FROM inbox_messages WHERE id=?", (outbox_id,)).fetchone() + self.assertEqual(("out", "web:inbox", "reply text", 1), (row["direction"], row["source"], row["text"], row["forwarded"])) + def test_expired_monitor_message_is_deleted_and_removed_from_queue(self) -> None: sent_message = SimpleNamespace(chat=SimpleNamespace(id=1001), message_id=2002) app.record_monitor_message(sent_message, "NodeSeek 新帖", delete_after_seconds=60, sent_at_ts=1000) @@ -146,6 +204,9 @@ class MonitorMessageCleanupTest(unittest.TestCase): class BotConfigurationTest(unittest.TestCase): + def test_parse_admin_chat_ids_keeps_unique_first_three(self) -> None: + self.assertEqual([1, 2, 3], app.parse_admin_chat_ids("1,2 2;3,4")) + def test_bot_is_not_configured_without_token_or_admin_chat_id(self) -> None: old_token = os.environ.pop("TELEGRAM_BOT_TOKEN", None) old_admin = os.environ.pop("ADMIN_CHAT_ID", None) @@ -217,9 +278,68 @@ class PanelHtmlContractTest(unittest.TestCase): "name=new_item", "name=price_change", "name=stock_change", + "name=notify_telegram", ]: self.assertIn(expected, html) + def test_monitor_form_can_disable_telegram_notification(self) -> None: + monitor = { + "type": "rss", + "interval_seconds": 60, + "notify_telegram": False, + "notify_on": {"keyword_match": True}, + } + html = app.monitor_form_html(monitor) + self.assertIn("name=notify_telegram", html) + self.assertNotIn("name=notify_telegram checked", html) + + def test_layout_groups_navigation_by_domain(self) -> None: + html = app.layout("测试", "

ok

") + for expected in ["消息", "监控", "配置", "系统", "私聊广告拦截"]: + self.assertIn(expected, html) + + def test_inbox_copy_describes_two_way_conversation(self) -> None: + source = Path("app.py").read_text(encoding="utf-8") + self.assertIn("这里显示双向机器人对话记录", source) + self.assertIn("管理员 -> 用户", source) + + def test_users_page_keeps_shared_settings_form(self) -> None: + source = Path("app.py").read_text(encoding="utf-8") + self.assertIn("action='/users/settings'", source) + self.assertIn("这里和“Bot / 面板设置”共用同一份 .env", source) + + +class SpamAndTemplateConfigTest(unittest.TestCase): + def test_spam_keyword_hits_follow_config(self) -> None: + old_config = app.config + app.config = {"bot": {"spam_filter": {"enabled": True, "keywords": ["博彩", "投资"]}}} + try: + self.assertEqual(["博彩"], app.spam_keyword_hits("这里有博彩广告")) + finally: + app.config = old_config + + def test_quick_replies_are_loaded_from_config(self) -> None: + old_config = app.config + app.config = {"bot": {"quick_replies": [{"title": "收到", "text": "稍后处理"}]}} + try: + self.assertEqual("收到", app.list_quick_replies()[0]["title"]) + finally: + app.config = old_config + + def test_update_spam_keywords_writes_config(self) -> None: + with tempfile.TemporaryDirectory() as temp_dir: + old_config_path = app.CONFIG_PATH + old_config = app.config + app.CONFIG_PATH = Path(temp_dir) / "config.yaml" + app.CONFIG_PATH.write_text("bot:\n spam_filter:\n enabled: true\n keywords: []\n", encoding="utf-8") + app.config = {"bot": {"spam_filter": {"enabled": True, "keywords": []}}} + try: + self.assertEqual(["广告"], app.update_spam_keywords("add", "广告")) + self.assertEqual([], app.update_spam_keywords("delete", "广告")) + finally: + app.CONFIG_PATH = old_config_path + app.config = old_config + if __name__ == "__main__": unittest.main()