Expand web panel conversation management

This commit is contained in:
InfernoXuaI
2026-05-21 14:34:05 +08:00
parent 44356076e9
commit 9e3a56abb7
4 changed files with 605 additions and 76 deletions
+50 -3
View File
@@ -11,7 +11,15 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
## 更新日志
### 2026-05-21
### 2026-05-21 第二次更新
- Web 面板新增收件箱直接回复、用户管理、快捷回复、私聊广告拦截、监控推送历史、配置导入/导出。
- 收件箱改为完整双向对话记录:用户消息、Web 回复、TG 管理员回复都会显示。
- 用户管理页新增 Bot / 面板配置卡片,和设置页共用同一份配置;修改 Token、管理员 ID、端口、账号或密码后需要重启。
- `ADMIN_CHAT_ID` 支持最多 3 个管理员,用逗号分隔。
- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。
### 2026-05-21 第一次更新
- 默认启动改为先启动 Web 面板:未填写 `TELEGRAM_BOT_TOKEN` / `ADMIN_CHAT_ID` 时,面板仍可打开,同时 Telegram 收发、监控推送不可用。
- 面板配置页可填写 Bot Token、管理员 ID、面板账号和清理策略;保存后需要重启服务让 Bot 配置生效。
@@ -34,8 +42,13 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
- `/unblock <user_id>`:解封用户;
- `/note <user_id> <备注>`:给用户加备注;
- `/who <user_id>`:查看用户信息;
- `/spamwords`:查看广告关键词;
- `/spamadd <关键词>`:添加广告关键词;
- `/spamdel <关键词>`:删除广告关键词;
- `/cancel`:取消待发送图片。
- 普通用户有简单限流,防止刷屏。
- 支持最多 3 个管理员 chat id,用逗号分隔配置。
- 支持私聊广告关键词自动拦截和自动拉黑,不影响 RSS/Web 监控。
![示例图片](https://pic.gongyichuren.de/file/1779287173835_8521cab29a9635743a603582ceb7ba02.png)
@@ -52,6 +65,7 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
- 支持论坛 RSS 增强字段:作者、分类、tags、摘要。
- 支持去重,避免同一条反复推送。
- 支持屏蔽词、作者、分类过滤(YAML 高级配置)。
- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。
- 默认最低监控间隔为 60 秒。
![示例图片](https://pic.gongyichuren.de/file/1779287170665_17b7c8b4040d6334ea62a108d08db644.png)
@@ -64,7 +78,11 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
- 批量新增监控。
- YAML 高级编辑。
- Bot Token / 管理员 ID / 面板账号配置页。
- 收件箱页面,可查看用户消息和重试转发
- 收件箱页面,可查看完整双向对话记录、重试转发、直接回复
- 用户管理页,可备注、封禁、解封、主动发消息,并可编辑 Bot / 面板配置。
- 私聊广告拦截规则和快捷回复模板可在 Web 面板编辑。
- 监控推送历史页,可查看 Telegram 推送和仅 Web 记录。
- `config.yaml` 导入/导出页面,方便迁移。
- 主动发消息页面 `/send`,发送成功后会在页面显示结果,并给管理员聊天发送确认提醒。
- 自动清理监控/RSS/网站状态数据;支持定时删除 Telegram 监控通知消息;不会删除用户、收件箱、双向对话消息。
- 日志页面和健康检查 `/health`
@@ -213,7 +231,7 @@ curl http://127.0.0.1:8765/health
| 变量 | 说明 |
|---|---|
| `TELEGRAM_BOT_TOKEN` | BotFather 创建的 Telegram Bot Token |
| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id,用于接收用户消息和监控通知 |
| `ADMIN_CHAT_ID` | 管理员 Telegram 数字 chat id;最多 3 个,用逗号分隔 |
| `LOG_LEVEL` | 日志级别,默认 `INFO` |
| `WEB_PANEL_ENABLED` | 是否启用 Web 面板,默认 `true` |
| `WEB_PANEL_HOST` | 面板监听地址,默认 `127.0.0.1` |
@@ -224,6 +242,25 @@ curl http://127.0.0.1:8765/health
### `config.yaml`
Bot 扩展配置示例:
```yaml
bot:
rate_limit:
window_seconds: 10
max_messages: 3
spam_filter:
enabled: true
auto_block: true
keywords:
- 投资
- 博彩
- 空投
quick_replies:
- title: 已收到
text: 你好,消息已收到,我稍后处理。
```
监控数据自动清理示例:
```yaml
@@ -267,6 +304,7 @@ monitors:
new_item: true
price_change: false
stock_change: false
notify_telegram: true
forum: true
```
@@ -291,6 +329,7 @@ monitors:
new_item: true
price_change: true
stock_change: true
notify_telegram: true
```
## 管理命令
@@ -304,6 +343,9 @@ monitors:
/unblock <user_id>
/note <user_id> <备注>
/who <user_id>
/spamwords
/spamadd <关键词>
/spamdel <关键词>
/cancel
```
@@ -324,6 +366,11 @@ monitors:
| `/settings` | `.env` 设置和监控清理策略 |
| `/send` | 主动发消息给已私聊过 Bot 的用户 |
| `/inbox` | 收件箱 |
| `/users` | 用户管理 |
| `/rules` | 私聊广告拦截规则 |
| `/replies` | 快捷回复模板 |
| `/monitor/events` | 监控推送历史 |
| `/config/export` | 导出 / 导入 `config.yaml` |
| `/logs` | 日志 |
| `/health` | 健康检查 |
+423 -73
View File
@@ -60,6 +60,7 @@ logger = logging.getLogger("tg-watchbot")
router = Router()
bot: Bot | None = None
admin_chat_id: int | None = None
admin_chat_ids: list[int] = []
config: dict[str, Any] = {}
rate_buckets: dict[int, list[float]] = {}
pending_sendpic: dict[int, dict[str, Any]] = {}
@@ -161,6 +162,8 @@ def init_db() -> None:
username TEXT,
full_name TEXT,
user_message_id INTEGER,
direction TEXT DEFAULT 'in',
source TEXT DEFAULT 'user',
message_type TEXT,
text TEXT,
forwarded INTEGER DEFAULT 0,
@@ -170,8 +173,25 @@ def init_db() -> None:
forwarded_at TEXT,
error TEXT
);
CREATE TABLE IF NOT EXISTS monitor_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
monitor_name TEXT NOT NULL,
title TEXT,
link TEXT,
reasons TEXT,
pushed INTEGER DEFAULT 0,
created_at TEXT NOT NULL
);
"""
)
for sql in [
"ALTER TABLE inbox_messages ADD COLUMN direction TEXT DEFAULT 'in'",
"ALTER TABLE inbox_messages ADD COLUMN source TEXT DEFAULT 'user'",
]:
try:
conn.execute(sql)
except sqlite3.OperationalError:
pass
conn.commit()
@@ -232,6 +252,21 @@ def set_block(user_id: int, blocked: bool) -> None:
conn.commit()
def all_admin_chat_ids() -> list[int]:
if admin_chat_ids:
return list(dict.fromkeys(admin_chat_ids[:3]))
return [admin_chat_id] if admin_chat_id is not None else []
def parse_admin_chat_ids(raw: str) -> list[int]:
ids: list[int] = []
for part in re.split(r"[\s,;]+", raw.strip()):
if not part:
continue
ids.append(int(part))
return list(dict.fromkeys(ids))[:3]
def set_note(user_id: int, note: str) -> None:
with closing(db()) as conn:
conn.execute("UPDATE users SET note=?, updated_at=? WHERE user_id=?", (note, now_iso(), user_id))
@@ -275,6 +310,22 @@ def create_inbox_message(message: Message, user_id: int, full_name: str, usernam
return int(cur.lastrowid)
def create_outbox_message(user_id: int, text: str, source: str, user_message_id: int | None = None) -> int:
row = get_user(user_id)
username = row["username"] if row else None
full_name = row["full_name"] if row else str(user_id)
with closing(db()) as conn:
cur = conn.execute(
"""
INSERT INTO inbox_messages(user_id, username, full_name, user_message_id, direction, source, message_type, text, forwarded, created_at, forwarded_at)
VALUES(?,?,?,?,?,?,?,?,?,?,?)
""",
(user_id, username, full_name, user_message_id, "out", source, "text", text, 1, now_iso(), now_iso()),
)
conn.commit()
return int(cur.lastrowid)
def mark_inbox_forwarded(inbox_id: int, header_id: int | None = None, copy_id: int | None = None) -> None:
with closing(db()) as conn:
conn.execute(
@@ -295,6 +346,60 @@ def pending_inbox(limit: int = 50) -> list[sqlite3.Row]:
return list(conn.execute("SELECT * FROM inbox_messages WHERE forwarded=0 ORDER BY id ASC LIMIT ?", (limit,)).fetchall())
def get_inbox_message(inbox_id: int) -> sqlite3.Row | None:
with closing(db()) as conn:
return conn.execute("SELECT * FROM inbox_messages WHERE id=?", (inbox_id,)).fetchone()
def list_quick_replies() -> list[dict[str, str]]:
replies = (config.get("bot") or {}).get("quick_replies") or []
return [r for r in replies if isinstance(r, dict)]
def spam_filter_settings() -> dict[str, Any]:
spam = (config.get("bot") or {}).get("spam_filter") or {}
return {
"enabled": bool(spam.get("enabled", False)),
"auto_block": bool(spam.get("auto_block", True)),
"keywords": [str(k) for k in spam.get("keywords") or [] if str(k).strip()],
}
def spam_keyword_hits(text: str) -> list[str]:
settings = spam_filter_settings()
if not settings["enabled"]:
return []
return keyword_hits(text, settings["keywords"])
def update_spam_keywords(action: str, word: str) -> list[str]:
cfg = cfg_load_fresh()
bot_cfg = cfg.setdefault("bot", {})
spam = bot_cfg.setdefault("spam_filter", {"enabled": True, "auto_block": True, "keywords": []})
words = [str(k).strip() for k in spam.get("keywords") or [] if str(k).strip()]
if action == "add" and word and word not in words:
words.append(word)
if action == "delete":
words = [k for k in words if k != word]
spam["keywords"] = words
spam.setdefault("enabled", True)
spam.setdefault("auto_block", True)
cfg_save(cfg)
return words
def record_monitor_event(monitor_name: str, title: str, link: str, reasons: list[str], pushed: bool) -> None:
with closing(db()) as conn:
conn.execute(
"""
INSERT INTO monitor_events(monitor_name, title, link, reasons, pushed, created_at)
VALUES(?,?,?,?,?,?)
""",
(monitor_name, title, link, "; ".join(reasons), 1 if pushed else 0, now_iso()),
)
conn.commit()
def lookup_reply_target(admin_chat: int, admin_message_id: int) -> int | None:
with closing(db()) as conn:
row = conn.execute(
@@ -338,36 +443,50 @@ def describe_sendpic_target(user_id: int) -> str:
async def admin_send(text: str) -> None:
if not bot or admin_chat_id is None:
if not bot or not all_admin_chat_ids():
logger.error("admin_send called before bot/admin init: %s", text)
return
try:
await bot.send_message(admin_chat_id, text, disable_web_page_preview=False)
except Exception:
logger.exception("failed to send admin notification")
for chat_id in all_admin_chat_ids():
try:
await bot.send_message(chat_id, text, disable_web_page_preview=False)
except Exception:
logger.exception("failed to send admin notification chat_id=%s", chat_id)
async def admin_send_monitor(text: str, monitor_name: str) -> bool:
if not bot or admin_chat_id is None:
if not bot or not all_admin_chat_ids():
logger.error("admin_send_monitor called before bot/admin init: %s", text)
return False
try:
sent = await bot.send_message(admin_chat_id, text, disable_web_page_preview=False)
settings = monitor_cleanup_settings()
record_monitor_message(
sent,
monitor_name,
int(settings["message_delete_after_minutes"]) * 60,
)
return True
except Exception:
logger.exception("failed to send monitor notification")
return False
sent_any = False
for chat_id in all_admin_chat_ids():
try:
sent = await bot.send_message(chat_id, text, disable_web_page_preview=False)
settings = monitor_cleanup_settings()
record_monitor_message(
sent,
monitor_name,
int(settings["message_delete_after_minutes"]) * 60,
)
sent_any = True
except Exception:
logger.exception("failed to send monitor notification chat_id=%s", chat_id)
return sent_any
async def send_text_to_user(user_id: int, text: str, source: str = "web") -> int:
if is_blocked(user_id):
raise ValueError(f"用户 {user_id} 已被封禁")
if not bot:
raise RuntimeError("Bot 尚未初始化")
sent = await bot.send_message(user_id, text.strip())
create_outbox_message(user_id, text.strip(), source, sent.message_id)
logger.info("sent message to user_id=%s message_id=%s", user_id, sent.message_id)
return int(sent.message_id)
def is_admin_chat(message: Message) -> bool:
"""Dynamic admin-chat filter."""
return admin_chat_id is not None and message.chat.id == admin_chat_id
return message.chat.id in all_admin_chat_ids()
def is_admin_action_message(message: Message) -> bool:
@@ -396,7 +515,7 @@ async def start(message: Message) -> None:
async def send_text_to_user_from_admin(message: Message, args: str | None, command_name: str) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid, text = parse_user_id_and_text(args)
@@ -409,8 +528,8 @@ async def send_text_to_user_from_admin(message: Message, args: str | None, comma
if not bot:
await message.reply("错误:Bot 尚未初始化")
return
sent = await bot.send_message(uid, text) # type: ignore[union-attr]
await message.reply(f"{command_name} 成功:已发送给用户 {uid}message_id={sent.message_id}")
message_id = await send_text_to_user(uid, text, f"tg:{command_name}")
await message.reply(f"{command_name} 成功:已发送给用户 {uid}message_id={message_id}")
except Exception as e:
logger.exception("/%s failed", command_name)
await message.reply(f"/{command_name} 失败:{e}\n用法:/{command_name} <user_id> <内容>")
@@ -428,7 +547,7 @@ async def cmd_send(message: Message, command: CommandObject) -> None:
@router.message(Command("sendpic"))
async def cmd_sendpic(message: Message, command: CommandObject) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid, caption = parse_user_id_and_optional_text(command.args)
@@ -451,13 +570,13 @@ async def cmd_sendpic(message: Message, command: CommandObject) -> None:
@router.message(Command("cancel"))
async def cmd_cancel(message: Message) -> None:
if message.chat.id == admin_chat_id and pending_sendpic.pop(message.chat.id, None):
if is_admin_chat(message) and pending_sendpic.pop(message.chat.id, None):
await message.reply("已取消待发送图片。")
@router.message(Command("block"))
async def cmd_block(message: Message, command: CommandObject) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid = parse_user_id(command.args)
@@ -473,7 +592,7 @@ async def cmd_block(message: Message, command: CommandObject) -> None:
@router.message(Command("unblock"))
async def cmd_unblock(message: Message, command: CommandObject) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid = parse_user_id(command.args)
@@ -489,7 +608,7 @@ async def cmd_unblock(message: Message, command: CommandObject) -> None:
@router.message(Command("note"))
async def cmd_note(message: Message, command: CommandObject) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid, note = parse_user_id_and_text(command.args)
@@ -505,7 +624,7 @@ async def cmd_note(message: Message, command: CommandObject) -> None:
@router.message(Command("who"))
async def cmd_who(message: Message, command: CommandObject) -> None:
if message.chat.id != admin_chat_id:
if not is_admin_chat(message):
return
try:
uid = parse_user_id(command.args)
@@ -528,6 +647,39 @@ async def cmd_who(message: Message, command: CommandObject) -> None:
await message.reply(f"/who 失败:{e}")
@router.message(Command("spamwords"))
async def cmd_spamwords(message: Message) -> None:
if not is_admin_chat(message):
return
words = spam_filter_settings()["keywords"]
text = "\n".join(f"- {html_escape(w)}" for w in words) or "暂无广告关键词"
await message.reply(f"广告关键词:\n{text}")
@router.message(Command("spamadd"))
async def cmd_spamadd(message: Message, command: CommandObject) -> None:
if not is_admin_chat(message):
return
word = (command.args or "").strip()
if not word:
await message.reply("用法:/spamadd 关键词")
return
words = update_spam_keywords("add", word)
await message.reply(f"已添加广告关键词:{html_escape(word)}\n当前共 {len(words)} 个。")
@router.message(Command("spamdel"))
async def cmd_spamdel(message: Message, command: CommandObject) -> None:
if not is_admin_chat(message):
return
word = (command.args or "").strip()
if not word:
await message.reply("用法:/spamdel 关键词")
return
words = update_spam_keywords("delete", word)
await message.reply(f"已删除广告关键词:{html_escape(word)}\n当前共 {len(words)} 个。")
@router.message(is_admin_action_message)
async def admin_reply_by_message(message: Message) -> None:
# Pending /sendpic flow: after /sendpic <uid>, the next admin photo is copied to target.
@@ -567,8 +719,8 @@ async def admin_reply_by_message(message: Message) -> None:
if is_blocked(target):
await message.reply(f"错误:用户 {target} 已被封禁,先 /unblock {target}")
return
sent = await bot.send_message(target, message.text) # type: ignore[union-attr]
await message.reply(f"已发送给用户 {target}message_id={sent.message_id}")
message_id = await send_text_to_user(target, message.text, "tg:reply")
await message.reply(f"已发送给用户 {target}message_id={message_id}")
except TelegramAPIError as e:
logger.exception("admin reply forwarding failed")
await message.reply(f"发送失败:{e}")
@@ -590,7 +742,7 @@ async def admin_plain_message(message: Message) -> None:
async def user_message(message: Message) -> None:
# Only relay private user chats to admin.
logger.info("incoming message chat_id=%s chat_type=%s from_user=%s content_type=%s text=%r", message.chat.id, message.chat.type, getattr(message.from_user, 'id', None), message.content_type, (message.text or '')[:80])
if message.chat.id == admin_chat_id:
if is_admin_chat(message):
logger.info("incoming message is admin plain message; ignored by user relay")
return
if message.chat.type != "private":
@@ -607,6 +759,15 @@ async def user_message(message: Message) -> None:
await message.answer("发送太快了,请稍后再试。")
return
inbox_id = create_inbox_message(message, uid, full, username)
spam_hits = spam_keyword_hits(message.text or message.caption or "")
if spam_hits and spam_filter_settings()["auto_block"]:
set_block(uid, True)
mark_inbox_error(inbox_id, "spam: " + ", ".join(spam_hits))
await admin_send(
f"[垃圾消息已拉黑]\nuser_id: <code>{uid}</code>\n命中:{html_escape(', '.join(spam_hits))}\n内容:{html_escape((message.text or message.caption or '')[:300])}"
)
await message.answer("消息已被系统拦截。")
return
user_row = get_user(uid)
note = user_row["note"] if user_row and "note" in user_row.keys() else ""
header = (
@@ -618,11 +779,16 @@ async def user_message(message: Message) -> None:
f"time: {html_escape(now_iso())}"
)
try:
sent = await bot.send_message(admin_chat_id, header) # type: ignore[union-attr]
save_message_map(sent, uid, message.message_id)
copied = await message.copy_to(admin_chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type]
save_message_map(copied, uid, message.message_id)
mark_inbox_forwarded(inbox_id, sent.message_id, copied.message_id)
first_header_id = None
first_copy_id = None
for chat_id in all_admin_chat_ids():
sent = await bot.send_message(chat_id, header) # type: ignore[union-attr]
save_message_map(sent, uid, message.message_id)
copied = await message.copy_to(chat_id, reply_to_message_id=sent.message_id) # type: ignore[arg-type]
save_message_map(copied, uid, message.message_id)
first_header_id = first_header_id or sent.message_id
first_copy_id = first_copy_id or copied.message_id
mark_inbox_forwarded(inbox_id, first_header_id, first_copy_id)
await message.answer("已转交管理员。")
except Exception as e:
mark_inbox_error(inbox_id, repr(e))
@@ -900,6 +1066,7 @@ async def run_monitor(monitor: dict[str, Any]) -> int:
event_key = stable_key(name, item.key) if is_forum else stable_key(name, item.key, "|".join(reasons), item.price or "", item.stock or "")
if not event_not_sent(event_key, name, item.title, item.link):
continue
notify_on_tg = bool(monitor.get("notify_telegram", True))
if is_forum:
text = (
f"[新帖命中] {html_escape(name)}\n"
@@ -921,6 +1088,10 @@ async def run_monitor(monitor: dict[str, Any]) -> int:
f"库存:{html_escape(item.stock or '-')}\n"
f"时间:{html_escape(now_iso())}"
)
record_monitor_event(name, item.title, item.link, reasons, notify_on_tg)
if not notify_on_tg:
sent_count += 1
continue
if await admin_send_monitor(text, name):
sent_count += 1
except Exception:
@@ -969,7 +1140,7 @@ async def cleanup_monitor_loop() -> None:
async def flush_pending_inbox() -> None:
if not bot or admin_chat_id is None:
if not bot or not all_admin_chat_ids():
return
rows = pending_inbox(50)
if not rows:
@@ -987,9 +1158,12 @@ async def flush_pending_inbox() -> None:
f"时间: {html_escape(row['created_at'])}\n\n"
f"内容:{html_escape(row['text'] or '(非文本/媒体消息,原始媒体无法补发,仅保留记录)')}"
)
sent = await bot.send_message(admin_chat_id, text)
save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None)
mark_inbox_forwarded(int(row['id']), sent.message_id, None)
first_id = None
for chat_id in all_admin_chat_ids():
sent = await bot.send_message(chat_id, text)
save_message_map(sent, int(row['user_id']), int(row['user_message_id']) if row['user_message_id'] else None)
first_id = first_id or sent.message_id
mark_inbox_forwarded(int(row['id']), first_id, None)
except Exception as e:
mark_inbox_error(int(row['id']), repr(e))
logger.exception("failed to flush inbox message id=%s", row['id'])
@@ -1169,6 +1343,7 @@ def monitor_from_form(
new_item: bool,
price_change: bool,
stock_change: bool,
notify_telegram: bool = True,
) -> dict[str, Any]:
m: dict[str, Any] = {
"name": name.strip(),
@@ -1176,6 +1351,7 @@ def monitor_from_form(
"url": url.strip(),
"interval_seconds": max(int(interval_seconds or MIN_INTERVAL_SECONDS), MIN_INTERVAL_SECONDS),
"keywords": parse_lines(keywords),
"notify_telegram": notify_telegram,
"notify_on": {
"keyword_match": keyword_match,
"new_item": new_item,
@@ -1208,13 +1384,13 @@ def layout(title: str, body: str) -> str:
<style>
:root{{--canvas:#f0f0f0;--ink:#121212;--muted:#5c5c5c;--red:#d02020;--blue:#1040c0;--yellow:#f0c020;--white:#fff;--gray:#e0e0e0}}
*{{box-sizing:border-box}}body{{font-family:Outfit,Aptos,'Segoe UI',sans-serif;background:var(--canvas);color:var(--ink);margin:0;letter-spacing:0}}body:before{{content:"";position:fixed;right:-70px;top:110px;width:190px;height:190px;border:4px solid var(--ink);border-radius:50%;background:var(--yellow);z-index:-1}}body:after{{content:"";position:fixed;left:190px;bottom:-80px;width:190px;height:190px;border:4px solid var(--ink);background:var(--blue);transform:rotate(45deg);z-index:-1}}
a{{color:var(--ink);text-decoration:none}}a:hover{{text-decoration:underline}}.shell{{display:grid;grid-template-columns:238px minmax(0,1fr);min-height:100vh}}aside{{border-right:4px solid var(--ink);background:var(--white);padding:18px 14px;position:sticky;top:0;height:100vh}}main{{padding:24px 30px;min-width:0;max-width:1440px}}.brand{{display:flex;gap:10px;align-items:center;margin-bottom:20px;padding:0 4px 16px;border-bottom:4px solid var(--ink)}}.mark{{width:44px;height:44px;border:4px solid var(--ink);background:var(--white);position:relative;box-shadow:4px 4px 0 var(--ink);flex:0 0 auto}}.mark:before{{content:"";position:absolute;left:5px;top:5px;width:13px;height:13px;border:3px solid var(--ink);border-radius:50%;background:var(--red)}}.mark:after{{content:"";position:absolute;right:4px;top:5px;width:13px;height:13px;border:3px solid var(--ink);background:var(--blue)}}.mark i{{position:absolute;left:8px;bottom:4px;width:25px;height:18px;background:var(--yellow);border:3px solid var(--ink);clip-path:polygon(50% 0,0 100%,100% 100%)}}.brand b{{font-size:18px;color:var(--ink);font-weight:900;text-transform:uppercase}}.brand small{{display:block;color:var(--muted);margin-top:2px;font-weight:700}}nav{{display:grid;gap:7px}}nav a{{position:relative;padding:10px 11px;border:3px solid var(--ink);background:var(--white);color:var(--ink);font-weight:900;text-transform:uppercase;font-size:13px;box-shadow:3px 3px 0 var(--ink)}}nav a:nth-child(3n+1){{background:var(--yellow)}}nav a:nth-child(3n+2){{background:#fff}}nav a:nth-child(3n){{background:#eef2ff}}nav a:hover{{text-decoration:none;transform:translate(-1px,-1px);box-shadow:5px 5px 0 var(--ink)}}.logout{{background:var(--red)!important;color:white}}.top{{display:flex;justify-content:space-between;align-items:center;gap:12px;margin-bottom:20px;border-bottom:4px solid var(--ink);padding-bottom:14px}}.top h1{{margin:0;font-size:34px;line-height:.95;color:var(--ink);font-weight:900;text-transform:uppercase}}.top .badge{{background:var(--blue);color:white}}
a{{color:var(--ink);text-decoration:none}}a:hover{{text-decoration:underline}}.shell{{display:grid;grid-template-columns:254px minmax(0,1fr);min-height:100vh}}aside{{border-right:4px solid var(--ink);background:var(--white);padding:18px 14px;position:sticky;top:0;height:100vh;overflow:auto}}main{{padding:24px 30px;min-width:0;max-width:1440px}}.brand{{display:flex;gap:10px;align-items:center;margin-bottom:18px;padding:0 4px 16px;border-bottom:4px solid var(--ink)}}.mark{{width:44px;height:44px;border:4px solid var(--ink);background:var(--white);position:relative;box-shadow:4px 4px 0 var(--ink);flex:0 0 auto}}.mark:before{{content:"";position:absolute;left:5px;top:5px;width:13px;height:13px;border:3px solid var(--ink);border-radius:50%;background:var(--red)}}.mark:after{{content:"";position:absolute;right:4px;top:5px;width:13px;height:13px;border:3px solid var(--ink);background:var(--blue)}}.mark i{{position:absolute;left:8px;bottom:4px;width:25px;height:18px;background:var(--yellow);border:3px solid var(--ink);clip-path:polygon(50% 0,0 100%,100% 100%)}}.brand b{{font-size:18px;color:var(--ink);font-weight:900;text-transform:uppercase}}.brand small{{display:block;color:var(--muted);margin-top:2px;font-weight:700}}nav{{display:grid;gap:13px}}nav section{{display:grid;gap:6px;padding:9px;border:3px solid var(--ink);background:#fff;box-shadow:3px 3px 0 var(--ink)}}nav section>b{{display:inline-block;width:max-content;margin:-20px 0 2px -2px;padding:3px 8px;border:3px solid var(--ink);background:var(--yellow);font-size:12px;font-weight:900;text-transform:uppercase}}nav a{{position:relative;padding:9px 10px;border:3px solid var(--ink);background:var(--white);color:var(--ink);font-weight:900;text-transform:uppercase;font-size:12px;box-shadow:2px 2px 0 var(--ink)}}nav section:nth-child(2)>b{{background:var(--blue);color:white}}nav section:nth-child(3)>b{{background:var(--red);color:white}}nav section:nth-child(4)>b{{background:var(--gray)}}nav a:hover{{text-decoration:none;transform:translate(-1px,-1px);box-shadow:4px 4px 0 var(--ink)}}.logout{{background:var(--red)!important;color:white}}.top{{display:flex;justify-content:space-between;align-items:center;gap:12px;margin-bottom:20px;border-bottom:4px solid var(--ink);padding-bottom:14px}}.top h1{{margin:0;font-size:34px;line-height:.95;color:var(--ink);font-weight:900;text-transform:uppercase}}.top .badge{{background:var(--blue);color:white}}
.btn{{background:var(--white);color:var(--ink);padding:7px 11px;border:3px solid var(--ink);border-radius:0;display:inline-block;cursor:pointer;font-weight:900;line-height:1.35;text-transform:uppercase;font-size:12px;box-shadow:3px 3px 0 var(--ink)}}.btn:hover{{text-decoration:none;transform:translate(-1px,-1px);box-shadow:5px 5px 0 var(--ink)}}.btn:active{{transform:translate(2px,2px);box-shadow:1px 1px 0 var(--ink)}}.btn.primary{{background:var(--red);color:white}}.btn.danger{{background:var(--red);color:white}}.btn.ok{{background:var(--yellow);color:var(--ink)}}.actions{{display:flex;gap:8px;align-items:center;flex-wrap:wrap}}
.card{{position:relative;background:var(--white);border:4px solid var(--ink);border-radius:0;padding:18px;margin:16px 0;box-shadow:8px 8px 0 var(--ink)}}.card:after{{content:"";position:absolute;top:12px;right:12px;width:14px;height:14px;background:var(--red);border:3px solid var(--ink)}}.toolbar{{display:flex;justify-content:space-between;gap:14px;align-items:flex-start;flex-wrap:wrap;padding-right:34px}}.grid{{display:grid;grid-template-columns:repeat(auto-fit,minmax(250px,1fr));gap:14px}}.form-actions{{display:flex;gap:10px;align-items:center;flex-wrap:wrap;margin:18px 0 0}}h2,h3{{font-weight:900;text-transform:uppercase;letter-spacing:0}}h2{{font-size:24px}}h3{{font-size:16px;border-bottom:3px solid var(--ink);padding-bottom:6px;margin-top:20px}}
input,select,textarea{{width:100%;box-sizing:border-box;background:#fff;color:var(--ink);border:3px solid var(--ink);border-radius:0;padding:10px 11px;outline:none;font-size:14px;font-weight:600}}input:focus,select:focus,textarea:focus{{box-shadow:4px 4px 0 var(--blue)}}textarea{{min-height:116px;font-family:'Cascadia Mono',Consolas,monospace}}label{{display:block;margin:10px 0 5px;color:var(--ink);font-weight:900;font-size:12px;text-transform:uppercase;letter-spacing:.06em}}.check-row{{display:flex;gap:10px;align-items:center;flex-wrap:wrap}}.check-row label{{display:flex;gap:7px;align-items:center;margin:0;padding:8px 10px;border:3px solid var(--ink);background:var(--gray)}}.check-row input{{width:auto}}
small,.muted{{color:var(--muted);line-height:1.5;font-weight:600}}table{{width:100%;border-collapse:collapse;border:3px solid var(--ink);background:white}}td,th{{border:3px solid var(--ink);padding:10px;text-align:left;vertical-align:top}}th{{color:var(--ink);font-size:12px;background:var(--yellow);text-transform:uppercase;letter-spacing:.06em}}tr:nth-child(even) td{{background:#fafafa}}.badge{{padding:4px 8px;border:3px solid var(--ink);border-radius:999px;background:var(--blue);color:white;font-size:12px;font-weight:900;text-transform:uppercase}}.msg{{padding:11px 12px;border:3px solid var(--ink);background:var(--yellow);color:var(--ink);margin:10px 0;font-weight:900;box-shadow:4px 4px 0 var(--ink)}}pre{{white-space:pre-wrap;background:#121212;color:#fff;padding:13px;border:4px solid var(--ink);max-height:420px;overflow:auto;box-shadow:5px 5px 0 var(--yellow)}}
@media(max-width:860px){{.shell{{grid-template-columns:1fr}}aside{{position:relative;height:auto}}main{{padding:18px}}nav{{grid-template-columns:repeat(2,minmax(0,1fr))}}.top{{align-items:flex-start;flex-direction:column}}.card{{box-shadow:5px 5px 0 var(--ink)}}}}
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><a href='/'>监控面板</a><a href='/inbox'>收件箱</a><a href='/send'>主动发消息</a><a href='/monitor/new'>新增监控</a><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/logs'>运行日志</a><a href='/run-once'>手动检查</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><section><b>消息</b><a href='/inbox'>收件箱</a><a href='/users'>用户管理</a><a href='/send'>主动发消息</a><a href='/replies'>快捷回复</a><a href='/rules'>私聊广告拦截</a></section><section><b>监控</b><a href='/'>监控面板</a><a href='/monitor/new'>新增监控</a><a href='/monitor/events'>推送历史</a><a href='/run-once'>手动检查</a></section><section><b>配置</b><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/config/export'>导出配置</a></section><section><b>系统</b><a href='/logs'>运行日志</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></section></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
{body}</main></div></body></html>"""
@@ -1243,7 +1419,8 @@ def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -
<div class=check-row><label><input type=checkbox name=keyword_match {checked('keyword_match')}> 关键词命中</label>
<label><input type=checkbox name=new_item {checked('new_item')}> 新条目</label>
<label><input type=checkbox name=price_change {checked('price_change')}> 价格变化</label>
<label><input type=checkbox name=stock_change {checked('stock_change')}> 库存变化</label></div>
<label><input type=checkbox name=stock_change {checked('stock_change')}> 库存变化</label>
<label><input type=checkbox name=notify_telegram {'checked' if m.get('notify_telegram', True) else ''}> 推送 Telegram</label></div>
<div class=form-actions><button class='btn primary' type=submit>保存</button> <a class=btn href='/'>取消</a></div></form>"""
@@ -1284,8 +1461,9 @@ def create_panel_app() -> FastAPI:
cfg = cfg_load_fresh()
rows = []
for i, m in enumerate(cfg.get("monitors") or []):
rows.append(f"""<tr><td><span class=badge>{html_escape(m.get('type','web'))}</span></td><td><b>{html_escape(m.get('name',''))}</b><br><small>{html_escape(m.get('url',''))}</small></td><td>{html_escape(m.get('interval_seconds',60))}s</td><td>{html_escape(', '.join(m.get('keywords') or []))}</td><td><a class=btn href='/monitor/{i}/edit'>编辑</a> <a class='btn ok' href='/monitor/{i}/preview'>预览</a> <a class='btn ok' href='/monitor/{i}/run'>检查</a> <a class='btn danger' href='/monitor/{i}/delete' onclick='return confirm("确定删除?")'>删除</a></td></tr>""")
body = f"""<div class=card><div class=toolbar><div><h2 style='margin:0 0 6px'>监控目标</h2><p class=muted style='margin:0'>当前 {len(cfg.get('monitors') or [])} 个;保存后自动重载定时任务。</p></div><div class=actions><a class='btn' href='/monitor/templates'>论坛模板</a> <a class='btn primary' href='/monitor/new'>新增监控</a> <a class='btn ok' href='/monitor/bulk'>批量新增</a></div></div><table style='margin-top:16px'><tr><th>类型</th><th>目标</th><th>间隔</th><th>关键词</th><th>操作</th></tr>""" + "".join(rows) + "</table></div>"
tg = "TG" if m.get("notify_telegram", True) else "仅 Web"
rows.append(f"""<tr><td><span class=badge>{html_escape(m.get('type','web'))}</span></td><td><b>{html_escape(m.get('name',''))}</b><br><small>{html_escape(m.get('url',''))}</small></td><td>{html_escape(m.get('interval_seconds',60))}s<br><small>{tg}</small></td><td>{html_escape(', '.join(m.get('keywords') or []))}</td><td><a class=btn href='/monitor/{i}/edit'>编辑</a> <a class='btn ok' href='/monitor/{i}/preview'>预览</a> <a class='btn ok' href='/monitor/{i}/run'>检查</a> <a class='btn danger' href='/monitor/{i}/delete' onclick='return confirm("确定删除?")'>删除</a></td></tr>""")
body = f"""<div class=card><div class=toolbar><div><h2 style='margin:0 0 6px'>监控目标</h2><p class=muted style='margin:0'>当前 {len(cfg.get('monitors') or [])} 个;保存后自动重载定时任务。</p></div><div class=actions><a class='btn' href='/monitor/templates'>论坛模板</a> <a class='btn primary' href='/monitor/new'>新增监控</a> <a class='btn ok' href='/monitor/bulk'>批量新增</a></div></div><table style='margin-top:16px'><tr><th>类型</th><th>目标</th><th>间隔/通知</th><th>关键词</th><th>操作</th></tr>""" + "".join(rows) + "</table></div>"
return layout("监控", body)
@app.get("/monitor/new", response_class=HTMLResponse)
@@ -1318,7 +1496,7 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
return layout("批量新增", body)
@app.post("/monitor/bulk")
async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)):
async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form("on")):
cfg = cfg_load_fresh()
monitors = cfg.setdefault("monitors", [])
added = 0
@@ -1334,7 +1512,7 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
name, url = parts[0], parts[1]
keywords = parts[2] if len(parts) >= 3 else ""
try:
monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change)))
monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram)))
added += 1
except Exception as e:
errors.append(f"{line_no} 行失败:{html_escape(e)}")
@@ -1370,10 +1548,11 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
new_item: str | None,
price_change: str | None,
stock_change: str | None,
notify_telegram: str | None,
) -> RedirectResponse:
cfg = cfg_load_fresh()
monitors = cfg.setdefault("monitors", [])
m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change))
m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram))
if original_index is None:
monitors.append(m)
else:
@@ -1386,12 +1565,12 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
return RedirectResponse("/", status_code=303)
@app.post("/monitor/create")
async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse:
return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change)
async def create_monitor(_: str = Depends(panel_auth), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form(None)) -> RedirectResponse:
return await save_form_common(None, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change, notify_telegram)
@app.post("/monitor/save")
async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None)) -> RedirectResponse:
return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change)
async def save_monitor(_: str = Depends(panel_auth), original_index: int = Form(...), name: str = Form(...), mtype: str = Form(...), url: str = Form(...), interval_seconds: int = Form(300), keywords: str = Form(""), item_selector: str = Form(""), title_selector: str = Form(""), link_selector: str = Form(""), price_selector: str = Form(""), stock_selector: str = Form(""), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form(None)) -> RedirectResponse:
return await save_form_common(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, keyword_match, new_item, price_change, stock_change, notify_telegram)
@app.get("/monitor/{idx}/delete")
async def delete_monitor(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
@@ -1459,28 +1638,37 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
status = "" if bot_ready else "<div class=msg>未填写 Token 或管理员 ID;网页可用,但 Bot 和监控推送不可用。</div>"
body = f"""<h2>Bot / 面板设置</h2>{status}<div class=card><form method=post>
<label>Telegram Bot Token</label><input name=TELEGRAM_BOT_TOKEN value='{html_escape(v['TELEGRAM_BOT_TOKEN'])}' placeholder='123456:ABC...'>
<label>管理员 ADMIN_CHAT_ID</label><input name=ADMIN_CHAT_ID value='{html_escape(v['ADMIN_CHAT_ID'])}'>
<label>管理员 ADMIN_CHAT_ID(最多 3 个,用逗号分隔)</label><input name=ADMIN_CHAT_ID value='{html_escape(v['ADMIN_CHAT_ID'])}'>
<div class=grid><div><label>日志级别</label><input name=LOG_LEVEL value='{html_escape(v['LOG_LEVEL'])}'></div><div><label>面板监听地址</label><input name=WEB_PANEL_HOST value='{html_escape(v['WEB_PANEL_HOST'])}'></div><div><label>面板端口</label><input name=WEB_PANEL_PORT value='{html_escape(v['WEB_PANEL_PORT'])}'></div><div><label>面板用户</label><input name=WEB_PANEL_USER value='{html_escape(v['WEB_PANEL_USER'])}'></div><div><label>面板密码</label><input name=WEB_PANEL_PASSWORD value='{html_escape(v['WEB_PANEL_PASSWORD'])}'></div></div>
<h3>监控数据自动清理</h3><p class=muted>删除过期监控通知消息,并清理 RSS/网站监控状态和去重记录;不会删除用户、收件箱、双向对话消息。</p><div class=grid><div><label>清理间隔(分钟)</label><input name=CLEANUP_INTERVAL_MINUTES type=number min=1 value='{html_escape(cleanup.get("interval_minutes", 60))}'></div><div><label>监控通知删除时间(分钟)</label><input name=CLEANUP_MESSAGE_DELETE_AFTER_MINUTES type=number min=1 value='{html_escape(cleanup.get("monitor_message_delete_after_minutes", 60))}'></div><div><label>保留监控数据(分钟)</label><input name=CLEANUP_RETENTION_MINUTES type=number min=1 value='{html_escape(cleanup.get("monitor_retention_minutes", 1440))}'></div></div>
<input type=hidden name=WEB_PANEL_ENABLED value='true'><div class=form-actions><button class='btn primary' type=submit>保存设置</button></div><small>改 Token、管理员 ID 或端口后需要重启。</small></form></div>"""
return layout("设置", body)
@app.post("/settings", response_class=HTMLResponse)
async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str:
write_env_values(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED})
def save_panel_settings(
values: dict[str, Any],
cleanup_interval_minutes: int,
cleanup_message_delete_after_minutes: int,
cleanup_retention_minutes: int,
) -> None:
write_env_values(values)
cfg = cfg_load_fresh()
cfg["cleanup"] = {
"enabled": True,
"interval_minutes": max(1, int(CLEANUP_INTERVAL_MINUTES)),
"monitor_message_delete_after_minutes": max(1, int(CLEANUP_MESSAGE_DELETE_AFTER_MINUTES)),
"monitor_retention_minutes": max(1, int(CLEANUP_RETENTION_MINUTES)),
"interval_minutes": max(1, int(cleanup_interval_minutes)),
"monitor_message_delete_after_minutes": max(1, int(cleanup_message_delete_after_minutes)),
"monitor_retention_minutes": max(1, int(cleanup_retention_minutes)),
}
cfg_save(cfg)
@app.post("/settings", response_class=HTMLResponse)
async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str:
save_panel_settings(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, CLEANUP_INTERVAL_MINUTES, CLEANUP_MESSAGE_DELETE_AFTER_MINUTES, CLEANUP_RETENTION_MINUTES)
return layout("已保存", "<div class=msg>已保存,不会自动重启;修改 Token/管理员 ID 后请重启。</div><p><a class=btn href='/settings'>返回</a> <a class=btn href='/restart'>重启机器人</a></p>")
@app.get("/send", response_class=HTMLResponse)
async def send_page(_: str = Depends(panel_auth)) -> str:
async def send_page(request: Request, _: str = Depends(panel_auth)) -> str:
selected_user_id = request.query_params.get("user_id", "")
with closing(db()) as conn:
users = conn.execute(
"SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 200"
@@ -1490,7 +1678,8 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
blocked = "(已封禁)" if u["blocked"] else ""
username = f"@{u['username']}" if u["username"] else ""
label = f"{u['full_name'] or u['user_id']} {username} · {u['user_id']} {blocked}"
options.append(f"<option value='{u['user_id']}'>{html_escape(label)}</option>")
selected = "selected" if str(u["user_id"]) == selected_user_id else ""
options.append(f"<option value='{u['user_id']}' {selected}>{html_escape(label)}</option>")
body = f"""<div class=card><h2>主动发消息</h2><p class=muted>只能发送给已经私聊过 Bot 的用户。</p><form method=post action='/send'>
<label>选择用户</label><select name=user_id>{''.join(options)}</select>
<label>或手动输入 user_id</label><input name=manual_user_id placeholder='例如 123456789'>
@@ -1511,12 +1700,9 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
return layout("发送失败", f"<div class=card><pre>找不到用户 {uid},对方需要先私聊 Bot。</pre></div><p><a class=btn href='/send'>返回</a></p>")
if is_blocked(uid):
return layout("发送失败", f"<div class=card><pre>用户 {uid} 已被封禁,请先 /unblock。</pre></div><p><a class=btn href='/send'>返回</a></p>")
if not bot:
return layout("发送失败", "<div class=card><pre>Bot 尚未初始化;请确认服务以正常模式运行,而不是 --panel-only。</pre></div><p><a class=btn href='/send'>返回</a></p>")
sent = await bot.send_message(uid, text.strip())
logger.info("panel sent message to user_id=%s message_id=%s", uid, sent.message_id)
await admin_send(f"[主动发送成功]\nuser_id: <code>{uid}</code>\nmessage_id: {sent.message_id}\n时间:{html_escape(now_iso())}")
return layout("发送成功", f"<div class=msg>已发送给用户 {uid}message_id={sent.message_id}。Bot 也已给管理员发送确认提醒。</div><p><a class=btn href='/send'>继续发送</a> <a class=btn href='/inbox'>收件箱</a></p>")
message_id = await send_text_to_user(uid, text.strip(), "web:send")
await admin_send(f"[主动发送成功]\nuser_id: <code>{uid}</code>\nmessage_id: {message_id}\n时间:{html_escape(now_iso())}")
return layout("发送成功", f"<div class=msg>已发送给用户 {uid}message_id={message_id}。Bot 也已给管理员发送确认提醒。</div><p><a class=btn href='/send'>继续发送</a> <a class=btn href='/inbox'>收件箱</a></p>")
except TelegramAPIError as e:
logger.exception("panel send failed")
return layout("发送失败", f"<div class=card><pre>{html_escape(e)}</pre></div><p><a class=btn href='/send'>返回</a></p>")
@@ -1530,11 +1716,22 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
rows = conn.execute("SELECT * FROM inbox_messages ORDER BY id DESC LIMIT 200").fetchall()
trs = []
for r in rows:
status_txt = "已转发" if r["forwarded"] else "未转发"
status_cls = "ok" if r["forwarded"] else "danger"
direction = r["direction"] if "direction" in r.keys() else "in"
source = r["source"] if "source" in r.keys() else "user"
if direction == "out":
status_txt, status_cls = "已回复", "ok"
elif (r["error"] or "").startswith("spam:"):
status_txt, status_cls = "已拦截", "danger"
else:
status_txt = "已转发" if r["forwarded"] else "未转发"
status_cls = "ok" if r["forwarded"] else "danger"
content = html_escape(r["text"] or "(非文本/媒体消息)")
trs.append(f"""<tr><td>#{r['id']}<br><span class='badge {status_cls}'>{status_txt}</span></td><td><b>{html_escape(r['full_name'])}</b><br><small>{r['user_id']} @{html_escape(r['username'] or '')}</small></td><td>{html_escape(r['message_type'])}<br><small>{html_escape(r['created_at'])}</small></td><td>{content}<br><small style='color:#fca5a5'>{html_escape(r['error'] or '')}</small></td><td><a class=btn href='/inbox/{r['id']}/retry'>重试转发</a></td></tr>""")
body = "<div class=card><h2>收件箱</h2><p class=muted>用户消息会先写入 SQLite,再转发给管理员;转发失败的消息可以在这里重试。</p><table><tr><th>ID/状态</th><th>用户</th><th>类型/时间</th><th>内容/错误</th><th>操作</th></tr>" + "".join(trs) + "</table></div>"
flow = "用户 -> 管理员" if direction == "in" else "管理员 -> 用户"
actions = f"<a class=btn href='/inbox/{r['id']}/reply'>回复</a>"
if direction == "in" and not r["forwarded"]:
actions += f" <a class=btn href='/inbox/{r['id']}/retry'>重试转发</a>"
trs.append(f"""<tr><td>#{r['id']}<br><span class='badge {status_cls}'>{status_txt}</span></td><td><b>{html_escape(r['full_name'])}</b><br><small>{r['user_id']} @{html_escape(r['username'] or '')}</small></td><td>{html_escape(flow)}<br><small>{html_escape(source)} · {html_escape(r['created_at'])}</small></td><td>{content}<br><small style='color:#fca5a5'>{html_escape(r['error'] or '')}</small></td><td>{actions}</td></tr>""")
body = "<div class=card><h2>收件箱</h2><p class=muted>这里显示双向机器人对话记录:用户发来的消息、Web 回复、TG 管理员回复都会记录。转发失败的入站消息可重试。</p><table><tr><th>ID/状态</th><th>用户</th><th>方向/来源</th><th>内容/错误</th><th>操作</th></tr>" + "".join(trs) + "</table></div>"
return layout("收件箱", body)
@app.get("/inbox/{msg_id}/retry")
@@ -1545,6 +1742,154 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
await flush_pending_inbox()
return RedirectResponse("/inbox", status_code=303)
@app.get("/inbox/{msg_id}/reply", response_class=HTMLResponse)
async def inbox_reply_page(msg_id: int, _: str = Depends(panel_auth)) -> str:
row = get_inbox_message(msg_id)
if not row:
raise HTTPException(404, "message not found")
options = "".join(
f"<option value='{html_escape(r.get('text',''))}'>{html_escape(r.get('title',''))}</option>"
for r in list_quick_replies()
)
body = f"""<div class=card><h2>回复用户</h2><p class=muted>#{row['id']} · {html_escape(row['full_name'])} · {row['user_id']}</p><pre>{html_escape(row['text'] or '(非文本/媒体消息)')}</pre><form method=post>
<label>快捷模板</label><select onchange="if(this.value)document.querySelector('[name=text]').value=this.value"><option value=''>选择模板</option>{options}</select>
<label>回复内容</label><textarea name=text required></textarea>
<div class=form-actions><button class='btn primary' type=submit>发送回复</button> <a class=btn href='/inbox'>返回</a></div></form></div>"""
return layout("回复用户", body)
@app.post("/inbox/{msg_id}/reply", response_class=HTMLResponse)
async def inbox_reply_save(msg_id: int, _: str = Depends(panel_auth), text: str = Form("")) -> str:
row = get_inbox_message(msg_id)
if not row:
raise HTTPException(404, "message not found")
try:
message_id = await send_text_to_user(int(row["user_id"]), text, "web:inbox")
await admin_send(f"[Web 回复成功]\nuser_id: <code>{row['user_id']}</code>\nmessage_id: {message_id}")
return layout("回复成功", f"<div class=msg>已回复用户 {row['user_id']}。</div><p><a class=btn href='/inbox'>返回收件箱</a></p>")
except Exception as e:
return layout("回复失败", f"<div class=card><pre>{html_escape(e)}</pre></div><p><a class=btn href='/inbox/{msg_id}/reply'>返回</a></p>")
@app.get("/users", response_class=HTMLResponse)
async def users_page(_: str = Depends(panel_auth)) -> str:
v = env_values()
with closing(db()) as conn:
rows = conn.execute("SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 300").fetchall()
trs = []
for u in rows:
status_txt = "封禁" if u["blocked"] else "正常"
action = "unblock" if u["blocked"] else "block"
action_txt = "解封" if u["blocked"] else "封禁"
trs.append(f"""<tr><td><b>{html_escape(u['full_name'] or u['user_id'])}</b><br><small>{u['user_id']} @{html_escape(u['username'] or '')}</small></td><td><span class=badge>{status_txt}</span><br><small>{html_escape(u['updated_at'])}</small></td><td>{html_escape(u['note'] or '')}</td><td><form method=post action='/users/{u['user_id']}/note'><input name=note value='{html_escape(u['note'] or '')}'><button class=btn type=submit>备注</button></form><div class=actions><a class=btn href='/send?user_id={u['user_id']}'>发消息</a><a class='btn danger' href='/users/{u['user_id']}/{action}'>{action_txt}</a></div></td></tr>""")
settings_card = f"""<div class=card><h2>Bot / 面板配置</h2><p class=muted>这里和“Bot / 面板设置”共用同一份 .env。修改 Token、管理员 ID、端口、账号或密码后不会自动重启,需要手动重启服务。</p><form method=post action='/users/settings'>
<label>Telegram Bot Token</label><input name=TELEGRAM_BOT_TOKEN value='{html_escape(v['TELEGRAM_BOT_TOKEN'])}' placeholder='123456:ABC...'>
<label>管理员 ADMIN_CHAT_ID(最多 3 个,用逗号分隔)</label><input name=ADMIN_CHAT_ID value='{html_escape(v['ADMIN_CHAT_ID'])}'>
<div class=grid><div><label>日志级别</label><input name=LOG_LEVEL value='{html_escape(v['LOG_LEVEL'])}'></div><div><label>面板监听地址</label><input name=WEB_PANEL_HOST value='{html_escape(v['WEB_PANEL_HOST'])}'></div><div><label>面板端口</label><input name=WEB_PANEL_PORT value='{html_escape(v['WEB_PANEL_PORT'])}'></div><div><label>面板用户</label><input name=WEB_PANEL_USER value='{html_escape(v['WEB_PANEL_USER'])}'></div><div><label>面板密码</label><input name=WEB_PANEL_PASSWORD value='{html_escape(v['WEB_PANEL_PASSWORD'])}'></div></div>
<input type=hidden name=WEB_PANEL_ENABLED value='true'><div class=form-actions><button class='btn primary' type=submit>保存配置</button> <a class=btn href='/restart'>重启机器人</a></div></form></div>"""
body = settings_card + "<div class=card><h2>用户管理</h2><table><tr><th>用户</th><th>状态</th><th>备注</th><th>操作</th></tr>" + "".join(trs) + "</table></div>"
return layout("用户管理", body)
@app.post("/users/settings", response_class=HTMLResponse)
async def users_settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin")) -> str:
cleanup = (cfg_load_fresh().get("cleanup") or {})
save_panel_settings(
locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED},
int(cleanup.get("interval_minutes", 60)),
int(cleanup.get("monitor_message_delete_after_minutes", 60)),
int(cleanup.get("monitor_retention_minutes", 1440)),
)
return layout("已保存", "<div class=msg>已保存,不会自动重启;修改 Token、管理员 ID、端口、账号或密码后请重启。</div><p><a class=btn href='/users'>返回用户管理</a> <a class=btn href='/restart'>重启机器人</a></p>")
@app.post("/users/{user_id}/note")
async def user_note_save(user_id: int, _: str = Depends(panel_auth), note: str = Form("")) -> RedirectResponse:
set_note(user_id, note.strip())
return RedirectResponse("/users", status_code=303)
@app.get("/users/{user_id}/block")
async def user_block(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
set_block(user_id, True)
return RedirectResponse("/users", status_code=303)
@app.get("/users/{user_id}/unblock")
async def user_unblock(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
set_block(user_id, False)
return RedirectResponse("/users", status_code=303)
@app.get("/rules", response_class=HTMLResponse)
async def rules_page(_: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
spam = (cfg.get("bot") or {}).get("spam_filter") or {}
keywords = "\n".join(spam.get("keywords") or [])
body = f"""<div class=card><h2>私聊广告拦截</h2><p class=muted>只拦截用户私聊 Bot 的双向对话消息,不影响 RSS/Web 监控关键词。监控内容过滤请使用监控配置里的屏蔽词。</p><form method=post>
<div class=check-row><label><input type=checkbox name=enabled {'checked' if spam.get('enabled') else ''}> 启用</label><label><input type=checkbox name=auto_block {'checked' if spam.get('auto_block', True) else ''}> 命中后自动拉黑</label></div>
<label>广告关键词(一行一个)</label><textarea name=keywords>{html_escape(keywords)}</textarea>
<div class=form-actions><button class='btn primary' type=submit>保存规则</button></div></form></div>"""
return layout("拦截规则", body)
@app.post("/rules")
async def rules_save(_: str = Depends(panel_auth), enabled: str | None = Form(None), auto_block: str | None = Form(None), keywords: str = Form("")) -> RedirectResponse:
cfg = cfg_load_fresh()
bot_cfg = cfg.setdefault("bot", {})
bot_cfg["spam_filter"] = {
"enabled": bool(enabled),
"auto_block": bool(auto_block),
"keywords": parse_lines(keywords),
}
cfg_save(cfg)
return RedirectResponse("/rules", status_code=303)
@app.get("/replies", response_class=HTMLResponse)
async def replies_page(_: str = Depends(panel_auth)) -> str:
replies = list_quick_replies()
rows = []
for i, r in enumerate(replies):
rows.append(f"""<tr><td>{i + 1}</td><td>{html_escape(r.get('title',''))}</td><td>{html_escape(r.get('text',''))}</td><td><a class='btn danger' href='/replies/{i}/delete'>删除</a></td></tr>""")
body = """<div class=card><h2>快捷回复</h2><table><tr><th>#</th><th>标题</th><th>内容</th><th>操作</th></tr>""" + "".join(rows) + """</table></div><div class=card><h2>新增模板</h2><form method=post><label>标题</label><input name=title required><label>内容</label><textarea name=text required></textarea><div class=form-actions><button class='btn primary' type=submit>保存模板</button></div></form></div>"""
return layout("快捷回复", body)
@app.post("/replies")
async def replies_save(_: str = Depends(panel_auth), title: str = Form(""), text: str = Form("")) -> RedirectResponse:
cfg = cfg_load_fresh()
bot_cfg = cfg.setdefault("bot", {})
replies = bot_cfg.setdefault("quick_replies", [])
replies.append({"title": title.strip(), "text": text.strip()})
cfg_save(cfg)
return RedirectResponse("/replies", status_code=303)
@app.get("/replies/{idx}/delete")
async def replies_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
cfg = cfg_load_fresh()
replies = (cfg.get("bot") or {}).get("quick_replies") or []
if 0 <= idx < len(replies):
replies.pop(idx)
cfg_save(cfg)
return RedirectResponse("/replies", status_code=303)
@app.get("/monitor/events", response_class=HTMLResponse)
async def monitor_events(_: str = Depends(panel_auth)) -> str:
with closing(db()) as conn:
rows = conn.execute("SELECT * FROM monitor_events ORDER BY id DESC LIMIT 300").fetchall()
trs = []
for r in rows:
status_txt = "已推 TG" if r["pushed"] else "仅 Web"
trs.append(f"""<tr><td>#{r['id']}<br><span class=badge>{status_txt}</span></td><td><b>{html_escape(r['monitor_name'])}</b><br><small>{html_escape(r['created_at'])}</small></td><td>{html_escape(r['title'])}<br><small>{html_escape(r['link'])}</small></td><td>{html_escape(r['reasons'])}</td></tr>""")
body = "<div class=card><h2>监控推送历史</h2><table><tr><th>ID/状态</th><th>监控</th><th>条目</th><th>原因</th></tr>" + "".join(trs) + "</table></div>"
return layout("推送历史", body)
@app.get("/config/export", response_class=HTMLResponse)
async def config_export(_: str = Depends(panel_auth)) -> str:
content = CONFIG_PATH.read_text(encoding="utf-8") if CONFIG_PATH.exists() else ""
body = f"""<div class=card><h2>导出 / 导入配置</h2><p class=muted>只处理 config.yaml,不包含 Token、密码和 Session Secret。</p><form method=post action='/config/import'><textarea name=content style='min-height:520px'>{html_escape(content)}</textarea><div class=form-actions><button class='btn primary' type=submit>导入并保存</button> <a class=btn href='/yaml'>YAML 高级编辑</a></div></form></div>"""
return layout("导出配置", body)
@app.post("/config/import", response_class=HTMLResponse)
async def config_import(_: str = Depends(panel_auth), content: str = Form("")) -> str:
try:
data = yaml.safe_load(content) or {}
cfg_save(data)
return layout("导入完成", "<div class=msg>配置已导入并重载。</div><p><a class=btn href='/'>返回</a></p>")
except Exception as e:
return layout("导入失败", f"<div class=card><pre>{html_escape(e)}</pre></div><p><a class=btn href='/config/export'>返回</a></p>")
@app.get("/restart", response_class=HTMLResponse)
async def restart_page(_: str = Depends(panel_auth)) -> str:
body = """<div class=card><h2>重启机器人</h2><p class=muted>用于修改 Token、管理员 ID、面板设置等需要重启生效的配置。</p><form method=post action='/restart'><button class='btn danger' type=submit>确认重启 tg-watchbot</button></form></div>"""
@@ -1590,7 +1935,10 @@ def validate_env() -> tuple[str, int]:
raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}")
if not admin:
raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}")
return token, int(admin)
ids = parse_admin_chat_ids(admin)
if not ids:
raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}")
return token, ids[0]
def bot_env_configured() -> bool:
@@ -1599,7 +1947,7 @@ def bot_env_configured() -> bool:
async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
global bot, admin_chat_id, config, scheduler_ref
global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref
load_dotenv(ENV_PATH, override=True)
config = load_config()
setup_logging(os.getenv("LOG_LEVEL", "INFO"))
@@ -1613,6 +1961,7 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
# If .env is filled, send notifications during manual test; otherwise just log.
try:
token, admin_chat_id = validate_env()
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
except Exception as e:
logger.warning("run-once without Telegram notification: %s", e)
@@ -1628,6 +1977,7 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
while True:
await asyncio.sleep(3600)
token, admin_chat_id = validate_env()
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
dp.include_router(router)
+12
View File
@@ -6,6 +6,16 @@ bot:
rate_limit:
window_seconds: 10
max_messages: 3
spam_filter:
enabled: false
auto_block: true
keywords:
- 投资
- 博彩
- 空投
quick_replies:
- title: 已收到
text: 你好,消息已收到,我稍后处理。
monitors:
- name: NodeSeek 新帖
type: rss
@@ -23,6 +33,7 @@ monitors:
new_item: true
price_change: false
stock_change: false
notify_telegram: true
forum: true
- name: Linux.do 最新
type: rss
@@ -40,6 +51,7 @@ monitors:
new_item: true
price_change: false
stock_change: false
notify_telegram: true
forum: true
cleanup:
enabled: true
+120
View File
@@ -43,6 +43,8 @@ def install_import_stubs() -> None:
modules["apscheduler.schedulers.asyncio"].AsyncIOScheduler = object
modules["bs4"].BeautifulSoup = object
modules["dotenv"].load_dotenv = lambda *args, **kwargs: None
modules["yaml"].safe_load = lambda stream: {"bot": {"spam_filter": {"enabled": True, "keywords": []}}}
modules["yaml"].safe_dump = lambda data, **kwargs: str(data)
modules["aiogram"].Bot = object
modules["aiogram"].Dispatcher = object
modules["aiogram"].F = object()
@@ -76,11 +78,16 @@ class FakeBot:
def __init__(self) -> None:
self.deleted: list[tuple[int, int]] = []
self.sent_texts: list[str] = []
self.sent_chat_ids: list[int] = []
self.fail_chat_ids: set[int] = set()
async def delete_message(self, chat_id: int, message_id: int) -> None:
self.deleted.append((chat_id, message_id))
async def send_message(self, chat_id: int, text: str, disable_web_page_preview: bool = False):
if chat_id in self.fail_chat_ids:
raise RuntimeError("send failed")
self.sent_chat_ids.append(chat_id)
self.sent_texts.append(text)
return SimpleNamespace(chat=SimpleNamespace(id=chat_id), message_id=3003)
@@ -99,10 +106,12 @@ class MonitorMessageCleanupTest(unittest.TestCase):
def test_monitor_notification_send_is_recorded_for_later_deletion(self) -> None:
old_bot = app.bot
old_admin_chat_id = app.admin_chat_id
old_admin_chat_ids = app.admin_chat_ids
old_config = app.config
fake_bot = FakeBot()
app.bot = fake_bot
app.admin_chat_id = 1001
app.admin_chat_ids = []
app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}}
try:
sent = asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖"))
@@ -116,8 +125,57 @@ class MonitorMessageCleanupTest(unittest.TestCase):
finally:
app.bot = old_bot
app.admin_chat_id = old_admin_chat_id
app.admin_chat_ids = old_admin_chat_ids
app.config = old_config
def test_monitor_event_history_is_recorded(self) -> None:
app.record_monitor_event("NodeSeek 新帖", "title", "https://example.com", ["关键词"], False)
with closing(sqlite3.connect(app.DB_PATH)) as conn:
row = conn.execute("SELECT monitor_name, title, pushed FROM monitor_events").fetchone()
self.assertEqual(("NodeSeek 新帖", "title", 0), row)
def test_monitor_notification_is_sent_to_all_admins(self) -> None:
old_bot = app.bot
old_admin_chat_ids = app.admin_chat_ids
old_config = app.config
fake_bot = FakeBot()
app.bot = fake_bot
app.admin_chat_ids = [1001, 1002, 1003]
app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}}
try:
self.assertTrue(asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖")))
self.assertEqual([1001, 1002, 1003], fake_bot.sent_chat_ids)
finally:
app.bot = old_bot
app.admin_chat_ids = old_admin_chat_ids
app.config = old_config
def test_monitor_notification_continues_when_one_admin_fails(self) -> None:
old_bot = app.bot
old_admin_chat_ids = app.admin_chat_ids
old_config = app.config
fake_bot = FakeBot()
fake_bot.fail_chat_ids.add(1002)
app.bot = fake_bot
app.admin_chat_ids = [1001, 1002, 1003]
app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}}
try:
with self.assertLogs("tg-watchbot", level="ERROR"):
self.assertTrue(asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖")))
self.assertEqual([1001, 1003], fake_bot.sent_chat_ids)
finally:
app.bot = old_bot
app.admin_chat_ids = old_admin_chat_ids
app.config = old_config
def test_outbound_message_is_recorded_in_conversation_log(self) -> None:
app.upsert_user(2001, "User", "user")
outbox_id = app.create_outbox_message(2001, "reply text", "web:inbox", 4004)
with closing(sqlite3.connect(app.DB_PATH)) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT direction, source, text, forwarded FROM inbox_messages WHERE id=?", (outbox_id,)).fetchone()
self.assertEqual(("out", "web:inbox", "reply text", 1), (row["direction"], row["source"], row["text"], row["forwarded"]))
def test_expired_monitor_message_is_deleted_and_removed_from_queue(self) -> None:
sent_message = SimpleNamespace(chat=SimpleNamespace(id=1001), message_id=2002)
app.record_monitor_message(sent_message, "NodeSeek 新帖", delete_after_seconds=60, sent_at_ts=1000)
@@ -146,6 +204,9 @@ class MonitorMessageCleanupTest(unittest.TestCase):
class BotConfigurationTest(unittest.TestCase):
def test_parse_admin_chat_ids_keeps_unique_first_three(self) -> None:
self.assertEqual([1, 2, 3], app.parse_admin_chat_ids("1,2 2;3,4"))
def test_bot_is_not_configured_without_token_or_admin_chat_id(self) -> None:
old_token = os.environ.pop("TELEGRAM_BOT_TOKEN", None)
old_admin = os.environ.pop("ADMIN_CHAT_ID", None)
@@ -217,9 +278,68 @@ class PanelHtmlContractTest(unittest.TestCase):
"name=new_item",
"name=price_change",
"name=stock_change",
"name=notify_telegram",
]:
self.assertIn(expected, html)
def test_monitor_form_can_disable_telegram_notification(self) -> None:
monitor = {
"type": "rss",
"interval_seconds": 60,
"notify_telegram": False,
"notify_on": {"keyword_match": True},
}
html = app.monitor_form_html(monitor)
self.assertIn("name=notify_telegram", html)
self.assertNotIn("name=notify_telegram checked", html)
def test_layout_groups_navigation_by_domain(self) -> None:
html = app.layout("测试", "<p>ok</p>")
for expected in ["<b>消息</b>", "<b>监控</b>", "<b>配置</b>", "<b>系统</b>", "私聊广告拦截"]:
self.assertIn(expected, html)
def test_inbox_copy_describes_two_way_conversation(self) -> None:
source = Path("app.py").read_text(encoding="utf-8")
self.assertIn("这里显示双向机器人对话记录", source)
self.assertIn("管理员 -> 用户", source)
def test_users_page_keeps_shared_settings_form(self) -> None:
source = Path("app.py").read_text(encoding="utf-8")
self.assertIn("action='/users/settings'", source)
self.assertIn("这里和“Bot / 面板设置”共用同一份 .env", source)
class SpamAndTemplateConfigTest(unittest.TestCase):
def test_spam_keyword_hits_follow_config(self) -> None:
old_config = app.config
app.config = {"bot": {"spam_filter": {"enabled": True, "keywords": ["博彩", "投资"]}}}
try:
self.assertEqual(["博彩"], app.spam_keyword_hits("这里有博彩广告"))
finally:
app.config = old_config
def test_quick_replies_are_loaded_from_config(self) -> None:
old_config = app.config
app.config = {"bot": {"quick_replies": [{"title": "收到", "text": "稍后处理"}]}}
try:
self.assertEqual("收到", app.list_quick_replies()[0]["title"])
finally:
app.config = old_config
def test_update_spam_keywords_writes_config(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
old_config_path = app.CONFIG_PATH
old_config = app.config
app.CONFIG_PATH = Path(temp_dir) / "config.yaml"
app.CONFIG_PATH.write_text("bot:\n spam_filter:\n enabled: true\n keywords: []\n", encoding="utf-8")
app.config = {"bot": {"spam_filter": {"enabled": True, "keywords": []}}}
try:
self.assertEqual(["广告"], app.update_spam_keywords("add", "广告"))
self.assertEqual([], app.update_spam_keywords("delete", "广告"))
finally:
app.CONFIG_PATH = old_config_path
app.config = old_config
if __name__ == "__main__":
unittest.main()