From a53bb099648a3ab54265826356105cc33a803b5a Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Wed, 20 May 2026 15:39:02 +0800 Subject: [PATCH] Initial commit --- .env.example | 16 + .gitignore | 13 + LICENSE | 21 + README.md | 287 +++++++ app.py | 1524 +++++++++++++++++++++++++++++++++++ config.example.yaml | 47 ++ requirements.txt | 10 + systemd/tg-watchbot.service | 22 + 8 files changed, 1940 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 app.py create mode 100644 config.example.yaml create mode 100644 requirements.txt create mode 100644 systemd/tg-watchbot.service diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..39601e9 --- /dev/null +++ b/.env.example @@ -0,0 +1,16 @@ +# tg-watchbot environment +# Create a bot with @BotFather, then put the token here. +TELEGRAM_BOT_TOKEN= + +# Telegram numeric chat id that receives monitor notifications and user messages. +ADMIN_CHAT_ID= + +LOG_LEVEL=INFO + +# Web admin panel. Bind to localhost by default; use a reverse proxy / Cloudflare Tunnel if exposing it. 
+WEB_PANEL_ENABLED=true +WEB_PANEL_HOST=127.0.0.1 +WEB_PANEL_PORT=8765 +WEB_PANEL_USER=admin +WEB_PANEL_PASSWORD=change-me +WEB_PANEL_SESSION_SECRET= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..32f3211 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +.env +config.yaml +memory.md +*.log +*.sqlite3 +*.sqlite3-* +__pycache__/ +*.pyc +.venv/ +venv/ +.pytest_cache/ +.ruff_cache/ +.DS_Store diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..14fac91 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..775a628 --- /dev/null +++ b/README.md @@ -0,0 +1,287 @@ +# tg-watchbot + +tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人** 和 **Web/RSS 监控推送** 合在一起: + +- 普通用户私聊 Bot,消息会转发给管理员; +- 管理员可以直接回复、主动发文字/图片、封禁/备注用户; +- 后台定时监控 RSS 或网页,命中关键词、新条目、价格/库存变化后推送给管理员; +- 自带一个 Web 管理面板,可配置监控目标、编辑 YAML、查看收件箱和日志。 + +项目为单文件应用,适合个人服务器、NAT 小鸡、轻量 VPS 直接用 systemd 跑。 + +## 功能 + +### Telegram 双向机器人 + +- 使用官方 Telegram Bot API,不做 userbot/selfbot。 +- `/start` 建立用户和管理员之间的联系。 +- 用户消息先写入 SQLite,再转发给管理员,避免转发失败时丢消息。 +- 管理员可通过“回复转发消息”直接回给原用户。 +- 支持显式命令: + - `/reply <user_id> <内容>`:给指定用户发文字; + - `/sendpic <user_id> [说明]`:给指定用户发图片; + - `/block <user_id>`:封禁用户; + - `/unblock <user_id>`:解封用户; + - `/note <user_id> <备注>`:给用户加备注; + - `/who <user_id>`:查看用户信息; + - `/cancel`:取消待发送图片。 +- 普通用户有简单限流,防止刷屏。 + +### Web/RSS 监控 + +- 支持两类监控: + - `rss`:解析 RSS/Atom 条目; + - `web`:用 CSS selector 抓网页条目、标题、链接、价格、库存。 +- 支持触发条件: + - 关键词命中; + - 新条目; + - 价格变化; + - 库存变化。 +- 支持论坛 RSS 增强字段:作者、分类、tags、摘要。 +- 支持去重,避免同一条反复推送。 +- 支持屏蔽词、作者、分类过滤(YAML 高级配置)。 +- 默认最低监控间隔为 60 秒。 + +### Web 管理面板 + +- 登录页 + HttpOnly session cookie,不使用丑陋的浏览器 Basic Auth。 +- 监控列表、新增、编辑、删除、手动检查、预览。 +- NodeSeek / Linux.do RSS 模板。 +- 批量新增监控。 +- YAML 高级编辑。 +- Bot Token / 管理员 ID / 面板账号配置页。 +- 收件箱页面,可查看用户消息和重试转发。 +- 主动发消息页面 `/send`,发送成功后会在页面显示结果,并给管理员聊天发送确认提醒。 +- 自动清理监控/RSS/网站状态数据;不会删除用户、收件箱、双向对话消息。 +- 日志页面和健康检查 `/health`。 + +## 借鉴 / 使用的开源库 + +本项目的业务逻辑为自写,主要使用并参考了以下开源库的公开 API 和常见用法: + +- [`aiogram`](https://github.com/aiogram/aiogram):Telegram Bot API、命令、消息处理、复制/发送消息。 +- [`FastAPI`](https://github.com/fastapi/fastapi):Web 管理面板、表单、路由、中间件。 +- [`Uvicorn`](https://github.com/encode/uvicorn):ASGI 服务运行。 +- [`APScheduler`](https://github.com/agronholm/apscheduler):异步定时监控任务。 +- [`httpx`](https://github.com/encode/httpx):异步 HTTP 抓取。 +- [`feedparser`](https://github.com/kurtmckee/feedparser):RSS/Atom 解析。 +- [`Beautiful Soup`](https://www.crummy.com/software/BeautifulSoup/):HTML 解析和 CSS selector 抽取。 +- [`PyYAML`](https://pyyaml.org/):`config.yaml` 配置读写。 +- 
[`python-dotenv`](https://github.com/theskumar/python-dotenv):读取 `.env`。 +- Python 标准库 `sqlite3`:消息、用户、去重、监控状态持久化。 + +## 安全 / 脱敏说明 + +- 仓库不包含真实 `TELEGRAM_BOT_TOKEN`、`ADMIN_CHAT_ID`、面板密码、Session Secret、数据库、日志。 +- 真实配置放在 `.env`,不要提交。 +- `.gitignore` 已忽略 `.env`、SQLite、日志、虚拟环境和缓存。 +- 如果要把面板暴露到公网,建议使用 Cloudflare Access / 反代鉴权,并使用强密码。 +- Bot 只能给“已经主动私聊过 Bot 的用户”发消息,这是 Telegram Bot API 的限制。 + +## 快速开始 + +```bash +git clone <repo-url> tg-watchbot +cd tg-watchbot +python3 -m venv .venv +./.venv/bin/pip install -U pip +./.venv/bin/pip install -r requirements.txt +cp .env.example .env +cp config.example.yaml config.yaml +nano .env +``` + +至少填写: + +```dotenv +TELEGRAM_BOT_TOKEN= +ADMIN_CHAT_ID= +WEB_PANEL_USER=admin +WEB_PANEL_PASSWORD= +``` + +启动: + +```bash +./.venv/bin/python app.py +``` + +打开面板: + +```text +http://127.0.0.1:8765 +``` + +如果只是先配置面板、还没有 Telegram Token: + +```bash +./.venv/bin/python app.py --panel-only +``` + +手动跑一次监控: + +```bash +./.venv/bin/python app.py --run-once +``` + +## systemd 部署 + +推荐部署到 `/opt/tg-watchbot`: + +```bash +sudo useradd --system --no-create-home --shell /usr/sbin/nologin tg-watchbot || true +sudo mkdir -p /opt/tg-watchbot +sudo chown -R "$USER:$USER" /opt/tg-watchbot + +cd /opt/tg-watchbot +git clone <repo-url> . 
+python3 -m venv .venv +./.venv/bin/pip install -U pip +./.venv/bin/pip install -r requirements.txt +cp .env.example .env +cp config.example.yaml config.yaml +nano .env + +sudo chown -R tg-watchbot:tg-watchbot /opt/tg-watchbot +sudo chmod 600 /opt/tg-watchbot/.env +sudo cp systemd/tg-watchbot.service /etc/systemd/system/tg-watchbot.service +sudo systemctl daemon-reload +sudo systemctl enable --now tg-watchbot +sudo journalctl -u tg-watchbot -f +``` + +健康检查: + +```bash +curl http://127.0.0.1:8765/health +``` + +## 配置说明 + +### `.env` + +| 变量 | 说明 | +|---|---| +| `TELEGRAM_BOT_TOKEN` | BotFather 创建的 Telegram Bot Token | +| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id,用于接收用户消息和监控通知 | +| `LOG_LEVEL` | 日志级别,默认 `INFO` | +| `WEB_PANEL_ENABLED` | 是否启用 Web 面板,默认 `true` | +| `WEB_PANEL_HOST` | 面板监听地址,默认 `127.0.0.1` | +| `WEB_PANEL_PORT` | 面板端口,默认 `8765` | +| `WEB_PANEL_USER` | 面板用户名 | +| `WEB_PANEL_PASSWORD` | 面板密码 | +| `WEB_PANEL_SESSION_SECRET` | Session Secret,留空会自动生成并写回 `.env` | + +### `config.yaml` + +监控数据自动清理示例: + +```yaml +cleanup: + enabled: true + interval_minutes: 60 # 每多少分钟执行一次清理 + monitor_retention_minutes: 1440 # RSS/网站监控状态保留多久 +``` + +清理范围只包括: + +- `monitor_state`:网站/RSS 条目状态、价格/库存状态; +- `sent_events`:监控推送去重记录。 + +不会删除: + +- `users`; +- `message_map`; +- `inbox_messages`; +- 任何双向对话/客服消息记录。 + +RSS 示例: + +```yaml +monitors: + - name: NodeSeek 新帖 + type: rss + url: https://rss.nodeseek.com/ + interval_seconds: 60 + keywords: + - VPS + - 优惠 + exclude_keywords: + - 出号 + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +``` + +网页示例: + +```yaml +monitors: + - name: Example Deals + type: web + url: https://example.com/deals + interval_seconds: 300 + keywords: + - discount + selectors: + item: article, .deal, li + title: h1, h2, h3, a + link: a + price: .price + stock: .stock + notify_on: + keyword_match: true + new_item: true + price_change: true + stock_change: true +``` + +## 管理命令 + 
+管理员在 Telegram 里可用: + +```text +/reply <user_id> <内容> +/sendpic <user_id> [图片说明] +/block <user_id> +/unblock <user_id> +/note <user_id> <备注> +/who <user_id> +/cancel +``` + +也可以直接“回复 Bot 转发给管理员的用户消息”,Bot 会按映射把回复发回原用户。 + +## 面板路由 + +| 路由 | 说明 | +|---|---| +| `/` | 监控列表 | +| `/monitor/new` | 新增监控 | +| `/monitor/templates` | 论坛模板 | +| `/monitor/bulk` | 批量新增 | +| `/monitor/{idx}/preview` | 预览抓取结果,不写入状态、不推送 | +| `/monitor/{idx}/run` | 手动检查单个监控 | +| `/run-once` | 手动检查全部监控 | +| `/yaml` | YAML 高级编辑 | +| `/settings` | `.env` 设置和监控清理策略 | +| `/send` | 主动发消息给已私聊过 Bot 的用户 | +| `/inbox` | 收件箱 | +| `/logs` | 日志 | +| `/health` | 健康检查 | + +## 注意事项 + +- Telegram Bot 不能主动私聊陌生人;对方必须先给 Bot 发过 `/start` 或任意消息。 +- 对公网暴露 Web 面板前,务必改默认密码。 +- RSS 监控建议 60 秒起步;网页监控建议更保守,避免对目标站造成压力。 +- 媒体消息当前只保证记录文本/说明和转发状态;转发失败后的媒体补发需要额外做本地附件存储。 + +## License + +MIT diff --git a/app.py b/app.py new file mode 100644 index 0000000..66dae04 --- /dev/null +++ b/app.py @@ -0,0 +1,1524 @@ +#!/usr/bin/env python3 +"""tg-watchbot: Telegram two-way support bot + web/RSS monitor. + +- Official Telegram Bot API via aiogram (no userbot/selfbot). +- SQLite state for dedupe, users, admin-message mapping, blocks, notes, monitor state. +- APScheduler async jobs for monitoring. 
+""" + +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import html +import logging +import os +import re +import secrets +import signal +import sqlite3 +import time +from contextlib import closing +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.parse import urljoin + +import feedparser +import httpx +import yaml +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from bs4 import BeautifulSoup +from dotenv import load_dotenv + +from aiogram import Bot, Dispatcher, F, Router +from aiogram.enums import ParseMode +from aiogram.exceptions import TelegramAPIError +from aiogram.filters import Command, CommandObject +from aiogram.types import Message +from aiogram.client.default import DefaultBotProperties +from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response, status +from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse +import uvicorn + +BASE_DIR = Path(__file__).resolve().parent +DB_PATH = BASE_DIR / "tg-watchbot.sqlite3" +CONFIG_PATH = BASE_DIR / "config.yaml" +ENV_PATH = BASE_DIR / ".env" +LOG_PATH = BASE_DIR / "tg-watchbot.log" +MIN_INTERVAL_SECONDS = 60 + +DEFAULT_UA = ( + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36 tg-watchbot/1.0" +) + +logger = logging.getLogger("tg-watchbot") +router = Router() +bot: Bot | None = None +admin_chat_id: int | None = None +config: dict[str, Any] = {} +rate_buckets: dict[int, list[float]] = {} +pending_sendpic: dict[int, dict[str, Any]] = {} +scheduler_ref: AsyncIOScheduler | None = None + + +def setup_logging(level: str = "INFO") -> None: + LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + handlers=[logging.StreamHandler(), 
logging.FileHandler(LOG_PATH, encoding="utf-8")], + ) + + +def load_config() -> dict[str, Any]: + if not CONFIG_PATH.exists(): + raise FileNotFoundError(f"missing config: {CONFIG_PATH}") + with CONFIG_PATH.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return data + + +def monitor_cleanup_settings() -> dict[str, int | bool]: + cleanup = (config.get("cleanup") or {}) if isinstance(config, dict) else {} + return { + "enabled": bool(cleanup.get("enabled", True)), + "interval_minutes": max(1, int(cleanup.get("interval_minutes", 60))), + "retention_minutes": max(1, int(cleanup.get("monitor_retention_minutes", 1440))), + } + + +def db() -> sqlite3.Connection: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db() -> None: + with closing(db()) as conn: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + CREATE TABLE IF NOT EXISTS users ( + user_id INTEGER PRIMARY KEY, + username TEXT, + full_name TEXT, + note TEXT DEFAULT '', + blocked INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS message_map ( + admin_chat_id INTEGER NOT NULL, + admin_message_id INTEGER NOT NULL, + user_id INTEGER NOT NULL, + user_message_id INTEGER, + created_at TEXT NOT NULL, + PRIMARY KEY (admin_chat_id, admin_message_id) + ); + CREATE TABLE IF NOT EXISTS sent_events ( + event_key TEXT PRIMARY KEY, + monitor_name TEXT NOT NULL, + title TEXT, + link TEXT, + created_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS monitor_state ( + monitor_name TEXT NOT NULL, + item_key TEXT NOT NULL, + price TEXT, + stock TEXT, + title TEXT, + link TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (monitor_name, item_key) + ); + CREATE TABLE IF NOT EXISTS inbox_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + username TEXT, + full_name TEXT, + user_message_id INTEGER, + message_type TEXT, + text TEXT, + forwarded INTEGER DEFAULT 0, + 
admin_header_message_id INTEGER, + admin_copy_message_id INTEGER, + created_at TEXT NOT NULL, + forwarded_at TEXT, + error TEXT + ); + """ + ) + conn.commit() + + +def now_iso() -> str: + return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + + +def html_escape(text: Any) -> str: + return html.escape(str(text or ""), quote=False) + + +def user_display(message: Message) -> tuple[int, str, str | None]: + u = message.from_user + if not u: + return 0, "unknown", None + full = " ".join(x for x in [u.first_name, u.last_name] if x).strip() or str(u.id) + return u.id, full, u.username + + +def upsert_user(user_id: int, full_name: str, username: str | None) -> None: + ts = now_iso() + with closing(db()) as conn: + conn.execute( + """ + INSERT INTO users(user_id, username, full_name, created_at, updated_at) + VALUES(?, ?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + username=excluded.username, + full_name=excluded.full_name, + updated_at=excluded.updated_at + """, + (user_id, username, full_name, ts, ts), + ) + conn.commit() + + +def get_user(user_id: int) -> sqlite3.Row | None: + with closing(db()) as conn: + return conn.execute("SELECT * FROM users WHERE user_id=?", (user_id,)).fetchone() + + +def is_blocked(user_id: int) -> bool: + row = get_user(user_id) + return bool(row and row["blocked"]) + + +def set_block(user_id: int, blocked: bool) -> None: + with closing(db()) as conn: + conn.execute( + "UPDATE users SET blocked=?, updated_at=? WHERE user_id=?", + (1 if blocked else 0, now_iso(), user_id), + ) + conn.commit() + + +def set_note(user_id: int, note: str) -> None: + with closing(db()) as conn: + conn.execute("UPDATE users SET note=?, updated_at=? 
WHERE user_id=?", (note, now_iso(), user_id)) + conn.commit() + + +def rate_limited(user_id: int) -> bool: + rl = (config.get("bot") or {}).get("rate_limit") or {} + window = int(rl.get("window_seconds", 10)) + max_messages = int(rl.get("max_messages", 3)) + t = time.time() + bucket = [x for x in rate_buckets.get(user_id, []) if t - x <= window] + bucket.append(t) + rate_buckets[user_id] = bucket + return len(bucket) > max_messages + + +def save_message_map(admin_msg: Message, user_id: int, user_message_id: int | None) -> None: + with closing(db()) as conn: + conn.execute( + "INSERT OR REPLACE INTO message_map(admin_chat_id, admin_message_id, user_id, user_message_id, created_at) VALUES(?,?,?,?,?)", + (admin_msg.chat.id, admin_msg.message_id, user_id, user_message_id, now_iso()), + ) + conn.commit() + + + + +def create_inbox_message(message: Message, user_id: int, full_name: str, username: str | None) -> int: + msg_type = "text" if message.text else (message.content_type or "message") + text = message.text or message.caption or "" + with closing(db()) as conn: + cur = conn.execute( + """ + INSERT INTO inbox_messages(user_id, username, full_name, user_message_id, message_type, text, created_at) + VALUES(?,?,?,?,?,?,?) + """, + (user_id, username, full_name, message.message_id, msg_type, text, now_iso()), + ) + conn.commit() + return int(cur.lastrowid) + + +def mark_inbox_forwarded(inbox_id: int, header_id: int | None = None, copy_id: int | None = None) -> None: + with closing(db()) as conn: + conn.execute( + "UPDATE inbox_messages SET forwarded=1, admin_header_message_id=?, admin_copy_message_id=?, forwarded_at=?, error=NULL WHERE id=?", + (header_id, copy_id, now_iso(), inbox_id), + ) + conn.commit() + + +def mark_inbox_error(inbox_id: int, error: str) -> None: + with closing(db()) as conn: + conn.execute("UPDATE inbox_messages SET error=? 
WHERE id=?", (error[:1000], inbox_id)) + conn.commit() + + +def pending_inbox(limit: int = 50) -> list[sqlite3.Row]: + with closing(db()) as conn: + return list(conn.execute("SELECT * FROM inbox_messages WHERE forwarded=0 ORDER BY id ASC LIMIT ?", (limit,)).fetchall()) + + +def lookup_reply_target(admin_chat: int, admin_message_id: int) -> int | None: + with closing(db()) as conn: + row = conn.execute( + "SELECT user_id FROM message_map WHERE admin_chat_id=? AND admin_message_id=?", + (admin_chat, admin_message_id), + ).fetchone() + return int(row["user_id"]) if row else None + + +def parse_user_id_and_text(args: str | None) -> tuple[int, str]: + if not args: + raise ValueError("缺少参数") + parts = args.strip().split(maxsplit=1) + if len(parts) < 2: + raise ValueError("格式应为:/reply <内容>") + return int(parts[0]), parts[1] + + +def parse_user_id(args: str | None) -> int: + if not args: + raise ValueError("缺少 user_id") + return int(args.strip().split()[0]) + + +def parse_user_id_and_optional_text(args: str | None) -> tuple[int, str]: + if not args: + raise ValueError("缺少 user_id") + parts = args.strip().split(maxsplit=1) + uid = int(parts[0]) + caption = parts[1] if len(parts) > 1 else "" + return uid, caption + + +def describe_sendpic_target(user_id: int) -> str: + row = get_user(user_id) + if not row: + return str(user_id) + username = f"@{row['username']}" if row['username'] else "" + full_name = row['full_name'] or str(user_id) + return f"{full_name} {username}".strip() + + +async def admin_send(text: str) -> None: + if not bot or admin_chat_id is None: + logger.error("admin_send called before bot/admin init: %s", text) + return + try: + await bot.send_message(admin_chat_id, text, disable_web_page_preview=False) + except Exception: + logger.exception("failed to send admin notification") + + +def is_admin_chat(message: Message) -> bool: + """Dynamic admin-chat filter.""" + return admin_chat_id is not None and message.chat.id == admin_chat_id + + +def 
is_admin_action_message(message: Message) -> bool: + """Only catch admin messages that are part of an action flow. + + A broad admin-chat handler would swallow ordinary admin messages before the + fallback handler. Keep it narrow: reply-to-user and /sendpic photo flow only. + """ + if not is_admin_chat(message): + return False + if pending_sendpic.get(message.chat.id): + return True + return bool(message.reply_to_message and message.text) + + +@router.message(Command("start")) +async def start(message: Message) -> None: + uid, full, username = user_display(message) + if not uid: + return + upsert_user(uid, full, username) + if is_blocked(uid): + await message.answer("你当前无法发送消息。") + return + await message.answer("已连接客服/管理员。你发来的消息会转交给管理员,请直接输入内容。") + + +async def send_text_to_user_from_admin(message: Message, args: str | None, command_name: str) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid, text = parse_user_id_and_text(args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}。对方需要先私聊 Bot 或 /start,Telegram Bot 才能主动发送。") + return + if is_blocked(uid): + await message.reply(f"错误:用户 {uid} 已被封禁,先 /unblock {uid}") + return + if not bot: + await message.reply("错误:Bot 尚未初始化") + return + sent = await bot.send_message(uid, text) # type: ignore[union-attr] + await message.reply(f"{command_name} 成功:已发送给用户 {uid},message_id={sent.message_id}") + except Exception as e: + logger.exception("/%s failed", command_name) + await message.reply(f"/{command_name} 失败:{e}\n用法:/{command_name} <内容>") + + +@router.message(Command("reply")) +async def cmd_reply(message: Message, command: CommandObject) -> None: + await send_text_to_user_from_admin(message, command.args, "reply") + + +@router.message(Command("send")) +async def cmd_send(message: Message, command: CommandObject) -> None: + await send_text_to_user_from_admin(message, command.args, "send") + + +@router.message(Command("sendpic")) +async def cmd_sendpic(message: Message, command: CommandObject) -> 
None: + if message.chat.id != admin_chat_id: + return + try: + uid, caption = parse_user_id_and_optional_text(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid},对方需要先 /start 机器人") + return + if is_blocked(uid): + await message.reply(f"错误:用户 {uid} 已被封禁,先 /unblock {uid}") + return + pending_sendpic[message.chat.id] = {"target": uid, "caption": caption, "created_at": time.time()} + suffix = f"\n说明文字:{caption}" if caption else "" + await message.reply( + f"请发送需要转发给 {uid}({html_escape(describe_sendpic_target(uid))})的图片。{suffix}\n" + "2 分钟内发送一张图片即可;发送 /cancel 取消。" + ) + except Exception as e: + logger.exception("/sendpic failed") + await message.reply(f"/sendpic 失败:{e}\n用法:/sendpic 用户ID [可选图片说明]") + + +@router.message(Command("cancel")) +async def cmd_cancel(message: Message) -> None: + if message.chat.id == admin_chat_id and pending_sendpic.pop(message.chat.id, None): + await message.reply("已取消待发送图片。") + + +@router.message(Command("block")) +async def cmd_block(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_block(uid, True) + await message.reply(f"已封禁用户 {uid}") + except Exception as e: + logger.exception("/block failed") + await message.reply(f"/block 失败:{e}") + + +@router.message(Command("unblock")) +async def cmd_unblock(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_block(uid, False) + await message.reply(f"已解封用户 {uid}") + except Exception as e: + logger.exception("/unblock failed") + await message.reply(f"/unblock 失败:{e}") + + +@router.message(Command("note")) +async def cmd_note(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid, note = 
parse_user_id_and_text(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_note(uid, note) + await message.reply(f"已更新用户 {uid} 备注") + except Exception as e: + logger.exception("/note failed") + await message.reply(f"/note 失败:{e}") + + +@router.message(Command("who")) +async def cmd_who(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + row = get_user(uid) + if not row: + await message.reply(f"错误:找不到用户 {uid}") + return + await message.reply( + "用户信息\n" + f"user_id: {row['user_id']}\n" + f"username: @{row['username']}\n" + f"full_name: {row['full_name']}\n" + f"blocked: {bool(row['blocked'])}\n" + f"note: {row['note'] or ''}\n" + f"created_at: {row['created_at']}\n" + f"updated_at: {row['updated_at']}" + ) + except Exception as e: + logger.exception("/who failed") + await message.reply(f"/who 失败:{e}") + + +@router.message(is_admin_action_message) +async def admin_reply_by_message(message: Message) -> None: + # Pending /sendpic flow: after /sendpic , the next admin photo is copied to target. 
+ pending = pending_sendpic.get(message.chat.id) + if pending: + if time.time() - float(pending.get("created_at", 0)) > 120: + pending_sendpic.pop(message.chat.id, None) + await message.reply("发送图片超时,已取消。请重新使用 /sendpic 用户ID。") + return + if message.photo: + target = int(pending["target"]) + caption = (message.caption or pending.get("caption") or "")[:1024] + try: + if is_blocked(target): + pending_sendpic.pop(message.chat.id, None) + await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}") + return + await bot.send_photo(target, message.photo[-1].file_id, caption=caption or None) # type: ignore[union-attr] + pending_sendpic.pop(message.chat.id, None) + await message.reply(f"已发送图片给用户 {target}") + except TelegramAPIError as e: + logger.exception("/sendpic photo forwarding failed") + await message.reply(f"图片发送失败:{e}") + return + if message.text and message.text.startswith("/"): + return + await message.reply("请发送一张图片;或发送 /cancel 取消。") + return + + # Admin replies to forwarded/copy notification in admin chat. + if not message.reply_to_message or not message.text: + return + target = lookup_reply_target(message.chat.id, message.reply_to_message.message_id) + if not target: + return + try: + if is_blocked(target): + await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}") + return + sent = await bot.send_message(target, message.text) # type: ignore[union-attr] + await message.reply(f"已发送给用户 {target},message_id={sent.message_id}") + except TelegramAPIError as e: + logger.exception("admin reply forwarding failed") + await message.reply(f"发送失败:{e}") + + +@router.message(is_admin_chat) +async def admin_plain_message(message: Message) -> None: + # Do not silently swallow ordinary admin messages. 
+ if message.text and not message.text.startswith("/"): + await message.reply( + "管理员普通消息不会自动转发。请使用:\n" + "/send <内容>\n" + "/reply <内容>\n" + "或在收件箱里回复某条用户消息;也可以打开面板的「主动发消息」。" + ) + + +@router.message() +async def user_message(message: Message) -> None: + # Only relay private user chats to admin. + logger.info("incoming message chat_id=%s chat_type=%s from_user=%s content_type=%s text=%r", message.chat.id, message.chat.type, getattr(message.from_user, 'id', None), message.content_type, (message.text or '')[:80]) + if message.chat.id == admin_chat_id: + logger.info("incoming message is admin plain message; ignored by user relay") + return + if message.chat.type != "private": + logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type) + return + uid, full, username = user_display(message) + if not uid: + return + upsert_user(uid, full, username) + if is_blocked(uid): + await message.answer("你当前无法发送消息。") + return + if rate_limited(uid): + await message.answer("发送太快了,请稍后再试。") + return + inbox_id = create_inbox_message(message, uid, full, username) + user_row = get_user(uid) + note = user_row["note"] if user_row and "note" in user_row.keys() else "" + header = ( + f"[用户消息 #{inbox_id}]\n" + f"user_id: {uid}\n" + f"name: {html_escape(full)}\n" + f"username: @{html_escape(username or '')}\n" + f"note: {html_escape(note)}\n" + f"time: {html_escape(now_iso())}" + ) + try: + sent = await bot.send_message(admin_chat_id, header) # type: ignore[union-attr] + save_message_map(sent, uid, message.message_id) + copied = await message.copy_to(admin_chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type] + save_message_map(copied, uid, message.message_id) + mark_inbox_forwarded(inbox_id, sent.message_id, copied.message_id) + await message.answer("已转交管理员。") + except Exception as e: + mark_inbox_error(inbox_id, repr(e)) + logger.exception("failed to relay user message, saved inbox_id=%s", inbox_id) + await 
message.answer("已收到留言,但转发管理员暂时失败;系统会稍后自动重试。") + + +@dataclass +class MonitorItem: + key: str + title: str + link: str + text: str + price: str | None = None + stock: str | None = None + author: str | None = None + published: str | None = None + category: str | None = None + + +def stable_key(*parts: str) -> str: + raw = "|".join(parts) + return hashlib.sha256(raw.encode("utf-8", errors="ignore")).hexdigest() + + +def extract_price(text: str) -> str | None: + m = re.search(r"(?:¥|¥|\$|USD|CNY)?\s*\d+(?:[.,]\d{1,2})?", text, re.I) + return m.group(0).strip() if m else None + + +def keyword_hits(text: str, keywords: list[str]) -> list[str]: + low = text.lower() + return [k for k in keywords if k and k.lower() in low] + + +def item_blocked(item: MonitorItem, monitor: dict[str, Any]) -> tuple[bool, str]: + text = f"{item.title} {item.text} {item.author or ''} {item.category or ''}" + exclude_hits = keyword_hits(text, monitor.get("exclude_keywords") or []) + if exclude_hits: + return True, "屏蔽词 " + ", ".join(exclude_hits) + authors = [a.lower() for a in (monitor.get("authors") or []) if a] + if authors and (item.author or "").lower() not in authors: + return True, "作者不匹配" + categories = [c.lower() for c in (monitor.get("categories") or []) if c] + if categories and not any(c in (item.category or "").lower() for c in categories): + return True, "分类不匹配" + return False, "" + + +async def fetch_url(client: httpx.AsyncClient, url: str) -> str: + resp = await client.get(url, follow_redirects=True) + resp.raise_for_status() + return resp.text + + +def parse_web_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]: + selectors = monitor.get("selectors") or {} + item_sel = selectors.get("item") or "article, .thread, .post, li" + title_sel = selectors.get("title") or "h1, h2, h3, a" + link_sel = selectors.get("link") or "a" + price_sel = selectors.get("price") + stock_sel = selectors.get("stock") + soup = BeautifulSoup(body, "html.parser") + nodes = 
soup.select(item_sel)[:100] + if not nodes: + nodes = [soup] + items: list[MonitorItem] = [] + for node in nodes: + title_node = node.select_one(title_sel) if title_sel else None + link_node = node.select_one(link_sel) if link_sel else None + title = (title_node.get_text(" ", strip=True) if title_node else node.get_text(" ", strip=True)[:120]).strip() + href = link_node.get("href") if link_node else "" + link = urljoin(monitor.get("url", ""), href) if href else monitor.get("url", "") + text = node.get_text(" ", strip=True) + price = None + stock = None + if price_sel and (p := node.select_one(price_sel)): + price = p.get_text(" ", strip=True) + else: + price = extract_price(text) + if stock_sel and (s := node.select_one(stock_sel)): + stock = s.get_text(" ", strip=True) + for hint in ["有货", "无货", "缺货", "in stock", "out of stock", "sold out", "available"]: + if hint.lower() in text.lower(): + stock = hint + break + if title or text: + key = stable_key(link, title or text[:80]) + items.append(MonitorItem(key=key, title=title or "(no title)", link=link, text=text, price=price, stock=stock)) + return items + + +def canonical_forum_key(link: str, entry_id: str = "") -> str: + """Return a stable topic/post key that survives title edits and RSS updated ids.""" + target = link or entry_id + patterns = [ + r"nodeseek\.com/post-(\d+)", + r"linux\.do/t/(?:[^/]+/)?(\d+)", + r"/t/(?:[^/]+/)?(\d+)", + ] + for value in [target, entry_id, link]: + for pattern in patterns: + m = re.search(pattern, value or "", re.I) + if m: + return m.group(1) + return stable_key(link or entry_id) + + +def parse_rss_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]: + feed = feedparser.parse(body) + items: list[MonitorItem] = [] + for e in feed.entries[:100]: + title = getattr(e, "title", "(no title)") + link = getattr(e, "link", monitor.get("url", "")) + summary = getattr(e, "summary", "") + content = " ".join([c.get("value", "") for c in getattr(e, "content", []) if isinstance(c, 
dict)]) + published = getattr(e, "published", "") or getattr(e, "updated", "") + author = getattr(e, "author", "") or getattr(e, "dc_creator", "") + category = "" + tags = getattr(e, "tags", None) or [] + if tags: + category = ", ".join([t.get("term", "") for t in tags if isinstance(t, dict) and t.get("term")]) + entry_id = getattr(e, "id", "") or getattr(e, "guid", "") + key = canonical_forum_key(link, entry_id) if (monitor.get("forum") or monitor.get("type") == "rss") else stable_key(entry_id, link, title) + items.append(MonitorItem(key=key, title=title, link=link, text=f"{title} {summary} {content} {published} {author} {category}", author=author, published=published, category=category)) + return items + + +def should_notify_and_update(monitor: dict[str, Any], item: MonitorItem, hits: list[str]) -> list[str]: + name = monitor["name"] + notify_on = monitor.get("notify_on") or {} + reasons: list[str] = [] + with closing(db()) as conn: + prev = conn.execute( + "SELECT * FROM monitor_state WHERE monitor_name=? 
AND item_key=?", + (name, item.key), + ).fetchone() + is_forum = bool(monitor.get("forum") or monitor.get("type") == "rss") + if is_forum: + # 论坛/RSS 帖子只在首次出现并命中时通知一次。 + # 后续 RSS 因回复/编辑把同一链接重新排到前面时,只更新状态,不再重复推送。 + if prev is None: + if notify_on.get("new_item", False): + reasons.append("新条目") + if notify_on.get("keyword_match", True) and hits: + reasons.append("关键词 " + ", ".join(hits)) + else: + if prev is None: + if notify_on.get("new_item", False): + reasons.append("新条目") + else: + if notify_on.get("price_change", False) and (item.price or "") != (prev["price"] or ""): + reasons.append(f"价格变化 {prev['price'] or '-'} -> {item.price or '-'}") + if notify_on.get("stock_change", False) and (item.stock or "") != (prev["stock"] or ""): + reasons.append(f"库存变化 {prev['stock'] or '-'} -> {item.stock or '-'}") + if notify_on.get("keyword_match", True) and hits: + reasons.append("关键词 " + ", ".join(hits)) + conn.execute( + """ + INSERT INTO monitor_state(monitor_name, item_key, price, stock, title, link, updated_at) + VALUES(?,?,?,?,?,?,?) 
+ ON CONFLICT(monitor_name, item_key) DO UPDATE SET + price=excluded.price, stock=excluded.stock, title=excluded.title, + link=excluded.link, updated_at=excluded.updated_at + """, + (name, item.key, item.price, item.stock, item.title, item.link, now_iso()), + ) + conn.commit() + return reasons + + +def event_not_sent(event_key: str, monitor_name: str, title: str, link: str) -> bool: + with closing(db()) as conn: + try: + conn.execute( + "INSERT INTO sent_events(event_key, monitor_name, title, link, created_at) VALUES(?,?,?,?,?)", + (event_key, monitor_name, title, link, now_iso()), + ) + conn.commit() + return True + except sqlite3.IntegrityError: + return False + + +async def run_monitor(monitor: dict[str, Any]) -> int: + name = monitor.get("name", "unnamed") + mtype = monitor.get("type", "web") + url = monitor.get("url") + if not url: + logger.error("monitor %s missing url", name) + return 0 + keywords = monitor.get("keywords") or [] + timeout = int((config.get("http") or {}).get("timeout_seconds", 20)) + ua = (config.get("http") or {}).get("user_agent") or DEFAULT_UA + headers = {"User-Agent": ua, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"} + sent_count = 0 + try: + async with httpx.AsyncClient(timeout=timeout, headers=headers) as client: + body = await fetch_url(client, url) + items = parse_rss_items(monitor, body) if mtype == "rss" else parse_web_items(monitor, body) + for item in items: + blocked, block_reason = item_blocked(item, monitor) + if blocked: + logger.debug("monitor %s skipped item %s: %s", name, item.title, block_reason) + continue + hits = keyword_hits(f"{item.title} {item.text}", keywords) + # If keywords are configured and keyword_match is enabled, do not push unrelated new posts. 
+ notify_on = monitor.get("notify_on") or {} + if keywords and notify_on.get("keyword_match", True) and not hits and not (notify_on.get("price_change") or notify_on.get("stock_change")): + should_notify_and_update(monitor, item, []) # still remember state to avoid later old flood + continue + reasons = should_notify_and_update(monitor, item, hits) + if not reasons: + continue + # 论坛/RSS 以帖子本身作为事件键;不要把“命中原因/检查时间/编辑变化”放进去,避免同一帖重复发。 + is_forum = monitor.get("forum") or mtype == "rss" + event_key = stable_key(name, item.key) if is_forum else stable_key(name, item.key, "|".join(reasons), item.price or "", item.stock or "") + if not event_not_sent(event_key, name, item.title, item.link): + continue + if is_forum: + text = ( + f"[新帖命中] {html_escape(name)}\n" + f"标题:{html_escape(item.title)}\n" + f"作者:{html_escape(item.author or '-')}\n" + f"分类:{html_escape(item.category or '-')}\n" + f"链接:{html_escape(item.link)}\n" + f"命中:{html_escape('; '.join(reasons))}\n" + f"发布时间:{html_escape(item.published or '-')}\n" + f"检查时间:{html_escape(now_iso())}" + ) + else: + text = ( + f"[库存/关键词命中] {html_escape(name)}\n" + f"标题:{html_escape(item.title)}\n" + f"链接:{html_escape(item.link)}\n" + f"命中:{html_escape('; '.join(reasons))}\n" + f"价格:{html_escape(item.price or '-')}\n" + f"库存:{html_escape(item.stock or '-')}\n" + f"时间:{html_escape(now_iso())}" + ) + await admin_send(text) + sent_count += 1 + except Exception: + logger.exception("monitor failed: %s %s", name, url) + return sent_count + + + +def cleanup_monitor_data(retention_minutes: int) -> tuple[int, int]: + """Delete only website/RSS monitor state older than retention. + + Keeps two-way conversation tables intact: users, message_map, inbox_messages. 
+ """ + cutoff_ts = time.time() - max(1, int(retention_minutes)) * 60 + cutoff = datetime.fromtimestamp(cutoff_ts, timezone.utc).astimezone().isoformat(timespec="seconds") + with closing(db()) as conn: + cur1 = conn.execute("DELETE FROM monitor_state WHERE updated_at < ?", (cutoff,)) + cur2 = conn.execute("DELETE FROM sent_events WHERE created_at < ?", (cutoff,)) + conn.commit() + return int(cur1.rowcount or 0), int(cur2.rowcount or 0) + + +async def cleanup_monitor_loop() -> None: + while True: + settings = monitor_cleanup_settings() + await asyncio.sleep(int(settings["interval_minutes"]) * 60) + if not settings["enabled"]: + continue + try: + state_n, sent_n = cleanup_monitor_data(int(settings["retention_minutes"])) + logger.info( + "monitor cleanup done retention=%smin deleted monitor_state=%s sent_events=%s", + settings["retention_minutes"], state_n, sent_n, + ) + except Exception: + logger.exception("monitor cleanup failed") + + + +async def flush_pending_inbox() -> None: + if not bot or admin_chat_id is None: + return + rows = pending_inbox(50) + if not rows: + return + logger.info("flushing pending inbox messages: %d", len(rows)) + for row in rows: + try: + text = ( + f"[补发用户消息 #{row['id']}]\n" + f"user_id: {row['user_id']}\n" + f"name: {html_escape(row['full_name'])}\n" + f"username: @{html_escape(row['username'] or '')}\n" + f"原消息ID: {row['user_message_id']}\n" + f"类型: {html_escape(row['message_type'])}\n" + f"时间: {html_escape(row['created_at'])}\n\n" + f"内容:{html_escape(row['text'] or '(非文本/媒体消息,原始媒体无法补发,仅保留记录)')}" + ) + sent = await bot.send_message(admin_chat_id, text) + save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None) + mark_inbox_forwarded(int(row['id']), sent.message_id, None) + except Exception as e: + mark_inbox_error(int(row['id']), repr(e)) + logger.exception("failed to flush inbox message id=%s", row['id']) + + +async def flush_pending_loop() -> None: + while True: + try: + await 
flush_pending_inbox() + except Exception: + logger.exception("flush_pending_loop failed") + await asyncio.sleep(60) + + +async def run_all_monitors_once() -> None: + monitors = config.get("monitors") or [] + logger.info("manual/all monitor run start, count=%d", len(monitors)) + total = 0 + for m in monitors: + total += await run_monitor(m) + logger.info("manual/all monitor run done, notifications=%d", total) + + +def schedule_monitors(scheduler: AsyncIOScheduler) -> None: + for idx, m in enumerate(config.get("monitors") or []): + name = m.get("name", "unnamed") + requested = int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) + interval = max(requested, MIN_INTERVAL_SECONDS) + if requested < MIN_INTERVAL_SECONDS: + logger.warning("monitor %s interval %s raised to %s", name, requested, interval) + # Use index+hash, not just name: duplicate names should not crash saving/reloading. + job_key = stable_key(str(idx), name, m.get("url", ""))[:16] + scheduler.add_job(run_monitor, "interval", seconds=interval, args=[m], id=f"monitor:{idx}:{job_key}", max_instances=1, coalesce=True, replace_existing=True, next_run_time=datetime.now(timezone.utc)) + logger.info("scheduled monitor %s every %ss", name, interval) + + + +# ----------------------------- +# Web admin panel +# ----------------------------- + +def panel_enabled() -> bool: + return os.getenv("WEB_PANEL_ENABLED", "true").lower() not in {"0", "false", "no", "off"} + + +def session_secret() -> str: + secret = os.getenv("WEB_PANEL_SESSION_SECRET", "").strip() + if not secret: + secret = secrets.token_urlsafe(32) + vals = env_values() + vals["WEB_PANEL_SESSION_SECRET"] = secret + write_env_values(vals) + return secret + + +def session_token(username: str) -> str: + raw = f"{username}|{session_secret()}" + return hashlib.sha256(raw.encode()).hexdigest() + + +def is_logged_in(request: Request) -> bool: + username = os.getenv("WEB_PANEL_USER", "admin") + token = request.cookies.get("tg_watchbot_session", "") + return 
bool(token) and secrets.compare_digest(token, session_token(username)) + + +def panel_auth(request: Request) -> str: + # Actual redirect is handled by middleware; dependencies cannot reliably return redirects. + return os.getenv("WEB_PANEL_USER", "admin") + + +def login_page(error: str = "") -> str: + err = f"" if error else "" + return f""" +登录 · tg-watchbot +

tg-watchbot

登录后管理 Telegram 机器人、关键词监控、库存/价格提醒。

{err}
your-domain.example · Cloudflare Tunnel
""" + + +def env_values() -> dict[str, str]: + load_dotenv(ENV_PATH, override=True) + return { + "TELEGRAM_BOT_TOKEN": os.getenv("TELEGRAM_BOT_TOKEN", ""), + "ADMIN_CHAT_ID": os.getenv("ADMIN_CHAT_ID", ""), + "LOG_LEVEL": os.getenv("LOG_LEVEL", "INFO"), + "WEB_PANEL_ENABLED": os.getenv("WEB_PANEL_ENABLED", "true"), + "WEB_PANEL_HOST": os.getenv("WEB_PANEL_HOST", "127.0.0.1"), + "WEB_PANEL_PORT": os.getenv("WEB_PANEL_PORT", "8765"), + "WEB_PANEL_USER": os.getenv("WEB_PANEL_USER", "admin"), + "WEB_PANEL_PASSWORD": os.getenv("WEB_PANEL_PASSWORD", "admin"), + "WEB_PANEL_SESSION_SECRET": os.getenv("WEB_PANEL_SESSION_SECRET", ""), + } + + +def write_env_values(values: dict[str, str]) -> None: + lines = [ + "# tg-watchbot environment", + f"TELEGRAM_BOT_TOKEN={values.get('TELEGRAM_BOT_TOKEN','')}", + f"ADMIN_CHAT_ID={values.get('ADMIN_CHAT_ID','')}", + f"LOG_LEVEL={values.get('LOG_LEVEL','INFO')}", + "", + "# Web 管理面板;默认只监听本机,建议用 SSH 隧道或反代再暴露", + f"WEB_PANEL_ENABLED={values.get('WEB_PANEL_ENABLED','true')}", + f"WEB_PANEL_HOST={values.get('WEB_PANEL_HOST','127.0.0.1')}", + f"WEB_PANEL_PORT={values.get('WEB_PANEL_PORT','8765')}", + f"WEB_PANEL_USER={values.get('WEB_PANEL_USER','admin')}", + f"WEB_PANEL_PASSWORD={values.get('WEB_PANEL_PASSWORD','admin')}", + f"WEB_PANEL_SESSION_SECRET={values.get('WEB_PANEL_SESSION_SECRET','')}", + "", + ] + ENV_PATH.write_text("\n".join(lines), encoding="utf-8") + ENV_PATH.chmod(0o600) + load_dotenv(ENV_PATH, override=True) + + +def cfg_load_fresh() -> dict[str, Any]: + return load_config() + + +def cfg_save(new_cfg: dict[str, Any]) -> None: + if not isinstance(new_cfg, dict): + raise ValueError("配置根节点必须是对象") + monitors = new_cfg.setdefault("monitors", []) + if not isinstance(monitors, list): + raise ValueError("monitors 必须是列表") + for m in monitors: + if not isinstance(m, dict): + raise ValueError("每个 monitor 必须是对象") + if int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) < MIN_INTERVAL_SECONDS: + m["interval_seconds"] = 
MIN_INTERVAL_SECONDS + CONFIG_PATH.write_text(yaml.safe_dump(new_cfg, allow_unicode=True, sort_keys=False), encoding="utf-8") + global config + config = new_cfg + reload_scheduler_jobs() + + +def reload_scheduler_jobs() -> None: + if scheduler_ref: + for job in list(scheduler_ref.get_jobs()): + if job.id.startswith("monitor:"): + scheduler_ref.remove_job(job.id) + schedule_monitors(scheduler_ref) + + +def parse_lines(text: str) -> list[str]: + return [x.strip() for x in (text or "").splitlines() if x.strip()] + + +def monitor_from_form( + original_index: int | None, + name: str, + mtype: str, + url: str, + interval_seconds: int, + keywords: str, + item_selector: str, + title_selector: str, + link_selector: str, + price_selector: str, + stock_selector: str, + keyword_match: bool, + new_item: bool, + price_change: bool, + stock_change: bool, +) -> dict[str, Any]: + m: dict[str, Any] = { + "name": name.strip(), + "type": mtype, + "url": url.strip(), + "interval_seconds": max(int(interval_seconds or MIN_INTERVAL_SECONDS), MIN_INTERVAL_SECONDS), + "keywords": parse_lines(keywords), + "notify_on": { + "keyword_match": keyword_match, + "new_item": new_item, + "price_change": price_change, + "stock_change": stock_change, + }, + } + if mtype == "rss": + m["forum"] = True + if mtype == "web": + selectors = { + "item": item_selector.strip() or "article, .thread, .post, li", + "title": title_selector.strip() or "h1, h2, h3, a", + "link": link_selector.strip() or "a", + } + if price_selector.strip(): + selectors["price"] = price_selector.strip() + if stock_selector.strip(): + selectors["stock"] = stock_selector.strip() + m["selectors"] = selectors + if not m["name"] or not m["url"]: + raise ValueError("名称和 URL 必填") + return m + + +def layout(title: str, body: str) -> str: + return f""" +{html_escape(title)} · tg-watchbot +""" + + +def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str: + m = m or {"type": "web", "interval_seconds": 60, 
"notify_on": {"keyword_match": True, "new_item": True, "price_change": True, "stock_change": True}, "selectors": {}} + selectors = m.get("selectors") or {} + no = m.get("notify_on") or {} + keywords = "\n".join(m.get("keywords") or []) + action = "/monitor/save" if idx is not None else "/monitor/create" + hidden = f"" if idx is not None else "" + def checked(k: str) -> str: + return "checked" if no.get(k, False) else "" + return f"""
{hidden} +
+
+
+
+ +

Web 选择器(RSS 可忽略)

+
+
+
+
+
+

提醒条件

+ + + + +

取消

""" + + +def create_panel_app() -> FastAPI: + app = FastAPI(title="tg-watchbot Panel") + + @app.middleware("http") + async def require_login_middleware(request: Request, call_next): + public_paths = {"/login", "/health", "/favicon.ico"} + if request.url.path in public_paths or is_logged_in(request): + return await call_next(request) + return RedirectResponse("/login", status_code=303) + + @app.get("/login", response_class=HTMLResponse) + async def login_get(request: Request): + if is_logged_in(request): + return RedirectResponse("/", status_code=303) + return HTMLResponse(login_page()) + + @app.post("/login") + async def login_post(username: str = Form(""), password: str = Form("")): + expected_user = os.getenv("WEB_PANEL_USER", "admin") + expected_pass = os.getenv("WEB_PANEL_PASSWORD", "admin") + if secrets.compare_digest(username, expected_user) and secrets.compare_digest(password, expected_pass): + resp = RedirectResponse("/", status_code=303) + resp.set_cookie("tg_watchbot_session", session_token(expected_user), httponly=True, secure=True, samesite="lax", max_age=60 * 60 * 24 * 14) + return resp + return HTMLResponse(login_page("用户名或密码错误"), status_code=401) + + @app.get("/logout") + async def logout() -> RedirectResponse: + resp = RedirectResponse("/login", status_code=303) + resp.delete_cookie("tg_watchbot_session") + return resp + + @app.get("/", response_class=HTMLResponse) + async def index(_: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh() + rows = [] + for i, m in enumerate(cfg.get("monitors") or []): + rows.append(f"""{html_escape(m.get('type','web'))}{html_escape(m.get('name',''))}
{html_escape(m.get('url',''))}{html_escape(m.get('interval_seconds',60))}s{html_escape(', '.join(m.get('keywords') or []))}编辑 预览 检查 删除""") + body = f"""

监控目标

当前 {len(cfg.get('monitors') or [])} 个;不限制数量,可继续新增。保存后自动重载定时任务。

""" + "".join(rows) + "
类型目标间隔关键词操作
" + return layout("监控", body) + + @app.get("/monitor/new", response_class=HTMLResponse) + async def new_monitor(_: str = Depends(panel_auth)) -> str: + return layout("新增监控", "

这里是新增单个监控。要一次加多个网站,用左侧/首页的「批量新增」。

" + monitor_form_html()) + + @app.get("/monitor/templates", response_class=HTMLResponse) + async def monitor_templates(_: str = Depends(panel_auth)) -> str: + body = """

论坛监控模板

NodeSeek / Linux.do 建议用 RSS,不抓网页 HTML,抗 Cloudflare 更稳。

""" + return layout("监控模板", body) + + @app.get("/monitor/template/{kind}", response_class=HTMLResponse) + async def monitor_template(kind: str, _: str = Depends(panel_auth)) -> str: + templates = { + "nodeseek": {"name": "NodeSeek 新帖", "type": "rss", "url": "https://rss.nodeseek.com/", "interval_seconds": 60, "keywords": ["NAT", "优惠", "补货", "VPS", "免费"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + "linuxdo": {"name": "Linux.do 最新", "type": "rss", "url": "https://linux.do/latest.rss", "interval_seconds": 60, "keywords": ["Claude", "Codex", "API", "VPS", "NAT"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + "linuxdo-resource": {"name": "Linux.do 资源荟萃", "type": "rss", "url": "https://linux.do/c/resource/14.rss", "interval_seconds": 60, "keywords": ["免费", "开源", "API", "Claude"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + } + m = templates.get(kind) + if not m: + raise HTTPException(404, "template not found") + return layout("使用模板新增", "

这是预设模板,保存即可加入监控;也可以先调整关键词。

" + monitor_form_html(m)) + + @app.get("/monitor/bulk", response_class=HTMLResponse) + async def bulk_monitor(_: str = Depends(panel_auth)) -> str: + sample = """NodeSeek|https://www.nodeseek.com/|免费鸡,优惠码,NAT +Linux.do|https://linux.do|公益,codex,claude +HostLoc|https://hostloc.com|VPS,补货,优惠""" + body = f"""

批量新增监控

一行一个网站,格式:名称|URL|关键词1,关键词2,关键词3。保存后会追加到现有列表,不会覆盖原有监控。

默认提醒条件

取消

""" + return layout("批量新增", body) + + @app.post("/monitor/bulk") + async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)): + cfg = cfg_load_fresh() + monitors = cfg.setdefault("monitors", []) + added = 0 + errors = [] + for line_no, raw in enumerate(items.splitlines(), 1): + line = raw.strip() + if not line or line.startswith('#'): + continue + parts = [x.strip() for x in line.split('|')] + if len(parts) < 2: + errors.append(f"第 {line_no} 行格式错误:{html_escape(line)}") + continue + name, url = parts[0], parts[1] + keywords = parts[2] if len(parts) >= 3 else "" + try: + monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change))) + added += 1 + except Exception as e: + errors.append(f"第 {line_no} 行失败:{html_escape(e)}") + try: + cfg_save(cfg) + except Exception as e: + logger.exception("bulk save failed") + return HTMLResponse(layout("批量新增失败", f"
{html_escape(e)}
"), status_code=500) + if errors: + return HTMLResponse(layout("批量新增完成", f"
已新增 {added} 个,部分行有问题:
{'
'.join(errors)}

返回

")) + return RedirectResponse("/", status_code=303) + + @app.get("/monitor/{idx}/edit", response_class=HTMLResponse) + async def edit_monitor(idx: int, _: str = Depends(panel_auth)) -> str: + monitors = cfg_load_fresh().get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + return layout("编辑监控", "

编辑监控

" + monitor_form_html(monitors[idx], idx)) + + async def save_form_common( + original_index: int | None, + name: str, + mtype: str, + url: str, + interval_seconds: int, + keywords: str, + item_selector: str, + title_selector: str, + link_selector: str, + price_selector: str, + stock_selector: str, + keyword_match: str | None, + new_item: str | None, + price_change: str | None, + stock_change: str | None, + ) -> RedirectResponse: + cfg = cfg_load_fresh() + monitors = cfg.setdefault("monitors", []) + m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change)) + if original_index is None: + monitors.append(m) + else: + monitors[original_index] = m + try: + cfg_save(cfg) + except Exception as e: + logger.exception("save monitor failed") + return HTMLResponse(layout("保存失败", f"

保存失败

{html_escape(e)}

返回

"), status_code=500) + return RedirectResponse("/", status_code=303) + + @app.post("/monitor/create") + async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + + @app.post("/monitor/save") + async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + + @app.get("/monitor/{idx}/delete") + async def delete_monitor(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if 0 <= idx < len(monitors): + monitors.pop(idx); cfg_save(cfg) + return RedirectResponse("/", status_code=303) + + + @app.get("/monitor/{idx}/preview", 
response_class=HTMLResponse) + async def monitor_preview(idx: int, _: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + m = monitors[idx] + timeout = int((cfg.get("http") or {}).get("timeout_seconds", 20)) + ua = (cfg.get("http") or {}).get("user_agent") or DEFAULT_UA + headers = {"User-Agent": ua} + try: + async with httpx.AsyncClient(timeout=timeout, headers=headers) as client: + body = await fetch_url(client, m.get("url")) + items = parse_rss_items(m, body) if m.get("type") == "rss" else parse_web_items(m, body) + rows=[] + for it in items[:15]: + blocked, br = item_blocked(it, m) + hits = keyword_hits(f"{it.title} {it.text}", m.get("keywords") or []) + rows.append(f"""{html_escape(it.title)}
{html_escape(it.link)}{html_escape(it.author or '-')}{html_escape(it.category or '-')}{html_escape(', '.join(hits) or '-')}{'跳过: '+html_escape(br) if blocked else '可推送/记录'}""") + body_html = "

抓取预览

只预览最近 15 条,不写入去重状态、不推送。

" + "".join(rows) + "
标题/链接作者分类命中状态
" + return layout("抓取预览", body_html) + except Exception as e: + return HTMLResponse(layout("抓取预览失败", f"
{html_escape(e)}
"), status_code=500) + + @app.get("/monitor/{idx}/run", response_class=HTMLResponse) + async def run_monitor_now(idx: int, _: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + count = await run_monitor(monitors[idx]) + return layout("检查完成", f"
已手动检查:{html_escape(monitors[idx].get('name'))},推送 {count} 条。

返回

") + + @app.get("/run-once", response_class=HTMLResponse) + async def run_once_page(_: str = Depends(panel_auth)) -> str: + await run_all_monitors_once() + return layout("手动检查完成", "
已执行全部监控检查。具体结果看日志/Telegram 推送。

返回

") + + @app.get("/yaml", response_class=HTMLResponse) + async def yaml_edit(_: str = Depends(panel_auth)) -> str: + text = CONFIG_PATH.read_text(encoding="utf-8") + return layout("YAML 高级编辑", f"

YAML 高级编辑

") + + @app.post("/yaml", response_class=HTMLResponse) + async def yaml_save(_: str = Depends(panel_auth), content: str = Form(...)) -> str: + try: + data = yaml.safe_load(content) or {} + cfg_save(data) + return layout("已保存", "
YAML 已保存并重载。

返回

") + except Exception as e: + return layout("保存失败", f"

保存失败

{html_escape(e)}

返回

") + + @app.get("/settings", response_class=HTMLResponse) + async def settings(_: str = Depends(panel_auth)) -> str: + v = env_values() + cleanup = (cfg_load_fresh().get("cleanup") or {}) + body = f"""

Bot / 面板设置

+ + +
+

监控数据自动清理

只清理 RSS/网站监控状态和去重记录,不删除用户、收件箱、双向对话消息。

+

改 Token、chat_id 或面板监听端口后建议重启服务:sudo systemctl restart tg-watchbot
""" + return layout("设置", body) + + @app.post("/settings", response_class=HTMLResponse) + async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: + write_env_values(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}) + cfg = cfg_load_fresh() + cfg["cleanup"] = { + "enabled": True, + "interval_minutes": max(1, int(CLEANUP_INTERVAL_MINUTES)), + "monitor_retention_minutes": max(1, int(CLEANUP_RETENTION_MINUTES)), + } + cfg_save(cfg) + return layout("已保存", "
.env 和监控清理设置已保存。Token/chat_id 需重启服务后生效。

返回

") + + + @app.get("/send", response_class=HTMLResponse) + async def send_page(_: str = Depends(panel_auth)) -> str: + with closing(db()) as conn: + users = conn.execute( + "SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 200" + ).fetchall() + options = [] + for u in users: + blocked = "(已封禁)" if u["blocked"] else "" + username = f"@{u['username']}" if u["username"] else "" + label = f"{u['full_name'] or u['user_id']} {username} · {u['user_id']} {blocked}" + options.append(f"") + body = f"""

主动发消息

只能发送给已经私聊过 Bot 的用户;这是 Telegram Bot API 限制。

+ + + +

查看收件箱

""" + return layout("主动发消息", body) + + @app.post("/send", response_class=HTMLResponse) + async def send_save(_: str = Depends(panel_auth), user_id: str = Form(""), manual_user_id: str = Form(""), text: str = Form("")) -> str: + raw_uid = (manual_user_id or user_id or "").strip() + if not raw_uid: + return layout("发送失败", "
缺少 user_id

返回

") + if not text.strip(): + return layout("发送失败", "
消息内容不能为空

返回

") + try: + uid = int(raw_uid) + if not get_user(uid): + return layout("发送失败", f"
找不到用户 {uid},对方需要先私聊 Bot。

返回

") + if is_blocked(uid): + return layout("发送失败", f"
用户 {uid} 已被封禁,请先 /unblock。

返回

") + if not bot: + return layout("发送失败", "
Bot 尚未初始化;请确认服务以正常模式运行,而不是 --panel-only。

返回

") + sent = await bot.send_message(uid, text.strip()) + logger.info("panel sent message to user_id=%s message_id=%s", uid, sent.message_id) + await admin_send(f"[主动发送成功]\nuser_id: {uid}\nmessage_id: {sent.message_id}\n时间:{html_escape(now_iso())}") + return layout("发送成功", f"
已发送给用户 {uid},message_id={sent.message_id}。Bot 也已给管理员发送确认提醒。

继续发送 收件箱

") + except TelegramAPIError as e: + logger.exception("panel send failed") + return layout("发送失败", f"
{html_escape(e)}

返回

") + except Exception as e: + logger.exception("panel send failed") + return layout("发送失败", f"
{html_escape(e)}

返回

") + + @app.get("/inbox", response_class=HTMLResponse) + async def inbox_page(_: str = Depends(panel_auth)) -> str: + with closing(db()) as conn: + rows = conn.execute("SELECT * FROM inbox_messages ORDER BY id DESC LIMIT 200").fetchall() + trs = [] + for r in rows: + status_txt = "已转发" if r["forwarded"] else "未转发" + status_cls = "ok" if r["forwarded"] else "danger" + content = html_escape(r["text"] or "(非文本/媒体消息)") + trs.append(f"""#{r['id']}
{status_txt}{html_escape(r['full_name'])}
{r['user_id']} @{html_escape(r['username'] or '')}{html_escape(r['message_type'])}
{html_escape(r['created_at'])}{content}
{html_escape(r['error'] or '')}重试转发""") + body = "

收件箱

用户消息会先写入 SQLite,再转发给管理员;转发失败的消息可以在这里重试。

" + "".join(trs) + "
ID/状态用户类型/时间内容/错误操作
" + return layout("收件箱", body) + + @app.get("/inbox/{msg_id}/retry") + async def retry_inbox(msg_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + with closing(db()) as conn: + conn.execute("UPDATE inbox_messages SET forwarded=0, error=NULL WHERE id=?", (msg_id,)) + conn.commit() + await flush_pending_inbox() + return RedirectResponse("/inbox", status_code=303) + + @app.get("/restart", response_class=HTMLResponse) + async def restart_page(_: str = Depends(panel_auth)) -> str: + body = """

重启机器人

用于修改 Token、管理员 ID、面板设置等需要重启生效的配置。

""" + return layout("重启机器人", body) + + @app.post("/restart") + async def restart_post(_: str = Depends(panel_auth)) -> HTMLResponse: + async def delayed_restart(): + await asyncio.sleep(1.0) + # Exit with failure so systemd Restart=on-failure brings the service back up. + os._exit(1) + asyncio.create_task(delayed_restart()) + return HTMLResponse(layout("正在重启", "
已发送重启命令,约 5-10 秒后刷新页面。

返回首页

")) + + @app.get("/logs", response_class=HTMLResponse) + async def logs(_: str = Depends(panel_auth)) -> str: + text = LOG_PATH.read_text(encoding="utf-8", errors="replace")[-20000:] if LOG_PATH.exists() else "暂无日志" + return layout("日志", f"

最近应用日志

{html_escape(text)}
") + + @app.get("/health", response_class=PlainTextResponse) + async def health() -> str: + return "ok" + + return app + + +async def start_panel_server() -> uvicorn.Server | None: + if not panel_enabled(): + logger.info("web panel disabled") + return None + host = os.getenv("WEB_PANEL_HOST", "127.0.0.1") + port = int(os.getenv("WEB_PANEL_PORT", "8765")) + server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info")) + asyncio.create_task(server.serve()) + logger.info("web panel listening on http://%s:%s", host, port) + return server + +def validate_env() -> tuple[str, int]: + load_dotenv(ENV_PATH) + token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip() + admin = os.getenv("ADMIN_CHAT_ID", "").strip() + if not token: + raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}") + if not admin: + raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}") + return token, int(admin) + + +async def main_async(run_once: bool = False, panel_only: bool = False) -> None: + global bot, admin_chat_id, config, scheduler_ref + load_dotenv(ENV_PATH, override=True) + config = load_config() + setup_logging(os.getenv("LOG_LEVEL", "INFO")) + init_db() + if panel_only: + await start_panel_server() + logger.info("panel-only mode start") + while True: + await asyncio.sleep(3600) + if run_once: + # If .env is filled, send notifications during manual test; otherwise just log. 
+ try: + token, admin_chat_id = validate_env() + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + except Exception as e: + logger.warning("run-once without Telegram notification: %s", e) + await run_all_monitors_once() + if bot: + await bot.session.close() + return + token, admin_chat_id = validate_env() + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + dp = Dispatcher() + dp.include_router(router) + scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") + scheduler_ref = scheduler + schedule_monitors(scheduler) + scheduler.start() + await start_panel_server() + asyncio.create_task(flush_pending_loop()) + asyncio.create_task(cleanup_monitor_loop()) + await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") + logger.info("bot polling start") + await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent") + parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured") + args = parser.parse_args() + try: + asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only)) + except KeyboardInterrupt: + pass + except Exception: + logger.exception("fatal error") + raise + + +if __name__ == "__main__": + main() diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..7e75443 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,47 @@ +http: + timeout_seconds: 20 + user_agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) + Chrome/124.0 Safari/537.36 tg-watchbot/1.0 +bot: + rate_limit: + window_seconds: 10 + max_messages: 3 +monitors: +- name: NodeSeek 新帖 + type: rss + url: https://rss.nodeseek.com/ + interval_seconds: 60 + keywords: + - VPS + - 
优惠 + - 免费 + exclude_keywords: [] + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +- name: Linux.do 最新 + type: rss + url: https://linux.do/latest.rss + interval_seconds: 60 + keywords: + - API + - 教程 + - 开源 + exclude_keywords: [] + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +cleanup: + enabled: true + interval_minutes: 60 + monitor_retention_minutes: 1440 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..17855f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +aiogram==3.22.0 +APScheduler==3.11.1 +beautifulsoup4==4.14.3 +fastapi==0.136.1 +feedparser==6.0.12 +httpx==0.28.1 +python-dotenv==1.2.1 +python-multipart==0.0.20 +PyYAML==6.0.3 +uvicorn[standard]==0.38.0 diff --git a/systemd/tg-watchbot.service b/systemd/tg-watchbot.service new file mode 100644 index 0000000..2cb70fe --- /dev/null +++ b/systemd/tg-watchbot.service @@ -0,0 +1,22 @@ +[Unit] +Description=tg-watchbot +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +WorkingDirectory=/opt/tg-watchbot +EnvironmentFile=/opt/tg-watchbot/.env +ExecStart=/opt/tg-watchbot/.venv/bin/python /opt/tg-watchbot/app.py +Restart=on-failure +RestartSec=10 +User=tg-watchbot +Group=tg-watchbot +UMask=0077 +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=full +ReadWritePaths=/opt/tg-watchbot + +[Install] +WantedBy=multi-user.target