commit a53bb099648a3ab54265826356105cc33a803b5a Author: InfernoXuaI <1391197588@qq.com> Date: Wed May 20 15:39:02 2026 +0800 Initial commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..39601e9 --- /dev/null +++ b/.env.example @@ -0,0 +1,16 @@ +# tg-watchbot environment +# Create a bot with @BotFather, then put the token here. +TELEGRAM_BOT_TOKEN= + +# Telegram numeric chat id that receives monitor notifications and user messages. +ADMIN_CHAT_ID= + +LOG_LEVEL=INFO + +# Web admin panel. Bind to localhost by default; use a reverse proxy / Cloudflare Tunnel if exposing it. +WEB_PANEL_ENABLED=true +WEB_PANEL_HOST=127.0.0.1 +WEB_PANEL_PORT=8765 +WEB_PANEL_USER=admin +WEB_PANEL_PASSWORD=change-me +WEB_PANEL_SESSION_SECRET= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..32f3211 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +.env +config.yaml +memory.md +*.log +*.sqlite3 +*.sqlite3-* +__pycache__/ +*.pyc +.venv/ +venv/ +.pytest_cache/ +.ruff_cache/ +.DS_Store diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..14fac91 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..775a628 --- /dev/null +++ b/README.md @@ -0,0 +1,287 @@ +# tg-watchbot + +tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人** 和 **Web/RSS 监控推送** 合在一起: + +- 普通用户私聊 Bot,消息会转发给管理员; +- 管理员可以直接回复、主动发文字/图片、封禁/备注用户; +- 后台定时监控 RSS 或网页,命中关键词、新条目、价格/库存变化后推送给管理员; +- 自带一个 Web 管理面板,可配置监控目标、编辑 YAML、查看收件箱和日志。 + +项目为单文件应用,适合个人服务器、NAT 小鸡、轻量 VPS 直接用 systemd 跑。 + +## 功能 + +### Telegram 双向机器人 + +- 使用官方 Telegram Bot API,不做 userbot/selfbot。 +- `/start` 建立用户和管理员之间的联系。 +- 用户消息先写入 SQLite,再转发给管理员,避免转发失败时丢消息。 +- 管理员可通过“回复转发消息”直接回给原用户。 +- 支持显式命令: + - `/reply <内容>`:给指定用户发文字; + - `/sendpic [说明]`:给指定用户发图片; + - `/block `:封禁用户; + - `/unblock `:解封用户; + - `/note <备注>`:给用户加备注; + - `/who `:查看用户信息; + - `/cancel`:取消待发送图片。 +- 普通用户有简单限流,防止刷屏。 + +### Web/RSS 监控 + +- 支持两类监控: + - `rss`:解析 RSS/Atom 条目; + - `web`:用 CSS selector 抓网页条目、标题、链接、价格、库存。 +- 支持触发条件: + - 关键词命中; + - 新条目; + - 价格变化; + - 库存变化。 +- 支持论坛 RSS 增强字段:作者、分类、tags、摘要。 +- 支持去重,避免同一条反复推送。 +- 支持屏蔽词、作者、分类过滤(YAML 高级配置)。 +- 默认最低监控间隔为 60 秒。 + +### Web 管理面板 + +- 登录页 + HttpOnly session cookie,不使用丑陋的浏览器 Basic Auth。 +- 监控列表、新增、编辑、删除、手动检查、预览。 +- NodeSeek / Linux.do RSS 模板。 +- 批量新增监控。 +- YAML 高级编辑。 +- Bot Token / 管理员 ID / 面板账号配置页。 +- 收件箱页面,可查看用户消息和重试转发。 +- 主动发消息页面 `/send`,发送成功后会在页面显示结果,并给管理员聊天发送确认提醒。 +- 自动清理监控/RSS/网站状态数据;不会删除用户、收件箱、双向对话消息。 +- 日志页面和健康检查 `/health`。 + +## 借鉴 / 使用的开源库 + +本项目的业务逻辑为自写,主要使用并参考了以下开源库的公开 API 和常见用法: + +- [`aiogram`](https://github.com/aiogram/aiogram):Telegram Bot API、命令、消息处理、复制/发送消息。 +- [`FastAPI`](https://github.com/fastapi/fastapi):Web 管理面板、表单、路由、中间件。 +- [`Uvicorn`](https://github.com/encode/uvicorn):ASGI 服务运行。 +- [`APScheduler`](https://github.com/agronholm/apscheduler):异步定时监控任务。 +- [`httpx`](https://github.com/encode/httpx):异步 HTTP 抓取。 +- [`feedparser`](https://github.com/kurtmckee/feedparser):RSS/Atom 解析。 +- [`Beautiful Soup`](https://www.crummy.com/software/BeautifulSoup/):HTML 解析和 CSS selector 抽取。 +- [`PyYAML`](https://pyyaml.org/):`config.yaml` 配置读写。 +- [`python-dotenv`](https://github.com/theskumar/python-dotenv):读取 `.env`。 +- Python 标准库 `sqlite3`:消息、用户、去重、监控状态持久化。 + +## 安全 / 脱敏说明 + +- 仓库不包含真实 `TELEGRAM_BOT_TOKEN`、`ADMIN_CHAT_ID`、面板密码、Session Secret、数据库、日志。 +- 真实配置放在 `.env`,不要提交。 +- `.gitignore` 已忽略 `.env`、SQLite、日志、虚拟环境和缓存。 +- 如果要把面板暴露到公网,建议使用 Cloudflare Access / 反代鉴权,并使用强密码。 +- Bot 只能给“已经主动私聊过 Bot 的用户”发消息,这是 Telegram Bot API 的限制。 + +## 快速开始 + +```bash +git clone tg-watchbot +cd tg-watchbot +python3 -m venv .venv +./.venv/bin/pip install -U pip +./.venv/bin/pip install -r requirements.txt +cp .env.example .env +cp config.example.yaml config.yaml +nano .env +``` + +至少填写: + +```dotenv +TELEGRAM_BOT_TOKEN= +ADMIN_CHAT_ID= +WEB_PANEL_USER=admin +WEB_PANEL_PASSWORD= +``` + +启动: + +```bash +./.venv/bin/python app.py +``` + +打开面板: + +```text +http://127.0.0.1:8765 +``` + +如果只是先配置面板、还没有 Telegram Token: + +```bash +./.venv/bin/python app.py --panel-only +``` + +手动跑一次监控: + +```bash +./.venv/bin/python app.py --run-once +``` + +## systemd 部署 + +推荐部署到 `/opt/tg-watchbot`: + +```bash +sudo useradd --system --no-create-home --shell /usr/sbin/nologin tg-watchbot || true +sudo mkdir -p /opt/tg-watchbot +sudo chown -R "$USER:$USER" /opt/tg-watchbot + +cd /opt/tg-watchbot +git clone . +python3 -m venv .venv +./.venv/bin/pip install -U pip +./.venv/bin/pip install -r requirements.txt +cp .env.example .env +cp config.example.yaml config.yaml +nano .env + +sudo chown -R tg-watchbot:tg-watchbot /opt/tg-watchbot +sudo chmod 600 /opt/tg-watchbot/.env +sudo cp systemd/tg-watchbot.service /etc/systemd/system/tg-watchbot.service +sudo systemctl daemon-reload +sudo systemctl enable --now tg-watchbot +sudo journalctl -u tg-watchbot -f +``` + +健康检查: + +```bash +curl http://127.0.0.1:8765/health +``` + +## 配置说明 + +### `.env` + +| 变量 | 说明 | +|---|---| +| `TELEGRAM_BOT_TOKEN` | BotFather 创建的 Telegram Bot Token | +| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id,用于接收用户消息和监控通知 | +| `LOG_LEVEL` | 日志级别,默认 `INFO` | +| `WEB_PANEL_ENABLED` | 是否启用 Web 面板,默认 `true` | +| `WEB_PANEL_HOST` | 面板监听地址,默认 `127.0.0.1` | +| `WEB_PANEL_PORT` | 面板端口,默认 `8765` | +| `WEB_PANEL_USER` | 面板用户名 | +| `WEB_PANEL_PASSWORD` | 面板密码 | +| `WEB_PANEL_SESSION_SECRET` | Session Secret,留空会自动生成并写回 `.env` | + +### `config.yaml` + +监控数据自动清理示例: + +```yaml +cleanup: + enabled: true + interval_minutes: 60 # 每多少分钟执行一次清理 + monitor_retention_minutes: 1440 # RSS/网站监控状态保留多久 +``` + +清理范围只包括: + +- `monitor_state`:网站/RSS 条目状态、价格/库存状态; +- `sent_events`:监控推送去重记录。 + +不会删除: + +- `users`; +- `message_map`; +- `inbox_messages`; +- 任何双向对话/客服消息记录。 + +RSS 示例: + +```yaml +monitors: + - name: NodeSeek 新帖 + type: rss + url: https://rss.nodeseek.com/ + interval_seconds: 60 + keywords: + - VPS + - 优惠 + exclude_keywords: + - 出号 + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +``` + +网页示例: + +```yaml +monitors: + - name: Example Deals + type: web + url: https://example.com/deals + interval_seconds: 300 + keywords: + - discount + selectors: + item: article, .deal, li + title: h1, h2, h3, a + link: a + price: .price + stock: .stock + notify_on: + keyword_match: true + new_item: true + price_change: true + stock_change: true +``` + +## 管理命令 + +管理员在 Telegram 里可用: + +```text +/reply <内容> +/sendpic [图片说明] +/block +/unblock +/note <备注> +/who +/cancel +``` + +也可以直接“回复 Bot 转发给管理员的用户消息”,Bot 会按映射把回复发回原用户。 + +## 面板路由 + +| 路由 | 说明 | +|---|---| +| `/` | 监控列表 | +| `/monitor/new` | 新增监控 | +| `/monitor/templates` | 论坛模板 | +| `/monitor/bulk` | 批量新增 | +| `/monitor/{idx}/preview` | 预览抓取结果,不写入状态、不推送 | +| `/monitor/{idx}/run` | 手动检查单个监控 | +| `/run-once` | 手动检查全部监控 | +| `/yaml` | YAML 高级编辑 | +| `/settings` | `.env` 设置和监控清理策略 | +| `/send` | 主动发消息给已私聊过 Bot 的用户 | +| `/inbox` | 收件箱 | +| `/logs` | 日志 | +| `/health` | 健康检查 | + +## 注意事项 + +- Telegram Bot 不能主动私聊陌生人;对方必须先给 Bot 发过 `/start` 或任意消息。 +- 对公网暴露 Web 面板前,务必改默认密码。 +- RSS 监控建议 60 秒起步;网页监控建议更保守,避免对目标站造成压力。 +- 媒体消息当前只保证记录文本/说明和转发状态;转发失败后的媒体补发需要额外做本地附件存储。 + +## License + +MIT diff --git a/app.py b/app.py new file mode 100644 index 0000000..66dae04 --- /dev/null +++ b/app.py @@ -0,0 +1,1524 @@ +#!/usr/bin/env python3 +"""tg-watchbot: Telegram two-way support bot + web/RSS monitor. + +- Official Telegram Bot API via aiogram (no userbot/selfbot). +- SQLite state for dedupe, users, admin-message mapping, blocks, notes, monitor state. +- APScheduler async jobs for monitoring. +""" + +from __future__ import annotations + +import argparse +import asyncio +import hashlib +import html +import logging +import os +import re +import secrets +import signal +import sqlite3 +import time +from contextlib import closing +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any +from urllib.parse import urljoin + +import feedparser +import httpx +import yaml +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from bs4 import BeautifulSoup +from dotenv import load_dotenv + +from aiogram import Bot, Dispatcher, F, Router +from aiogram.enums import ParseMode +from aiogram.exceptions import TelegramAPIError +from aiogram.filters import Command, CommandObject +from aiogram.types import Message +from aiogram.client.default import DefaultBotProperties +from fastapi import Depends, FastAPI, Form, HTTPException, Request, Response, status +from fastapi.responses import HTMLResponse, RedirectResponse, PlainTextResponse +import uvicorn + +BASE_DIR = Path(__file__).resolve().parent +DB_PATH = BASE_DIR / "tg-watchbot.sqlite3" +CONFIG_PATH = BASE_DIR / "config.yaml" +ENV_PATH = BASE_DIR / ".env" +LOG_PATH = BASE_DIR / "tg-watchbot.log" +MIN_INTERVAL_SECONDS = 60 + +DEFAULT_UA = ( + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36 tg-watchbot/1.0" +) + +logger = logging.getLogger("tg-watchbot") +router = Router() +bot: Bot | None = None +admin_chat_id: int | None = None +config: dict[str, Any] = {} +rate_buckets: dict[int, list[float]] = {} +pending_sendpic: dict[int, dict[str, Any]] = {} +scheduler_ref: AsyncIOScheduler | None = None + + +def setup_logging(level: str = "INFO") -> None: + LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + handlers=[logging.StreamHandler(), logging.FileHandler(LOG_PATH, encoding="utf-8")], + ) + + +def load_config() -> dict[str, Any]: + if not CONFIG_PATH.exists(): + raise FileNotFoundError(f"missing config: {CONFIG_PATH}") + with CONFIG_PATH.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return data + + +def monitor_cleanup_settings() -> dict[str, int | bool]: + cleanup = (config.get("cleanup") or {}) if isinstance(config, dict) else {} + return { + "enabled": bool(cleanup.get("enabled", True)), + "interval_minutes": max(1, int(cleanup.get("interval_minutes", 60))), + "retention_minutes": max(1, int(cleanup.get("monitor_retention_minutes", 1440))), + } + + +def db() -> sqlite3.Connection: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db() -> None: + with closing(db()) as conn: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + CREATE TABLE IF NOT EXISTS users ( + user_id INTEGER PRIMARY KEY, + username TEXT, + full_name TEXT, + note TEXT DEFAULT '', + blocked INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS message_map ( + admin_chat_id INTEGER NOT NULL, + admin_message_id INTEGER NOT NULL, + user_id INTEGER NOT NULL, + user_message_id INTEGER, + created_at TEXT NOT NULL, + PRIMARY KEY (admin_chat_id, admin_message_id) + ); + CREATE TABLE IF NOT EXISTS sent_events ( + event_key TEXT PRIMARY KEY, + monitor_name TEXT NOT NULL, + title TEXT, + link TEXT, + created_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS monitor_state ( + monitor_name TEXT NOT NULL, + item_key TEXT NOT NULL, + price TEXT, + stock TEXT, + title TEXT, + link TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY (monitor_name, item_key) + ); + CREATE TABLE IF NOT EXISTS inbox_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + username TEXT, + full_name TEXT, + user_message_id INTEGER, + message_type TEXT, + text TEXT, + forwarded INTEGER DEFAULT 0, + admin_header_message_id INTEGER, + admin_copy_message_id INTEGER, + created_at TEXT NOT NULL, + forwarded_at TEXT, + error TEXT + ); + """ + ) + conn.commit() + + +def now_iso() -> str: + return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + + +def html_escape(text: Any) -> str: + return html.escape(str(text or ""), quote=False) + + +def user_display(message: Message) -> tuple[int, str, str | None]: + u = message.from_user + if not u: + return 0, "unknown", None + full = " ".join(x for x in [u.first_name, u.last_name] if x).strip() or str(u.id) + return u.id, full, u.username + + +def upsert_user(user_id: int, full_name: str, username: str | None) -> None: + ts = now_iso() + with closing(db()) as conn: + conn.execute( + """ + INSERT INTO users(user_id, username, full_name, created_at, updated_at) + VALUES(?, ?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + username=excluded.username, + full_name=excluded.full_name, + updated_at=excluded.updated_at + """, + (user_id, username, full_name, ts, ts), + ) + conn.commit() + + +def get_user(user_id: int) -> sqlite3.Row | None: + with closing(db()) as conn: + return conn.execute("SELECT * FROM users WHERE user_id=?", (user_id,)).fetchone() + + +def is_blocked(user_id: int) -> bool: + row = get_user(user_id) + return bool(row and row["blocked"]) + + +def set_block(user_id: int, blocked: bool) -> None: + with closing(db()) as conn: + conn.execute( + "UPDATE users SET blocked=?, updated_at=? WHERE user_id=?", + (1 if blocked else 0, now_iso(), user_id), + ) + conn.commit() + + +def set_note(user_id: int, note: str) -> None: + with closing(db()) as conn: + conn.execute("UPDATE users SET note=?, updated_at=? WHERE user_id=?", (note, now_iso(), user_id)) + conn.commit() + + +def rate_limited(user_id: int) -> bool: + rl = (config.get("bot") or {}).get("rate_limit") or {} + window = int(rl.get("window_seconds", 10)) + max_messages = int(rl.get("max_messages", 3)) + t = time.time() + bucket = [x for x in rate_buckets.get(user_id, []) if t - x <= window] + bucket.append(t) + rate_buckets[user_id] = bucket + return len(bucket) > max_messages + + +def save_message_map(admin_msg: Message, user_id: int, user_message_id: int | None) -> None: + with closing(db()) as conn: + conn.execute( + "INSERT OR REPLACE INTO message_map(admin_chat_id, admin_message_id, user_id, user_message_id, created_at) VALUES(?,?,?,?,?)", + (admin_msg.chat.id, admin_msg.message_id, user_id, user_message_id, now_iso()), + ) + conn.commit() + + + + +def create_inbox_message(message: Message, user_id: int, full_name: str, username: str | None) -> int: + msg_type = "text" if message.text else (message.content_type or "message") + text = message.text or message.caption or "" + with closing(db()) as conn: + cur = conn.execute( + """ + INSERT INTO inbox_messages(user_id, username, full_name, user_message_id, message_type, text, created_at) + VALUES(?,?,?,?,?,?,?) + """, + (user_id, username, full_name, message.message_id, msg_type, text, now_iso()), + ) + conn.commit() + return int(cur.lastrowid) + + +def mark_inbox_forwarded(inbox_id: int, header_id: int | None = None, copy_id: int | None = None) -> None: + with closing(db()) as conn: + conn.execute( + "UPDATE inbox_messages SET forwarded=1, admin_header_message_id=?, admin_copy_message_id=?, forwarded_at=?, error=NULL WHERE id=?", + (header_id, copy_id, now_iso(), inbox_id), + ) + conn.commit() + + +def mark_inbox_error(inbox_id: int, error: str) -> None: + with closing(db()) as conn: + conn.execute("UPDATE inbox_messages SET error=? WHERE id=?", (error[:1000], inbox_id)) + conn.commit() + + +def pending_inbox(limit: int = 50) -> list[sqlite3.Row]: + with closing(db()) as conn: + return list(conn.execute("SELECT * FROM inbox_messages WHERE forwarded=0 ORDER BY id ASC LIMIT ?", (limit,)).fetchall()) + + +def lookup_reply_target(admin_chat: int, admin_message_id: int) -> int | None: + with closing(db()) as conn: + row = conn.execute( + "SELECT user_id FROM message_map WHERE admin_chat_id=? AND admin_message_id=?", + (admin_chat, admin_message_id), + ).fetchone() + return int(row["user_id"]) if row else None + + +def parse_user_id_and_text(args: str | None) -> tuple[int, str]: + if not args: + raise ValueError("缺少参数") + parts = args.strip().split(maxsplit=1) + if len(parts) < 2: + raise ValueError("格式应为:/reply <内容>") + return int(parts[0]), parts[1] + + +def parse_user_id(args: str | None) -> int: + if not args: + raise ValueError("缺少 user_id") + return int(args.strip().split()[0]) + + +def parse_user_id_and_optional_text(args: str | None) -> tuple[int, str]: + if not args: + raise ValueError("缺少 user_id") + parts = args.strip().split(maxsplit=1) + uid = int(parts[0]) + caption = parts[1] if len(parts) > 1 else "" + return uid, caption + + +def describe_sendpic_target(user_id: int) -> str: + row = get_user(user_id) + if not row: + return str(user_id) + username = f"@{row['username']}" if row['username'] else "" + full_name = row['full_name'] or str(user_id) + return f"{full_name} {username}".strip() + + +async def admin_send(text: str) -> None: + if not bot or admin_chat_id is None: + logger.error("admin_send called before bot/admin init: %s", text) + return + try: + await bot.send_message(admin_chat_id, text, disable_web_page_preview=False) + except Exception: + logger.exception("failed to send admin notification") + + +def is_admin_chat(message: Message) -> bool: + """Dynamic admin-chat filter.""" + return admin_chat_id is not None and message.chat.id == admin_chat_id + + +def is_admin_action_message(message: Message) -> bool: + """Only catch admin messages that are part of an action flow. + + A broad admin-chat handler would swallow ordinary admin messages before the + fallback handler. Keep it narrow: reply-to-user and /sendpic photo flow only. + """ + if not is_admin_chat(message): + return False + if pending_sendpic.get(message.chat.id): + return True + return bool(message.reply_to_message and message.text) + + +@router.message(Command("start")) +async def start(message: Message) -> None: + uid, full, username = user_display(message) + if not uid: + return + upsert_user(uid, full, username) + if is_blocked(uid): + await message.answer("你当前无法发送消息。") + return + await message.answer("已连接客服/管理员。你发来的消息会转交给管理员,请直接输入内容。") + + +async def send_text_to_user_from_admin(message: Message, args: str | None, command_name: str) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid, text = parse_user_id_and_text(args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}。对方需要先私聊 Bot 或 /start,Telegram Bot 才能主动发送。") + return + if is_blocked(uid): + await message.reply(f"错误:用户 {uid} 已被封禁,先 /unblock {uid}") + return + if not bot: + await message.reply("错误:Bot 尚未初始化") + return + sent = await bot.send_message(uid, text) # type: ignore[union-attr] + await message.reply(f"{command_name} 成功:已发送给用户 {uid},message_id={sent.message_id}") + except Exception as e: + logger.exception("/%s failed", command_name) + await message.reply(f"/{command_name} 失败:{e}\n用法:/{command_name} <内容>") + + +@router.message(Command("reply")) +async def cmd_reply(message: Message, command: CommandObject) -> None: + await send_text_to_user_from_admin(message, command.args, "reply") + + +@router.message(Command("send")) +async def cmd_send(message: Message, command: CommandObject) -> None: + await send_text_to_user_from_admin(message, command.args, "send") + + +@router.message(Command("sendpic")) +async def cmd_sendpic(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid, caption = parse_user_id_and_optional_text(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid},对方需要先 /start 机器人") + return + if is_blocked(uid): + await message.reply(f"错误:用户 {uid} 已被封禁,先 /unblock {uid}") + return + pending_sendpic[message.chat.id] = {"target": uid, "caption": caption, "created_at": time.time()} + suffix = f"\n说明文字:{caption}" if caption else "" + await message.reply( + f"请发送需要转发给 {uid}({html_escape(describe_sendpic_target(uid))})的图片。{suffix}\n" + "2 分钟内发送一张图片即可;发送 /cancel 取消。" + ) + except Exception as e: + logger.exception("/sendpic failed") + await message.reply(f"/sendpic 失败:{e}\n用法:/sendpic 用户ID [可选图片说明]") + + +@router.message(Command("cancel")) +async def cmd_cancel(message: Message) -> None: + if message.chat.id == admin_chat_id and pending_sendpic.pop(message.chat.id, None): + await message.reply("已取消待发送图片。") + + +@router.message(Command("block")) +async def cmd_block(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_block(uid, True) + await message.reply(f"已封禁用户 {uid}") + except Exception as e: + logger.exception("/block failed") + await message.reply(f"/block 失败:{e}") + + +@router.message(Command("unblock")) +async def cmd_unblock(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_block(uid, False) + await message.reply(f"已解封用户 {uid}") + except Exception as e: + logger.exception("/unblock failed") + await message.reply(f"/unblock 失败:{e}") + + +@router.message(Command("note")) +async def cmd_note(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid, note = parse_user_id_and_text(command.args) + if not get_user(uid): + await message.reply(f"错误:找不到用户 {uid}") + return + set_note(uid, note) + await message.reply(f"已更新用户 {uid} 备注") + except Exception as e: + logger.exception("/note failed") + await message.reply(f"/note 失败:{e}") + + +@router.message(Command("who")) +async def cmd_who(message: Message, command: CommandObject) -> None: + if message.chat.id != admin_chat_id: + return + try: + uid = parse_user_id(command.args) + row = get_user(uid) + if not row: + await message.reply(f"错误:找不到用户 {uid}") + return + await message.reply( + "用户信息\n" + f"user_id: {row['user_id']}\n" + f"username: @{row['username']}\n" + f"full_name: {row['full_name']}\n" + f"blocked: {bool(row['blocked'])}\n" + f"note: {row['note'] or ''}\n" + f"created_at: {row['created_at']}\n" + f"updated_at: {row['updated_at']}" + ) + except Exception as e: + logger.exception("/who failed") + await message.reply(f"/who 失败:{e}") + + +@router.message(is_admin_action_message) +async def admin_reply_by_message(message: Message) -> None: + # Pending /sendpic flow: after /sendpic , the next admin photo is copied to target. + pending = pending_sendpic.get(message.chat.id) + if pending: + if time.time() - float(pending.get("created_at", 0)) > 120: + pending_sendpic.pop(message.chat.id, None) + await message.reply("发送图片超时,已取消。请重新使用 /sendpic 用户ID。") + return + if message.photo: + target = int(pending["target"]) + caption = (message.caption or pending.get("caption") or "")[:1024] + try: + if is_blocked(target): + pending_sendpic.pop(message.chat.id, None) + await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}") + return + await bot.send_photo(target, message.photo[-1].file_id, caption=caption or None) # type: ignore[union-attr] + pending_sendpic.pop(message.chat.id, None) + await message.reply(f"已发送图片给用户 {target}") + except TelegramAPIError as e: + logger.exception("/sendpic photo forwarding failed") + await message.reply(f"图片发送失败:{e}") + return + if message.text and message.text.startswith("/"): + return + await message.reply("请发送一张图片;或发送 /cancel 取消。") + return + + # Admin replies to forwarded/copy notification in admin chat. + if not message.reply_to_message or not message.text: + return + target = lookup_reply_target(message.chat.id, message.reply_to_message.message_id) + if not target: + return + try: + if is_blocked(target): + await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}") + return + sent = await bot.send_message(target, message.text) # type: ignore[union-attr] + await message.reply(f"已发送给用户 {target},message_id={sent.message_id}") + except TelegramAPIError as e: + logger.exception("admin reply forwarding failed") + await message.reply(f"发送失败:{e}") + + +@router.message(is_admin_chat) +async def admin_plain_message(message: Message) -> None: + # Do not silently swallow ordinary admin messages. + if message.text and not message.text.startswith("/"): + await message.reply( + "管理员普通消息不会自动转发。请使用:\n" + "/send <内容>\n" + "/reply <内容>\n" + "或在收件箱里回复某条用户消息;也可以打开面板的「主动发消息」。" + ) + + +@router.message() +async def user_message(message: Message) -> None: + # Only relay private user chats to admin. + logger.info("incoming message chat_id=%s chat_type=%s from_user=%s content_type=%s text=%r", message.chat.id, message.chat.type, getattr(message.from_user, 'id', None), message.content_type, (message.text or '')[:80]) + if message.chat.id == admin_chat_id: + logger.info("incoming message is admin plain message; ignored by user relay") + return + if message.chat.type != "private": + logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type) + return + uid, full, username = user_display(message) + if not uid: + return + upsert_user(uid, full, username) + if is_blocked(uid): + await message.answer("你当前无法发送消息。") + return + if rate_limited(uid): + await message.answer("发送太快了,请稍后再试。") + return + inbox_id = create_inbox_message(message, uid, full, username) + user_row = get_user(uid) + note = user_row["note"] if user_row and "note" in user_row.keys() else "" + header = ( + f"[用户消息 #{inbox_id}]\n" + f"user_id: {uid}\n" + f"name: {html_escape(full)}\n" + f"username: @{html_escape(username or '')}\n" + f"note: {html_escape(note)}\n" + f"time: {html_escape(now_iso())}" + ) + try: + sent = await bot.send_message(admin_chat_id, header) # type: ignore[union-attr] + save_message_map(sent, uid, message.message_id) + copied = await message.copy_to(admin_chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type] + save_message_map(copied, uid, message.message_id) + mark_inbox_forwarded(inbox_id, sent.message_id, copied.message_id) + await message.answer("已转交管理员。") + except Exception as e: + mark_inbox_error(inbox_id, repr(e)) + logger.exception("failed to relay user message, saved inbox_id=%s", inbox_id) + await message.answer("已收到留言,但转发管理员暂时失败;系统会稍后自动重试。") + + +@dataclass +class MonitorItem: + key: str + title: str + link: str + text: str + price: str | None = None + stock: str | None = None + author: str | None = None + published: str | None = None + category: str | None = None + + +def stable_key(*parts: str) -> str: + raw = "|".join(parts) + return hashlib.sha256(raw.encode("utf-8", errors="ignore")).hexdigest() + + +def extract_price(text: str) -> str | None: + m = re.search(r"(?:¥|¥|\$|USD|CNY)?\s*\d+(?:[.,]\d{1,2})?", text, re.I) + return m.group(0).strip() if m else None + + +def keyword_hits(text: str, keywords: list[str]) -> list[str]: + low = text.lower() + return [k for k in keywords if k and k.lower() in low] + + +def item_blocked(item: MonitorItem, monitor: dict[str, Any]) -> tuple[bool, str]: + text = f"{item.title} {item.text} {item.author or ''} {item.category or ''}" + exclude_hits = keyword_hits(text, monitor.get("exclude_keywords") or []) + if exclude_hits: + return True, "屏蔽词 " + ", ".join(exclude_hits) + authors = [a.lower() for a in (monitor.get("authors") or []) if a] + if authors and (item.author or "").lower() not in authors: + return True, "作者不匹配" + categories = [c.lower() for c in (monitor.get("categories") or []) if c] + if categories and not any(c in (item.category or "").lower() for c in categories): + return True, "分类不匹配" + return False, "" + + +async def fetch_url(client: httpx.AsyncClient, url: str) -> str: + resp = await client.get(url, follow_redirects=True) + resp.raise_for_status() + return resp.text + + +def parse_web_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]: + selectors = monitor.get("selectors") or {} + item_sel = selectors.get("item") or "article, .thread, .post, li" + title_sel = selectors.get("title") or "h1, h2, h3, a" + link_sel = selectors.get("link") or "a" + price_sel = selectors.get("price") + stock_sel = selectors.get("stock") + soup = BeautifulSoup(body, "html.parser") + nodes = soup.select(item_sel)[:100] + if not nodes: + nodes = [soup] + items: list[MonitorItem] = [] + for node in nodes: + title_node = node.select_one(title_sel) if title_sel else None + link_node = node.select_one(link_sel) if link_sel else None + title = (title_node.get_text(" ", strip=True) if title_node else node.get_text(" ", strip=True)[:120]).strip() + href = link_node.get("href") if link_node else "" + link = urljoin(monitor.get("url", ""), href) if href else monitor.get("url", "") + text = node.get_text(" ", strip=True) + price = None + stock = None + if price_sel and (p := node.select_one(price_sel)): + price = p.get_text(" ", strip=True) + else: + price = extract_price(text) + if stock_sel and (s := node.select_one(stock_sel)): + stock = s.get_text(" ", strip=True) + for hint in ["有货", "无货", "缺货", "in stock", "out of stock", "sold out", "available"]: + if hint.lower() in text.lower(): + stock = hint + break + if title or text: + key = stable_key(link, title or text[:80]) + items.append(MonitorItem(key=key, title=title or "(no title)", link=link, text=text, price=price, stock=stock)) + return items + + +def canonical_forum_key(link: str, entry_id: str = "") -> str: + """Return a stable topic/post key that survives title edits and RSS updated ids.""" + target = link or entry_id + patterns = [ + r"nodeseek\.com/post-(\d+)", + r"linux\.do/t/(?:[^/]+/)?(\d+)", + r"/t/(?:[^/]+/)?(\d+)", + ] + for value in [target, entry_id, link]: + for pattern in patterns: + m = re.search(pattern, value or "", re.I) + if m: + return m.group(1) + return stable_key(link or entry_id) + + +def parse_rss_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]: + feed = feedparser.parse(body) + items: list[MonitorItem] = [] + for e in feed.entries[:100]: + title = getattr(e, "title", "(no title)") + link = getattr(e, "link", monitor.get("url", "")) + summary = getattr(e, "summary", "") + content = " ".join([c.get("value", "") for c in getattr(e, "content", []) if isinstance(c, dict)]) + published = getattr(e, "published", "") or getattr(e, "updated", "") + author = getattr(e, "author", "") or getattr(e, "dc_creator", "") + category = "" + tags = getattr(e, "tags", None) or [] + if tags: + category = ", ".join([t.get("term", "") for t in tags if isinstance(t, dict) and t.get("term")]) + entry_id = getattr(e, "id", "") or getattr(e, "guid", "") + key = canonical_forum_key(link, entry_id) if (monitor.get("forum") or monitor.get("type") == "rss") else stable_key(entry_id, link, title) + items.append(MonitorItem(key=key, title=title, link=link, text=f"{title} {summary} {content} {published} {author} {category}", author=author, published=published, category=category)) + return items + + +def should_notify_and_update(monitor: dict[str, Any], item: MonitorItem, hits: list[str]) -> list[str]: + name = monitor["name"] + notify_on = monitor.get("notify_on") or {} + reasons: list[str] = [] + with closing(db()) as conn: + prev = conn.execute( + "SELECT * FROM monitor_state WHERE monitor_name=? AND item_key=?", + (name, item.key), + ).fetchone() + is_forum = bool(monitor.get("forum") or monitor.get("type") == "rss") + if is_forum: + # 论坛/RSS 帖子只在首次出现并命中时通知一次。 + # 后续 RSS 因回复/编辑把同一链接重新排到前面时,只更新状态,不再重复推送。 + if prev is None: + if notify_on.get("new_item", False): + reasons.append("新条目") + if notify_on.get("keyword_match", True) and hits: + reasons.append("关键词 " + ", ".join(hits)) + else: + if prev is None: + if notify_on.get("new_item", False): + reasons.append("新条目") + else: + if notify_on.get("price_change", False) and (item.price or "") != (prev["price"] or ""): + reasons.append(f"价格变化 {prev['price'] or '-'} -> {item.price or '-'}") + if notify_on.get("stock_change", False) and (item.stock or "") != (prev["stock"] or ""): + reasons.append(f"库存变化 {prev['stock'] or '-'} -> {item.stock or '-'}") + if notify_on.get("keyword_match", True) and hits: + reasons.append("关键词 " + ", ".join(hits)) + conn.execute( + """ + INSERT INTO monitor_state(monitor_name, item_key, price, stock, title, link, updated_at) + VALUES(?,?,?,?,?,?,?) + ON CONFLICT(monitor_name, item_key) DO UPDATE SET + price=excluded.price, stock=excluded.stock, title=excluded.title, + link=excluded.link, updated_at=excluded.updated_at + """, + (name, item.key, item.price, item.stock, item.title, item.link, now_iso()), + ) + conn.commit() + return reasons + + +def event_not_sent(event_key: str, monitor_name: str, title: str, link: str) -> bool: + with closing(db()) as conn: + try: + conn.execute( + "INSERT INTO sent_events(event_key, monitor_name, title, link, created_at) VALUES(?,?,?,?,?)", + (event_key, monitor_name, title, link, now_iso()), + ) + conn.commit() + return True + except sqlite3.IntegrityError: + return False + + +async def run_monitor(monitor: dict[str, Any]) -> int: + name = monitor.get("name", "unnamed") + mtype = monitor.get("type", "web") + url = monitor.get("url") + if not url: + logger.error("monitor %s missing url", name) + return 0 + keywords = monitor.get("keywords") or [] + timeout = int((config.get("http") or {}).get("timeout_seconds", 20)) + ua = (config.get("http") or {}).get("user_agent") or DEFAULT_UA + headers = {"User-Agent": ua, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"} + sent_count = 0 + try: + async with httpx.AsyncClient(timeout=timeout, headers=headers) as client: + body = await fetch_url(client, url) + items = parse_rss_items(monitor, body) if mtype == "rss" else parse_web_items(monitor, body) + for item in items: + blocked, block_reason = item_blocked(item, monitor) + if blocked: + logger.debug("monitor %s skipped item %s: %s", name, item.title, block_reason) + continue + hits = keyword_hits(f"{item.title} {item.text}", keywords) + # If keywords are configured and keyword_match is enabled, do not push unrelated new posts. + notify_on = monitor.get("notify_on") or {} + if keywords and notify_on.get("keyword_match", True) and not hits and not (notify_on.get("price_change") or notify_on.get("stock_change")): + should_notify_and_update(monitor, item, []) # still remember state to avoid later old flood + continue + reasons = should_notify_and_update(monitor, item, hits) + if not reasons: + continue + # 论坛/RSS 以帖子本身作为事件键;不要把“命中原因/检查时间/编辑变化”放进去,避免同一帖重复发。 + is_forum = monitor.get("forum") or mtype == "rss" + event_key = stable_key(name, item.key) if is_forum else stable_key(name, item.key, "|".join(reasons), item.price or "", item.stock or "") + if not event_not_sent(event_key, name, item.title, item.link): + continue + if is_forum: + text = ( + f"[新帖命中] {html_escape(name)}\n" + f"标题:{html_escape(item.title)}\n" + f"作者:{html_escape(item.author or '-')}\n" + f"分类:{html_escape(item.category or '-')}\n" + f"链接:{html_escape(item.link)}\n" + f"命中:{html_escape('; '.join(reasons))}\n" + f"发布时间:{html_escape(item.published or '-')}\n" + f"检查时间:{html_escape(now_iso())}" + ) + else: + text = ( + f"[库存/关键词命中] {html_escape(name)}\n" + f"标题:{html_escape(item.title)}\n" + f"链接:{html_escape(item.link)}\n" + f"命中:{html_escape('; '.join(reasons))}\n" + f"价格:{html_escape(item.price or '-')}\n" + f"库存:{html_escape(item.stock or '-')}\n" + f"时间:{html_escape(now_iso())}" + ) + await admin_send(text) + sent_count += 1 + except Exception: + logger.exception("monitor failed: %s %s", name, url) + return sent_count + + + +def cleanup_monitor_data(retention_minutes: int) -> tuple[int, int]: + """Delete only website/RSS monitor state older than retention. + + Keeps two-way conversation tables intact: users, message_map, inbox_messages. + """ + cutoff_ts = time.time() - max(1, int(retention_minutes)) * 60 + cutoff = datetime.fromtimestamp(cutoff_ts, timezone.utc).astimezone().isoformat(timespec="seconds") + with closing(db()) as conn: + cur1 = conn.execute("DELETE FROM monitor_state WHERE updated_at < ?", (cutoff,)) + cur2 = conn.execute("DELETE FROM sent_events WHERE created_at < ?", (cutoff,)) + conn.commit() + return int(cur1.rowcount or 0), int(cur2.rowcount or 0) + + +async def cleanup_monitor_loop() -> None: + while True: + settings = monitor_cleanup_settings() + await asyncio.sleep(int(settings["interval_minutes"]) * 60) + if not settings["enabled"]: + continue + try: + state_n, sent_n = cleanup_monitor_data(int(settings["retention_minutes"])) + logger.info( + "monitor cleanup done retention=%smin deleted monitor_state=%s sent_events=%s", + settings["retention_minutes"], state_n, sent_n, + ) + except Exception: + logger.exception("monitor cleanup failed") + + + +async def flush_pending_inbox() -> None: + if not bot or admin_chat_id is None: + return + rows = pending_inbox(50) + if not rows: + return + logger.info("flushing pending inbox messages: %d", len(rows)) + for row in rows: + try: + text = ( + f"[补发用户消息 #{row['id']}]\n" + f"user_id: {row['user_id']}\n" + f"name: {html_escape(row['full_name'])}\n" + f"username: @{html_escape(row['username'] or '')}\n" + f"原消息ID: {row['user_message_id']}\n" + f"类型: {html_escape(row['message_type'])}\n" + f"时间: {html_escape(row['created_at'])}\n\n" + f"内容:{html_escape(row['text'] or '(非文本/媒体消息,原始媒体无法补发,仅保留记录)')}" + ) + sent = await bot.send_message(admin_chat_id, text) + save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None) + mark_inbox_forwarded(int(row['id']), sent.message_id, None) + except Exception as e: + mark_inbox_error(int(row['id']), repr(e)) + logger.exception("failed to flush inbox message id=%s", row['id']) + + +async def flush_pending_loop() -> None: + while True: + try: + await flush_pending_inbox() + except Exception: + logger.exception("flush_pending_loop failed") + await asyncio.sleep(60) + + +async def run_all_monitors_once() -> None: + monitors = config.get("monitors") or [] + logger.info("manual/all monitor run start, count=%d", len(monitors)) + total = 0 + for m in monitors: + total += await run_monitor(m) + logger.info("manual/all monitor run done, notifications=%d", total) + + +def schedule_monitors(scheduler: AsyncIOScheduler) -> None: + for idx, m in enumerate(config.get("monitors") or []): + name = m.get("name", "unnamed") + requested = int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) + interval = max(requested, MIN_INTERVAL_SECONDS) + if requested < MIN_INTERVAL_SECONDS: + logger.warning("monitor %s interval %s raised to %s", name, requested, interval) + # Use index+hash, not just name: duplicate names should not crash saving/reloading. + job_key = stable_key(str(idx), name, m.get("url", ""))[:16] + scheduler.add_job(run_monitor, "interval", seconds=interval, args=[m], id=f"monitor:{idx}:{job_key}", max_instances=1, coalesce=True, replace_existing=True, next_run_time=datetime.now(timezone.utc)) + logger.info("scheduled monitor %s every %ss", name, interval) + + + +# ----------------------------- +# Web admin panel +# ----------------------------- + +def panel_enabled() -> bool: + return os.getenv("WEB_PANEL_ENABLED", "true").lower() not in {"0", "false", "no", "off"} + + +def session_secret() -> str: + secret = os.getenv("WEB_PANEL_SESSION_SECRET", "").strip() + if not secret: + secret = secrets.token_urlsafe(32) + vals = env_values() + vals["WEB_PANEL_SESSION_SECRET"] = secret + write_env_values(vals) + return secret + + +def session_token(username: str) -> str: + raw = f"{username}|{session_secret()}" + return hashlib.sha256(raw.encode()).hexdigest() + + +def is_logged_in(request: Request) -> bool: + username = os.getenv("WEB_PANEL_USER", "admin") + token = request.cookies.get("tg_watchbot_session", "") + return bool(token) and secrets.compare_digest(token, session_token(username)) + + +def panel_auth(request: Request) -> str: + # Actual redirect is handled by middleware; dependencies cannot reliably return redirects. + return os.getenv("WEB_PANEL_USER", "admin") + + +def login_page(error: str = "") -> str: + err = f"" if error else "" + return f""" +登录 · tg-watchbot +

tg-watchbot

登录后管理 Telegram 机器人、关键词监控、库存/价格提醒。

{err}
your-domain.example · Cloudflare Tunnel
""" + + +def env_values() -> dict[str, str]: + load_dotenv(ENV_PATH, override=True) + return { + "TELEGRAM_BOT_TOKEN": os.getenv("TELEGRAM_BOT_TOKEN", ""), + "ADMIN_CHAT_ID": os.getenv("ADMIN_CHAT_ID", ""), + "LOG_LEVEL": os.getenv("LOG_LEVEL", "INFO"), + "WEB_PANEL_ENABLED": os.getenv("WEB_PANEL_ENABLED", "true"), + "WEB_PANEL_HOST": os.getenv("WEB_PANEL_HOST", "127.0.0.1"), + "WEB_PANEL_PORT": os.getenv("WEB_PANEL_PORT", "8765"), + "WEB_PANEL_USER": os.getenv("WEB_PANEL_USER", "admin"), + "WEB_PANEL_PASSWORD": os.getenv("WEB_PANEL_PASSWORD", "admin"), + "WEB_PANEL_SESSION_SECRET": os.getenv("WEB_PANEL_SESSION_SECRET", ""), + } + + +def write_env_values(values: dict[str, str]) -> None: + lines = [ + "# tg-watchbot environment", + f"TELEGRAM_BOT_TOKEN={values.get('TELEGRAM_BOT_TOKEN','')}", + f"ADMIN_CHAT_ID={values.get('ADMIN_CHAT_ID','')}", + f"LOG_LEVEL={values.get('LOG_LEVEL','INFO')}", + "", + "# Web 管理面板;默认只监听本机,建议用 SSH 隧道或反代再暴露", + f"WEB_PANEL_ENABLED={values.get('WEB_PANEL_ENABLED','true')}", + f"WEB_PANEL_HOST={values.get('WEB_PANEL_HOST','127.0.0.1')}", + f"WEB_PANEL_PORT={values.get('WEB_PANEL_PORT','8765')}", + f"WEB_PANEL_USER={values.get('WEB_PANEL_USER','admin')}", + f"WEB_PANEL_PASSWORD={values.get('WEB_PANEL_PASSWORD','admin')}", + f"WEB_PANEL_SESSION_SECRET={values.get('WEB_PANEL_SESSION_SECRET','')}", + "", + ] + ENV_PATH.write_text("\n".join(lines), encoding="utf-8") + ENV_PATH.chmod(0o600) + load_dotenv(ENV_PATH, override=True) + + +def cfg_load_fresh() -> dict[str, Any]: + return load_config() + + +def cfg_save(new_cfg: dict[str, Any]) -> None: + if not isinstance(new_cfg, dict): + raise ValueError("配置根节点必须是对象") + monitors = new_cfg.setdefault("monitors", []) + if not isinstance(monitors, list): + raise ValueError("monitors 必须是列表") + for m in monitors: + if not isinstance(m, dict): + raise ValueError("每个 monitor 必须是对象") + if int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) < MIN_INTERVAL_SECONDS: + m["interval_seconds"] = MIN_INTERVAL_SECONDS + CONFIG_PATH.write_text(yaml.safe_dump(new_cfg, allow_unicode=True, sort_keys=False), encoding="utf-8") + global config + config = new_cfg + reload_scheduler_jobs() + + +def reload_scheduler_jobs() -> None: + if scheduler_ref: + for job in list(scheduler_ref.get_jobs()): + if job.id.startswith("monitor:"): + scheduler_ref.remove_job(job.id) + schedule_monitors(scheduler_ref) + + +def parse_lines(text: str) -> list[str]: + return [x.strip() for x in (text or "").splitlines() if x.strip()] + + +def monitor_from_form( + original_index: int | None, + name: str, + mtype: str, + url: str, + interval_seconds: int, + keywords: str, + item_selector: str, + title_selector: str, + link_selector: str, + price_selector: str, + stock_selector: str, + keyword_match: bool, + new_item: bool, + price_change: bool, + stock_change: bool, +) -> dict[str, Any]: + m: dict[str, Any] = { + "name": name.strip(), + "type": mtype, + "url": url.strip(), + "interval_seconds": max(int(interval_seconds or MIN_INTERVAL_SECONDS), MIN_INTERVAL_SECONDS), + "keywords": parse_lines(keywords), + "notify_on": { + "keyword_match": keyword_match, + "new_item": new_item, + "price_change": price_change, + "stock_change": stock_change, + }, + } + if mtype == "rss": + m["forum"] = True + if mtype == "web": + selectors = { + "item": item_selector.strip() or "article, .thread, .post, li", + "title": title_selector.strip() or "h1, h2, h3, a", + "link": link_selector.strip() or "a", + } + if price_selector.strip(): + selectors["price"] = price_selector.strip() + if stock_selector.strip(): + selectors["stock"] = stock_selector.strip() + m["selectors"] = selectors + if not m["name"] or not m["url"]: + raise ValueError("名称和 URL 必填") + return m + + +def layout(title: str, body: str) -> str: + return f""" +{html_escape(title)} · tg-watchbot +""" + + +def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str: + m = m or {"type": "web", "interval_seconds": 60, "notify_on": {"keyword_match": True, "new_item": True, "price_change": True, "stock_change": True}, "selectors": {}} + selectors = m.get("selectors") or {} + no = m.get("notify_on") or {} + keywords = "\n".join(m.get("keywords") or []) + action = "/monitor/save" if idx is not None else "/monitor/create" + hidden = f"" if idx is not None else "" + def checked(k: str) -> str: + return "checked" if no.get(k, False) else "" + return f"""
{hidden} +
+
+
+
+ +

Web 选择器(RSS 可忽略)

+
+
+
+
+
+

提醒条件

+ + + + +

取消

""" + + +def create_panel_app() -> FastAPI: + app = FastAPI(title="tg-watchbot Panel") + + @app.middleware("http") + async def require_login_middleware(request: Request, call_next): + public_paths = {"/login", "/health", "/favicon.ico"} + if request.url.path in public_paths or is_logged_in(request): + return await call_next(request) + return RedirectResponse("/login", status_code=303) + + @app.get("/login", response_class=HTMLResponse) + async def login_get(request: Request): + if is_logged_in(request): + return RedirectResponse("/", status_code=303) + return HTMLResponse(login_page()) + + @app.post("/login") + async def login_post(username: str = Form(""), password: str = Form("")): + expected_user = os.getenv("WEB_PANEL_USER", "admin") + expected_pass = os.getenv("WEB_PANEL_PASSWORD", "admin") + if secrets.compare_digest(username, expected_user) and secrets.compare_digest(password, expected_pass): + resp = RedirectResponse("/", status_code=303) + resp.set_cookie("tg_watchbot_session", session_token(expected_user), httponly=True, secure=True, samesite="lax", max_age=60 * 60 * 24 * 14) + return resp + return HTMLResponse(login_page("用户名或密码错误"), status_code=401) + + @app.get("/logout") + async def logout() -> RedirectResponse: + resp = RedirectResponse("/login", status_code=303) + resp.delete_cookie("tg_watchbot_session") + return resp + + @app.get("/", response_class=HTMLResponse) + async def index(_: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh() + rows = [] + for i, m in enumerate(cfg.get("monitors") or []): + rows.append(f"""{html_escape(m.get('type','web'))}{html_escape(m.get('name',''))}
{html_escape(m.get('url',''))}{html_escape(m.get('interval_seconds',60))}s{html_escape(', '.join(m.get('keywords') or []))}编辑 预览 检查 删除""") + body = f"""

监控目标

当前 {len(cfg.get('monitors') or [])} 个;不限制数量,可继续新增。保存后自动重载定时任务。

""" + "".join(rows) + "
类型目标间隔关键词操作
" + return layout("监控", body) + + @app.get("/monitor/new", response_class=HTMLResponse) + async def new_monitor(_: str = Depends(panel_auth)) -> str: + return layout("新增监控", "

这里是新增单个监控。要一次加多个网站,用左侧/首页的「批量新增」。

" + monitor_form_html()) + + @app.get("/monitor/templates", response_class=HTMLResponse) + async def monitor_templates(_: str = Depends(panel_auth)) -> str: + body = """

论坛监控模板

NodeSeek / Linux.do 建议用 RSS,不抓网页 HTML,抗 Cloudflare 更稳。

""" + return layout("监控模板", body) + + @app.get("/monitor/template/{kind}", response_class=HTMLResponse) + async def monitor_template(kind: str, _: str = Depends(panel_auth)) -> str: + templates = { + "nodeseek": {"name": "NodeSeek 新帖", "type": "rss", "url": "https://rss.nodeseek.com/", "interval_seconds": 60, "keywords": ["NAT", "优惠", "补货", "VPS", "免费"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + "linuxdo": {"name": "Linux.do 最新", "type": "rss", "url": "https://linux.do/latest.rss", "interval_seconds": 60, "keywords": ["Claude", "Codex", "API", "VPS", "NAT"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + "linuxdo-resource": {"name": "Linux.do 资源荟萃", "type": "rss", "url": "https://linux.do/c/resource/14.rss", "interval_seconds": 60, "keywords": ["免费", "开源", "API", "Claude"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}}, + } + m = templates.get(kind) + if not m: + raise HTTPException(404, "template not found") + return layout("使用模板新增", "

这是预设模板,保存即可加入监控;也可以先调整关键词。

" + monitor_form_html(m)) + + @app.get("/monitor/bulk", response_class=HTMLResponse) + async def bulk_monitor(_: str = Depends(panel_auth)) -> str: + sample = """NodeSeek|https://www.nodeseek.com/|免费鸡,优惠码,NAT +Linux.do|https://linux.do|公益,codex,claude +HostLoc|https://hostloc.com|VPS,补货,优惠""" + body = f"""

批量新增监控

一行一个网站,格式:名称|URL|关键词1,关键词2,关键词3。保存后会追加到现有列表,不会覆盖原有监控。

默认提醒条件

取消

""" + return layout("批量新增", body) + + @app.post("/monitor/bulk") + async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)): + cfg = cfg_load_fresh() + monitors = cfg.setdefault("monitors", []) + added = 0 + errors = [] + for line_no, raw in enumerate(items.splitlines(), 1): + line = raw.strip() + if not line or line.startswith('#'): + continue + parts = [x.strip() for x in line.split('|')] + if len(parts) < 2: + errors.append(f"第 {line_no} 行格式错误:{html_escape(line)}") + continue + name, url = parts[0], parts[1] + keywords = parts[2] if len(parts) >= 3 else "" + try: + monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change))) + added += 1 + except Exception as e: + errors.append(f"第 {line_no} 行失败:{html_escape(e)}") + try: + cfg_save(cfg) + except Exception as e: + logger.exception("bulk save failed") + return HTMLResponse(layout("批量新增失败", f"
{html_escape(e)}
"), status_code=500) + if errors: + return HTMLResponse(layout("批量新增完成", f"
已新增 {added} 个,部分行有问题:
{'
'.join(errors)}

返回

")) + return RedirectResponse("/", status_code=303) + + @app.get("/monitor/{idx}/edit", response_class=HTMLResponse) + async def edit_monitor(idx: int, _: str = Depends(panel_auth)) -> str: + monitors = cfg_load_fresh().get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + return layout("编辑监控", "

编辑监控

" + monitor_form_html(monitors[idx], idx)) + + async def save_form_common( + original_index: int | None, + name: str, + mtype: str, + url: str, + interval_seconds: int, + keywords: str, + item_selector: str, + title_selector: str, + link_selector: str, + price_selector: str, + stock_selector: str, + keyword_match: str | None, + new_item: str | None, + price_change: str | None, + stock_change: str | None, + ) -> RedirectResponse: + cfg = cfg_load_fresh() + monitors = cfg.setdefault("monitors", []) + m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change)) + if original_index is None: + monitors.append(m) + else: + monitors[original_index] = m + try: + cfg_save(cfg) + except Exception as e: + logger.exception("save monitor failed") + return HTMLResponse(layout("保存失败", f"

保存失败

{html_escape(e)}

返回

"), status_code=500) + return RedirectResponse("/", status_code=303) + + @app.post("/monitor/create") + async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + + @app.post("/monitor/save") + async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse: + return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change) + + @app.get("/monitor/{idx}/delete") + async def delete_monitor(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if 0 <= idx < len(monitors): + monitors.pop(idx); cfg_save(cfg) + return RedirectResponse("/", status_code=303) + + + @app.get("/monitor/{idx}/preview", response_class=HTMLResponse) + async def monitor_preview(idx: int, _: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + m = monitors[idx] + timeout = int((cfg.get("http") or {}).get("timeout_seconds", 20)) + ua = (cfg.get("http") or {}).get("user_agent") or DEFAULT_UA + headers = {"User-Agent": ua} + try: + async with httpx.AsyncClient(timeout=timeout, headers=headers) as client: + body = await fetch_url(client, m.get("url")) + items = parse_rss_items(m, body) if m.get("type") == "rss" else parse_web_items(m, body) + rows=[] + for it in items[:15]: + blocked, br = item_blocked(it, m) + hits = keyword_hits(f"{it.title} {it.text}", m.get("keywords") or []) + rows.append(f"""{html_escape(it.title)}
{html_escape(it.link)}{html_escape(it.author or '-')}{html_escape(it.category or '-')}{html_escape(', '.join(hits) or '-')}{'跳过: '+html_escape(br) if blocked else '可推送/记录'}""") + body_html = "

抓取预览

只预览最近 15 条,不写入去重状态、不推送。

" + "".join(rows) + "
标题/链接作者分类命中状态
" + return layout("抓取预览", body_html) + except Exception as e: + return HTMLResponse(layout("抓取预览失败", f"
{html_escape(e)}
"), status_code=500) + + @app.get("/monitor/{idx}/run", response_class=HTMLResponse) + async def run_monitor_now(idx: int, _: str = Depends(panel_auth)) -> str: + cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or [] + if idx < 0 or idx >= len(monitors): + raise HTTPException(404, "monitor not found") + count = await run_monitor(monitors[idx]) + return layout("检查完成", f"
已手动检查:{html_escape(monitors[idx].get('name'))},推送 {count} 条。

返回

") + + @app.get("/run-once", response_class=HTMLResponse) + async def run_once_page(_: str = Depends(panel_auth)) -> str: + await run_all_monitors_once() + return layout("手动检查完成", "
已执行全部监控检查。具体结果看日志/Telegram 推送。

返回

") + + @app.get("/yaml", response_class=HTMLResponse) + async def yaml_edit(_: str = Depends(panel_auth)) -> str: + text = CONFIG_PATH.read_text(encoding="utf-8") + return layout("YAML 高级编辑", f"

YAML 高级编辑

") + + @app.post("/yaml", response_class=HTMLResponse) + async def yaml_save(_: str = Depends(panel_auth), content: str = Form(...)) -> str: + try: + data = yaml.safe_load(content) or {} + cfg_save(data) + return layout("已保存", "
YAML 已保存并重载。

返回

") + except Exception as e: + return layout("保存失败", f"

保存失败

{html_escape(e)}

返回

") + + @app.get("/settings", response_class=HTMLResponse) + async def settings(_: str = Depends(panel_auth)) -> str: + v = env_values() + cleanup = (cfg_load_fresh().get("cleanup") or {}) + body = f"""

Bot / 面板设置

+ + +
+

监控数据自动清理

只清理 RSS/网站监控状态和去重记录,不删除用户、收件箱、双向对话消息。

+

改 Token、chat_id 或面板监听端口后建议重启服务:sudo systemctl restart tg-watchbot
""" + return layout("设置", body) + + @app.post("/settings", response_class=HTMLResponse) + async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str: + write_env_values(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}) + cfg = cfg_load_fresh() + cfg["cleanup"] = { + "enabled": True, + "interval_minutes": max(1, int(CLEANUP_INTERVAL_MINUTES)), + "monitor_retention_minutes": max(1, int(CLEANUP_RETENTION_MINUTES)), + } + cfg_save(cfg) + return layout("已保存", "
.env 和监控清理设置已保存。Token/chat_id 需重启服务后生效。

返回

") + + + @app.get("/send", response_class=HTMLResponse) + async def send_page(_: str = Depends(panel_auth)) -> str: + with closing(db()) as conn: + users = conn.execute( + "SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 200" + ).fetchall() + options = [] + for u in users: + blocked = "(已封禁)" if u["blocked"] else "" + username = f"@{u['username']}" if u["username"] else "" + label = f"{u['full_name'] or u['user_id']} {username} · {u['user_id']} {blocked}" + options.append(f"") + body = f"""

主动发消息

只能发送给已经私聊过 Bot 的用户;这是 Telegram Bot API 限制。

+ + + +

查看收件箱

""" + return layout("主动发消息", body) + + @app.post("/send", response_class=HTMLResponse) + async def send_save(_: str = Depends(panel_auth), user_id: str = Form(""), manual_user_id: str = Form(""), text: str = Form("")) -> str: + raw_uid = (manual_user_id or user_id or "").strip() + if not raw_uid: + return layout("发送失败", "
缺少 user_id

返回

") + if not text.strip(): + return layout("发送失败", "
消息内容不能为空

返回

") + try: + uid = int(raw_uid) + if not get_user(uid): + return layout("发送失败", f"
找不到用户 {uid},对方需要先私聊 Bot。

返回

") + if is_blocked(uid): + return layout("发送失败", f"
用户 {uid} 已被封禁,请先 /unblock。

返回

") + if not bot: + return layout("发送失败", "
Bot 尚未初始化;请确认服务以正常模式运行,而不是 --panel-only。

返回

") + sent = await bot.send_message(uid, text.strip()) + logger.info("panel sent message to user_id=%s message_id=%s", uid, sent.message_id) + await admin_send(f"[主动发送成功]\nuser_id: {uid}\nmessage_id: {sent.message_id}\n时间:{html_escape(now_iso())}") + return layout("发送成功", f"
已发送给用户 {uid},message_id={sent.message_id}。Bot 也已给管理员发送确认提醒。

继续发送 收件箱

") + except TelegramAPIError as e: + logger.exception("panel send failed") + return layout("发送失败", f"
{html_escape(e)}

返回

") + except Exception as e: + logger.exception("panel send failed") + return layout("发送失败", f"
{html_escape(e)}

返回

") + + @app.get("/inbox", response_class=HTMLResponse) + async def inbox_page(_: str = Depends(panel_auth)) -> str: + with closing(db()) as conn: + rows = conn.execute("SELECT * FROM inbox_messages ORDER BY id DESC LIMIT 200").fetchall() + trs = [] + for r in rows: + status_txt = "已转发" if r["forwarded"] else "未转发" + status_cls = "ok" if r["forwarded"] else "danger" + content = html_escape(r["text"] or "(非文本/媒体消息)") + trs.append(f"""#{r['id']}
{status_txt}{html_escape(r['full_name'])}
{r['user_id']} @{html_escape(r['username'] or '')}{html_escape(r['message_type'])}
{html_escape(r['created_at'])}{content}
{html_escape(r['error'] or '')}重试转发""") + body = "

收件箱

用户消息会先写入 SQLite,再转发给管理员;转发失败的消息可以在这里重试。

" + "".join(trs) + "
ID/状态用户类型/时间内容/错误操作
" + return layout("收件箱", body) + + @app.get("/inbox/{msg_id}/retry") + async def retry_inbox(msg_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + with closing(db()) as conn: + conn.execute("UPDATE inbox_messages SET forwarded=0, error=NULL WHERE id=?", (msg_id,)) + conn.commit() + await flush_pending_inbox() + return RedirectResponse("/inbox", status_code=303) + + @app.get("/restart", response_class=HTMLResponse) + async def restart_page(_: str = Depends(panel_auth)) -> str: + body = """

重启机器人

用于修改 Token、管理员 ID、面板设置等需要重启生效的配置。

""" + return layout("重启机器人", body) + + @app.post("/restart") + async def restart_post(_: str = Depends(panel_auth)) -> HTMLResponse: + async def delayed_restart(): + await asyncio.sleep(1.0) + # Exit with failure so systemd Restart=on-failure brings the service back up. + os._exit(1) + asyncio.create_task(delayed_restart()) + return HTMLResponse(layout("正在重启", "
已发送重启命令,约 5-10 秒后刷新页面。

返回首页

")) + + @app.get("/logs", response_class=HTMLResponse) + async def logs(_: str = Depends(panel_auth)) -> str: + text = LOG_PATH.read_text(encoding="utf-8", errors="replace")[-20000:] if LOG_PATH.exists() else "暂无日志" + return layout("日志", f"

最近应用日志

{html_escape(text)}
") + + @app.get("/health", response_class=PlainTextResponse) + async def health() -> str: + return "ok" + + return app + + +async def start_panel_server() -> uvicorn.Server | None: + if not panel_enabled(): + logger.info("web panel disabled") + return None + host = os.getenv("WEB_PANEL_HOST", "127.0.0.1") + port = int(os.getenv("WEB_PANEL_PORT", "8765")) + server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info")) + asyncio.create_task(server.serve()) + logger.info("web panel listening on http://%s:%s", host, port) + return server + +def validate_env() -> tuple[str, int]: + load_dotenv(ENV_PATH) + token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip() + admin = os.getenv("ADMIN_CHAT_ID", "").strip() + if not token: + raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}") + if not admin: + raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}") + return token, int(admin) + + +async def main_async(run_once: bool = False, panel_only: bool = False) -> None: + global bot, admin_chat_id, config, scheduler_ref + load_dotenv(ENV_PATH, override=True) + config = load_config() + setup_logging(os.getenv("LOG_LEVEL", "INFO")) + init_db() + if panel_only: + await start_panel_server() + logger.info("panel-only mode start") + while True: + await asyncio.sleep(3600) + if run_once: + # If .env is filled, send notifications during manual test; otherwise just log. + try: + token, admin_chat_id = validate_env() + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + except Exception as e: + logger.warning("run-once without Telegram notification: %s", e) + await run_all_monitors_once() + if bot: + await bot.session.close() + return + token, admin_chat_id = validate_env() + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + dp = Dispatcher() + dp.include_router(router) + scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") + scheduler_ref = scheduler + schedule_monitors(scheduler) + scheduler.start() + await start_panel_server() + asyncio.create_task(flush_pending_loop()) + asyncio.create_task(cleanup_monitor_loop()) + await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") + logger.info("bot polling start") + await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent") + parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured") + args = parser.parse_args() + try: + asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only)) + except KeyboardInterrupt: + pass + except Exception: + logger.exception("fatal error") + raise + + +if __name__ == "__main__": + main() diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..7e75443 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,47 @@ +http: + timeout_seconds: 20 + user_agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) + Chrome/124.0 Safari/537.36 tg-watchbot/1.0 +bot: + rate_limit: + window_seconds: 10 + max_messages: 3 +monitors: +- name: NodeSeek 新帖 + type: rss + url: https://rss.nodeseek.com/ + interval_seconds: 60 + keywords: + - VPS + - 优惠 + - 免费 + exclude_keywords: [] + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +- name: Linux.do 最新 + type: rss + url: https://linux.do/latest.rss + interval_seconds: 60 + keywords: + - API + - 教程 + - 开源 + exclude_keywords: [] + authors: [] + categories: [] + notify_on: + keyword_match: true + new_item: true + price_change: false + stock_change: false + forum: true +cleanup: + enabled: true + interval_minutes: 60 + monitor_retention_minutes: 1440 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..17855f9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +aiogram==3.22.0 +APScheduler==3.11.1 +beautifulsoup4==4.14.3 +fastapi==0.136.1 +feedparser==6.0.12 +httpx==0.28.1 +python-dotenv==1.2.1 +python-multipart==0.0.20 +PyYAML==6.0.3 +uvicorn[standard]==0.38.0 diff --git a/systemd/tg-watchbot.service b/systemd/tg-watchbot.service new file mode 100644 index 0000000..2cb70fe --- /dev/null +++ b/systemd/tg-watchbot.service @@ -0,0 +1,22 @@ +[Unit] +Description=tg-watchbot +After=network-online.target +Wants=network-online.target + +[Service] +Type=simple +WorkingDirectory=/opt/tg-watchbot +EnvironmentFile=/opt/tg-watchbot/.env +ExecStart=/opt/tg-watchbot/.venv/bin/python /opt/tg-watchbot/app.py +Restart=on-failure +RestartSec=10 +User=tg-watchbot +Group=tg-watchbot +UMask=0077 +NoNewPrivileges=true +PrivateTmp=true +ProtectSystem=full +ReadWritePaths=/opt/tg-watchbot + +[Install] +WantedBy=multi-user.target