<内容>\n"
"或在收件箱里回复某条用户消息;也可以打开面板的「主动发消息」。"
)
@router.message()
async def user_message(message: Message) -> None:
    """Handle an incoming message and relay private user chats to the admin chats.

    Messages from an admin chat are ignored by this handler. Non-private chats
    are never relayed; group/supergroup messages are passed to the group
    keyword handling first. For private chats the sender is upserted, checked
    against the block list and the rate limit, stored via
    ``create_inbox_message``, checked for spam keywords, and finally sent to
    every admin chat as a header message followed by a copy of the original.
    """
    # Only relay private user chats to admin.
    logger.info("incoming message chat_id=%s chat_type=%s from_user=%s content_type=%s text=%r", message.chat.id, message.chat.type, getattr(message.from_user, 'id', None), message.content_type, (message.text or '')[:80])
    if is_admin_chat(message):
        logger.info("incoming message is admin plain message; ignored by user relay")
        return
    if message.chat.type != "private":
        if message.chat.type in {"group", "supergroup"}:
            # Group handling is best-effort: a failure is logged and the
            # message is still dropped from the relay path below.
            try:
                record_discovered_group_chat(message)
                await handle_group_keyword_message(message, listen_source="bot")
            except Exception:
                logger.exception("group keyword handling failed chat_id=%s message_id=%s", message.chat.id, message.message_id)
        logger.info("incoming message ignored because chat_type is not private: %s", message.chat.type)
        return
    uid, full, username = user_display(message)
    if not uid:
        return
    upsert_user(uid, full, username)
    if is_blocked(uid):
        await message.answer("你当前无法发送消息。")
        return
    if rate_limited(uid):
        await message.answer("发送太快了,请稍后再试。")
        return
    # The inbox row is created before forwarding so that a failed relay can
    # still be marked with an error against this id.
    inbox_id = create_inbox_message(message, uid, full, username)
    spam_hits = spam_keyword_hits(message.text or message.caption or "")
    if spam_hits and spam_filter_settings()["auto_block"]:
        # Auto-block: block the sender, record the reason on the inbox row,
        # tell the admins, and stop without relaying the message itself.
        set_block(uid, True)
        mark_inbox_error(inbox_id, "spam: " + ", ".join(spam_hits))
        await admin_send(
            f"[垃圾消息已拉黑]\nuser_id: {uid}\n命中:{html_escape(', '.join(spam_hits))}\n内容:{html_escape((message.text or message.caption or '')[:300])}"
        )
        await message.answer("消息已被系统拦截。")
        return
    user_row = get_user(uid)
    note = user_row["note"] if user_row and "note" in user_row.keys() else ""
    header = (
        f"[用户消息 #{inbox_id}]\n"
        f"user_id: {uid}\n"
        f"name: {html_escape(full)}\n"
        f"username: @{html_escape(username or '')}\n"
        f"note: {html_escape(note)}\n"
        f"time: {html_escape(now_iso())}"
    )
    try:
        first_header_id = None
        first_copy_id = None
        for chat_id in all_admin_chat_ids():
            # Both the header and the copied message are mapped back to the
            # user's original message id.
            sent = await bot.send_message(chat_id, header)  # type: ignore[union-attr]
            save_message_map(chat_id, sent.message_id, uid, message.message_id)
            copied = await message.copy_to(chat_id, reply_to_message_id=sent.message_id)  # type: ignore[arg-type]
            save_message_map(chat_id, copied.message_id, uid, message.message_id)
            # Only the ids from the first admin chat are kept for the inbox row.
            first_header_id = first_header_id or sent.message_id
            first_copy_id = first_copy_id or copied.message_id
        mark_inbox_forwarded(inbox_id, first_header_id, first_copy_id)
        await message.answer("已转交管理员。")
    except Exception as e:
        # Any relay failure is recorded on the inbox row and reported to the user.
        mark_inbox_error(inbox_id, repr(e))
        logger.exception("failed to relay user message, saved inbox_id=%s", inbox_id)
        await message.answer("已收到留言,但转发管理员暂时失败;系统会稍后自动重试。")
@dataclass
class MonitorItem:
    """One item extracted from a monitored web page or RSS feed."""

    # Identifier used to look up and store per-item monitor state.
    key: str
    # Display title of the item.
    title: str
    # URL the item points to.
    link: str
    # Text that keyword matching is run against.
    text: str
    # Optional fields; each parser fills only the ones it can extract.
    price: str | None = None
    stock: str | None = None
    author: str | None = None
    published: str | None = None
    category: str | None = None
def stable_key(*parts: str) -> str:
    """Return the SHA-256 hex digest of *parts* joined with ``|``."""
    joined = "|".join(parts)
    digest = hashlib.sha256(joined.encode("utf-8", errors="ignore"))
    return digest.hexdigest()
def extract_price(text: str) -> str | None:
m = re.search(r"(?:¥|¥|\$|USD|CNY)?\s*\d+(?:[.,]\d{1,2})?", text, re.I)
return m.group(0).strip() if m else None
def keyword_hits(text: str, keywords: list[str]) -> list[str]:
    """Return the non-empty keywords found in *text*, ignoring case.

    Keywords are returned in their original spelling and input order.
    """
    haystack = text.lower()
    matched: list[str] = []
    for keyword in keywords:
        if not keyword:
            continue
        if keyword.lower() in haystack:
            matched.append(keyword)
    return matched
def item_blocked(item: MonitorItem, monitor: dict[str, Any]) -> tuple[bool, str]:
    """Decide whether *item* is filtered out by the monitor's rules.

    Checks, in order: exclude keywords, the author allow-list (exact,
    case-insensitive), and the category allow-list (case-insensitive
    substring). Returns ``(blocked, reason)``; the reason is empty when the
    item is not blocked.
    """
    searchable = f"{item.title} {item.text} {item.author or ''} {item.category or ''}"
    banned = keyword_hits(searchable, monitor.get("exclude_keywords") or [])
    if banned:
        return True, "屏蔽词 " + ", ".join(banned)

    allowed_authors = [name.lower() for name in (monitor.get("authors") or []) if name]
    if allowed_authors:
        item_author = (item.author or "").lower()
        if item_author not in allowed_authors:
            return True, "作者不匹配"

    allowed_categories = [cat.lower() for cat in (monitor.get("categories") or []) if cat]
    if allowed_categories:
        matched_category = any(cat in (item.category or "").lower() for cat in allowed_categories)
        if not matched_category:
            return True, "分类不匹配"

    return False, ""
async def fetch_url(client: httpx.AsyncClient, url: str) -> str:
    """GET *url* (following redirects) and return the response text.

    An error status is raised through ``raise_for_status``.
    """
    response = await client.get(url, follow_redirects=True)
    response.raise_for_status()
    page_text = response.text
    return page_text
def parse_web_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]:
    """Parse an HTML page into monitor items using the monitor's CSS selectors.

    Args:
        monitor: Monitor configuration; ``selectors`` (item/title/link/price/
            stock) and ``url`` are read from it.
        body: Raw HTML of the fetched page.

    Returns:
        One ``MonitorItem`` per matched node (at most 100) that has a title or
        text. When the item selector matches nothing, the whole document is
        treated as a single node.
    """
    selectors = monitor.get("selectors") or {}
    item_sel = selectors.get("item") or "article, .thread, .post, li"
    title_sel = selectors.get("title") or "h1, h2, h3, a"
    link_sel = selectors.get("link") or "a"
    price_sel = selectors.get("price")
    stock_sel = selectors.get("stock")
    base_url = monitor.get("url", "")
    # Lower-case phrases scanned for in the node text when no stock value
    # could be read through the stock selector.
    stock_hints = ("有货", "无货", "缺货", "in stock", "out of stock", "sold out", "available")

    soup = BeautifulSoup(body, "html.parser")
    nodes = soup.select(item_sel)[:100]
    if not nodes:
        nodes = [soup]

    items: list[MonitorItem] = []
    for node in nodes:
        title_node = node.select_one(title_sel) if title_sel else None
        link_node = node.select_one(link_sel) if link_sel else None
        text = node.get_text(" ", strip=True)
        title = (title_node.get_text(" ", strip=True) if title_node else text[:120]).strip()
        href = link_node.get("href") if link_node else ""
        link = urljoin(base_url, href) if href else base_url

        if price_sel and (p := node.select_one(price_sel)):
            price = p.get_text(" ", strip=True)
        else:
            price = extract_price(text)

        stock = None
        if stock_sel and (s := node.select_one(stock_sel)):
            stock = s.get_text(" ", strip=True)
        if not stock:
            # Fallback only: a value read through the configured stock selector
            # must not be overwritten by a hint word found elsewhere in the text.
            lowered = text.lower()
            stock = next((hint for hint in stock_hints if hint in lowered), None)

        if title or text:
            key = stable_key(link, title or text[:80])
            items.append(
                MonitorItem(
                    key=key,
                    title=title or "(no title)",
                    link=link,
                    text=text,
                    price=price,
                    stock=stock,
                )
            )
    return items
def canonical_forum_key(link: str, entry_id: str = "") -> str:
    """Derive a topic/post key that stays the same across title edits and RSS id changes.

    The numeric id is pulled from known forum URL shapes, looking at the link
    first (or the entry id when there is no link) and then at the entry id.
    When nothing matches, a hash of the link (or entry id) is returned.
    """
    id_patterns = (
        r"nodeseek\.com/post-(\d+)",
        r"linux\.do/t/(?:[^/]+/)?(\d+)",
        r"/t/(?:[^/]+/)?(\d+)",
    )
    primary = link or entry_id
    for candidate in (primary, entry_id):
        haystack = candidate or ""
        for pattern in id_patterns:
            found = re.search(pattern, haystack, re.I)
            if found is not None:
                return found.group(1)
    return stable_key(primary)
def parse_rss_items(monitor: dict[str, Any], body: str) -> list[MonitorItem]:
    """Parse a feed document into monitor items (first 100 entries).

    Forum monitors and monitors of type ``rss`` get a canonical forum key;
    other monitors get a hash of entry id, link and title.
    """
    parsed = feedparser.parse(body)
    forum_style = bool(monitor.get("forum") or monitor.get("type") == "rss")
    results: list[MonitorItem] = []
    for entry in parsed.entries[:100]:
        title = getattr(entry, "title", "(no title)")
        link = getattr(entry, "link", monitor.get("url", ""))
        summary = getattr(entry, "summary", "")
        content_parts = [
            part.get("value", "")
            for part in getattr(entry, "content", [])
            if isinstance(part, dict)
        ]
        content = " ".join(content_parts)
        published = getattr(entry, "published", "") or getattr(entry, "updated", "")
        author = getattr(entry, "author", "") or getattr(entry, "dc_creator", "")
        tag_terms = [
            tag.get("term", "")
            for tag in (getattr(entry, "tags", None) or [])
            if isinstance(tag, dict) and tag.get("term")
        ]
        category = ", ".join(tag_terms)
        entry_id = getattr(entry, "id", "") or getattr(entry, "guid", "")
        if forum_style:
            key = canonical_forum_key(link, entry_id)
        else:
            key = stable_key(entry_id, link, title)
        results.append(
            MonitorItem(
                key=key,
                title=title,
                link=link,
                text=f"{title} {summary} {content} {published} {author} {category}",
                author=author,
                published=published,
                category=category,
            )
        )
    return results
def should_notify_and_update(monitor: dict[str, Any], item: MonitorItem, hits: list[str]) -> list[str]:
    """Compute the notification reasons for *item* and upsert its stored state.

    The previous ``monitor_state`` row for ``(monitor name, item key)`` is
    compared with the current item according to the monitor's ``notify_on``
    flags. The row is then inserted or updated regardless of the outcome.

    Returns:
        The list of human-readable reasons; empty when nothing should be sent.
    """
    name = monitor["name"]
    notify_on = monitor.get("notify_on") or {}
    reasons: list[str] = []
    with closing(db()) as conn:
        prev = conn.execute(
            "SELECT * FROM monitor_state WHERE monitor_name=? AND item_key=?",
            (name, item.key),
        ).fetchone()
        is_forum = bool(monitor.get("forum") or monitor.get("type") == "rss")
        if is_forum:
            # Forum/RSS posts notify only once, on first appearance with a match.
            # When a reply/edit later bumps the same link back to the top of the
            # feed, only the stored state is updated; nothing is pushed again.
            if prev is None:
                if notify_on.get("new_item", False):
                    reasons.append("新条目")
                if notify_on.get("keyword_match", True) and hits:
                    reasons.append("关键词 " + ", ".join(hits))
        else:
            if prev is None:
                if notify_on.get("new_item", False):
                    reasons.append("新条目")
            else:
                # Price/stock changes can only be detected against a stored row.
                if notify_on.get("price_change", False) and (item.price or "") != (prev["price"] or ""):
                    reasons.append(f"价格变化 {prev['price'] or '-'} -> {item.price or '-'}")
                if notify_on.get("stock_change", False) and (item.stock or "") != (prev["stock"] or ""):
                    reasons.append(f"库存变化 {prev['stock'] or '-'} -> {item.stock or '-'}")
            if notify_on.get("keyword_match", True) and hits:
                reasons.append("关键词 " + ", ".join(hits))
        # Upsert the latest snapshot so the next run compares against it.
        conn.execute(
            """
            INSERT INTO monitor_state(monitor_name, item_key, price, stock, title, link, updated_at)
            VALUES(?,?,?,?,?,?,?)
            ON CONFLICT(monitor_name, item_key) DO UPDATE SET
            price=excluded.price, stock=excluded.stock, title=excluded.title,
            link=excluded.link, updated_at=excluded.updated_at
            """,
            (name, item.key, item.price, item.stock, item.title, item.link, now_iso()),
        )
        conn.commit()
    return reasons
def event_not_sent(event_key: str, monitor_name: str, title: str, link: str) -> bool:
    """Claim *event_key* in ``sent_events``.

    Returns ``True`` when the row was inserted and ``False`` when the insert
    hit an integrity error.
    """
    insert_sql = "INSERT INTO sent_events(event_key, monitor_name, title, link, created_at) VALUES(?,?,?,?,?)"
    with closing(db()) as conn:
        try:
            conn.execute(insert_sql, (event_key, monitor_name, title, link, now_iso()))
            conn.commit()
        except sqlite3.IntegrityError:
            return False
        return True
def record_monitor_message(
    chat_id: int,
    message_id: int,
    monitor_name: str,
    delete_after_seconds: int,
    sent_at_ts: float | None = None,
) -> None:
    """Store (or replace) a sent monitor message together with its delete delay.

    ``sent_at_ts`` defaults to the current time; the delay is clamped to at
    least one second and any earlier ``delete_error`` is reset to NULL.
    """
    if sent_at_ts is None:
        sent_at_ts = time.time()
    sent_moment = datetime.fromtimestamp(sent_at_ts, timezone.utc).astimezone()
    sent_at = sent_moment.isoformat(timespec="seconds")
    ttl_seconds = max(1, int(delete_after_seconds))
    params = (chat_id, message_id, monitor_name, sent_at, ttl_seconds)
    with closing(db()) as conn:
        conn.execute(
            """
            INSERT OR REPLACE INTO monitor_messages(
            chat_id, message_id, monitor_name, sent_at, delete_after_seconds, delete_error
            ) VALUES(?,?,?,?,?,NULL)
            """,
            params,
        )
        conn.commit()
async def delete_expired_monitor_messages(delete_bot: Any, now_ts: float | None = None) -> int:
    """Delete monitor messages whose lifetime has elapsed.

    Every row of ``monitor_messages`` is checked against *now_ts* (current
    time by default). A failed deletion is logged and stored in
    ``delete_error``; a successful one removes the row.

    Returns:
        The number of messages deleted successfully.
    """
    current = now_ts if now_ts is not None else time.time()
    with closing(db()) as conn:
        rows = conn.execute(
            "SELECT chat_id, message_id, sent_at, delete_after_seconds FROM monitor_messages"
        ).fetchall()

    removed = 0
    for row in rows:
        expires_at = datetime.fromisoformat(row["sent_at"]).timestamp() + int(row["delete_after_seconds"])
        if expires_at > current:
            continue
        try:
            await delete_bot.delete_message(int(row["chat_id"]), int(row["message_id"]))
        except Exception as exc:
            logger.exception("failed to delete monitor message chat_id=%s message_id=%s", row["chat_id"], row["message_id"])
            with closing(db()) as conn:
                conn.execute(
                    "UPDATE monitor_messages SET delete_error=? WHERE chat_id=? AND message_id=?",
                    (str(exc)[:1000], row["chat_id"], row["message_id"]),
                )
                conn.commit()
        else:
            with closing(db()) as conn:
                conn.execute(
                    "DELETE FROM monitor_messages WHERE chat_id=? AND message_id=?",
                    (row["chat_id"], row["message_id"]),
                )
                conn.commit()
            removed += 1
    return removed
async def run_monitor(monitor: dict[str, Any]) -> int:
    """Run one monitor: fetch its URL, evaluate each item, and notify.

    Items are filtered with ``item_blocked``, matched against the monitor's
    keywords, compared with stored state, de-duplicated through
    ``event_not_sent`` and then recorded and (optionally) sent to Telegram.
    Failures are logged and recorded with ``record_monitor_runtime`` instead
    of being raised.

    Returns:
        The number of events counted as sent during this run.
    """
    name = monitor.get("name", "unnamed")
    mtype = monitor.get("type", "web")
    url = monitor.get("url")
    started = time.time()
    if not url:
        logger.error("monitor %s missing url", name)
        record_monitor_runtime(name, ok=False, duration_ms=int((time.time() - started) * 1000), sent_count=0, error="missing url")
        return 0
    keywords = monitor.get("keywords") or []
    timeout = int((config.get("http") or {}).get("timeout_seconds", 20))
    ua = (config.get("http") or {}).get("user_agent") or DEFAULT_UA
    headers = {"User-Agent": ua, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}
    sent_count = 0
    try:
        async with httpx.AsyncClient(timeout=timeout, headers=headers) as client:
            body = await fetch_url(client, url)
        items = parse_rss_items(monitor, body) if mtype == "rss" else parse_web_items(monitor, body)
        for item in items:
            blocked, block_reason = item_blocked(item, monitor)
            if blocked:
                logger.debug("monitor %s skipped item %s: %s", name, item.title, block_reason)
                continue
            hits = keyword_hits(f"{item.title} {item.text}", keywords)
            # If keywords are configured and keyword_match is enabled, do not push unrelated new posts.
            notify_on = monitor.get("notify_on") or {}
            if keywords and notify_on.get("keyword_match", True) and not hits and not (notify_on.get("price_change") or notify_on.get("stock_change")):
                should_notify_and_update(monitor, item, [])  # still remember state to avoid later old flood
                continue
            reasons = should_notify_and_update(monitor, item, hits)
            if not reasons:
                continue
            # Forum/RSS monitors key the event on the post itself; match reasons,
            # check time and edits are deliberately left out of the key so the
            # same post is never sent twice.
            is_forum = monitor.get("forum") or mtype == "rss"
            event_key = stable_key(name, item.key) if is_forum else stable_key(name, item.key, "|".join(reasons), item.price or "", item.stock or "")
            if not event_not_sent(event_key, name, item.title, item.link):
                continue
            notify_on_tg = bool(monitor.get("notify_telegram", True))
            if is_forum:
                text = (
                    f"[新帖命中] {html_escape(name)}\n"
                    f"标题:{html_escape(item.title)}\n"
                    f"作者:{html_escape(item.author or '-')}\n"
                    f"分类:{html_escape(item.category or '-')}\n"
                    f"链接:{html_escape(item.link)}\n"
                    f"命中:{html_escape('; '.join(reasons))}\n"
                    f"发布时间:{html_escape(item.published or '-')}\n"
                    f"检查时间:{html_escape(now_iso())}"
                )
            else:
                text = (
                    f"[库存/关键词命中] {html_escape(name)}\n"
                    f"标题:{html_escape(item.title)}\n"
                    f"链接:{html_escape(item.link)}\n"
                    f"命中:{html_escape('; '.join(reasons))}\n"
                    f"价格:{html_escape(item.price or '-')}\n"
                    f"库存:{html_escape(item.stock or '-')}\n"
                    f"时间:{html_escape(now_iso())}"
                )
            record_monitor_event(name, item.title, item.link, reasons, notify_on_tg)
            if not notify_on_tg:
                # Events with Telegram delivery disabled still count as sent.
                sent_count += 1
                continue
            if await admin_send_monitor(text, name):
                sent_count += 1
        record_monitor_runtime(name, ok=True, duration_ms=int((time.time() - started) * 1000), sent_count=sent_count)
    except Exception as e:
        logger.exception("monitor failed: %s %s", name, url)
        record_monitor_runtime(name, ok=False, duration_ms=int((time.time() - started) * 1000), sent_count=sent_count, error=str(e))
    return sent_count
def cleanup_monitor_data(retention_minutes: int) -> tuple[int, int]:
    """Purge website/RSS monitor rows older than the retention window.

    Only ``monitor_state`` and ``sent_events`` are touched. Returns the
    number of rows deleted from each table, in that order.
    """
    retention_seconds = max(1, int(retention_minutes)) * 60
    cutoff_moment = datetime.fromtimestamp(time.time() - retention_seconds, timezone.utc).astimezone()
    cutoff = cutoff_moment.isoformat(timespec="seconds")
    with closing(db()) as conn:
        state_cursor = conn.execute("DELETE FROM monitor_state WHERE updated_at < ?", (cutoff,))
        events_cursor = conn.execute("DELETE FROM sent_events WHERE created_at < ?", (cutoff,))
        conn.commit()
    deleted_state = int(state_cursor.rowcount or 0)
    deleted_events = int(events_cursor.rowcount or 0)
    return deleted_state, deleted_events
async def cleanup_monitor_loop() -> None:
    """Run monitor cleanup forever.

    Each pass sleeps for at most 60 seconds, deletes expired monitor messages
    when a bot is available, and purges old monitor data once the configured
    interval has elapsed since the last purge. Errors are logged and the loop
    continues.
    """
    last_data_cleanup_ts = 0.0
    while True:
        settings = monitor_cleanup_settings()
        interval_seconds = int(settings["interval_minutes"]) * 60
        await asyncio.sleep(min(60, interval_seconds))
        if not settings["enabled"]:
            continue
        try:
            message_n = await delete_expired_monitor_messages(bot) if bot else 0
            state_n = sent_n = 0
            purge_due = time.time() - last_data_cleanup_ts >= interval_seconds
            if purge_due:
                state_n, sent_n = cleanup_monitor_data(int(settings["retention_minutes"]))
                last_data_cleanup_ts = time.time()
            logger.info(
                "monitor cleanup done retention=%smin deleted monitor_state=%s sent_events=%s monitor_messages=%s",
                settings["retention_minutes"],
                state_n,
                sent_n,
                message_n,
            )
        except Exception:
            logger.exception("monitor cleanup failed")
async def flush_pending_inbox() -> None:
    """Re-send up to 50 pending inbox rows to every admin chat as text.

    Does nothing without a bot or admin chat ids. Each row is handled
    independently: success marks it forwarded, failure stores the error on
    the row and logs it.
    """
    if not (bot and all_admin_chat_ids()):
        return
    pending = pending_inbox(50)
    if not pending:
        return
    logger.info("flushing pending inbox messages: %d", len(pending))
    for row in pending:
        try:
            resend_text = (
                f"[补发用户消息 #{row['id']}]\n"
                f"user_id: {row['user_id']}\n"
                f"name: {html_escape(row['full_name'])}\n"
                f"username: @{html_escape(row['username'] or '')}\n"
                f"原消息ID: {row['user_message_id']}\n"
                f"类型: {html_escape(row['message_type'])}\n"
                f"时间: {html_escape(row['created_at'])}\n\n"
                f"内容:{html_escape(row['text'] or '(非文本/媒体消息,原始媒体无法补发,仅保留记录)')}"
            )
            first_sent_id = None
            for admin_chat_id in all_admin_chat_ids():
                delivered = await bot.send_message(admin_chat_id, resend_text)
                original_message_id = int(row['user_message_id']) if row['user_message_id'] else None
                save_message_map(admin_chat_id, delivered.message_id, int(row['user_id']), original_message_id)
                if not first_sent_id:
                    first_sent_id = delivered.message_id
            mark_inbox_forwarded(int(row['id']), first_sent_id, None)
        except Exception as exc:
            mark_inbox_error(int(row['id']), repr(exc))
            logger.exception("failed to flush inbox message id=%s", row['id'])
async def flush_pending_loop() -> None:
    """Flush the pending inbox once a minute, forever; failures are only logged."""
    pause_seconds = 60
    while True:
        try:
            await flush_pending_inbox()
        except Exception:
            logger.exception("flush_pending_loop failed")
        await asyncio.sleep(pause_seconds)
async def run_all_monitors_once() -> None:
    """Run every configured monitor sequentially and log the notification total."""
    monitors = config.get("monitors") or []
    logger.info("manual/all monitor run start, count=%d", len(monitors))
    per_monitor_counts = [await run_monitor(entry) for entry in monitors]
    logger.info("manual/all monitor run done, notifications=%d", sum(per_monitor_counts))
def schedule_monitors(scheduler: AsyncIOScheduler) -> None:
    """Register an interval job for every configured monitor.

    Intervals below ``MIN_INTERVAL_SECONDS`` are raised to that minimum (with
    a warning). Each job starts immediately and never overlaps itself.
    """
    configured = config.get("monitors") or []
    for idx, m in enumerate(configured):
        name = m.get("name", "unnamed")
        requested = int(m.get("interval_seconds", MIN_INTERVAL_SECONDS))
        if requested < MIN_INTERVAL_SECONDS:
            interval = MIN_INTERVAL_SECONDS
            logger.warning("monitor %s interval %s raised to %s", name, requested, interval)
        else:
            interval = requested
        # Use index+hash, not just name: duplicate names should not crash saving/reloading.
        job_key = stable_key(str(idx), name, m.get("url", ""))[:16]
        scheduler.add_job(
            run_monitor,
            "interval",
            seconds=interval,
            args=[m],
            id=f"monitor:{idx}:{job_key}",
            max_instances=1,
            coalesce=True,
            replace_existing=True,
            next_run_time=datetime.now(timezone.utc),
        )
        logger.info("scheduled monitor %s every %ss", name, interval)
# -----------------------------
# Web admin panel
# -----------------------------
def panel_enabled() -> bool:
    """Return whether the web panel is enabled via ``WEB_PANEL_ENABLED`` (default on)."""
    disabled_values = {"0", "false", "no", "off"}
    flag = os.getenv("WEB_PANEL_ENABLED", "true").lower()
    return flag not in disabled_values
def session_secret() -> str:
    """Return the panel session secret, generating and persisting one if unset."""
    configured = os.getenv("WEB_PANEL_SESSION_SECRET", "").strip()
    if configured:
        return configured
    # No secret yet: create one and write it to the env file so later calls
    # read the same value.
    generated = secrets.token_urlsafe(32)
    env = env_values()
    env["WEB_PANEL_SESSION_SECRET"] = generated
    write_env_values(env)
    return generated
def session_token(username: str) -> str:
    """Return the session cookie value for *username*: SHA-256 of ``username|secret``."""
    material = f"{username}|{session_secret()}"
    digest = hashlib.sha256(material.encode())
    return digest.hexdigest()
def is_logged_in(request: Request) -> bool:
    """Return whether the request carries a valid panel session cookie.

    The ``tg_watchbot_session`` cookie is compared in constant time with the
    token expected for the configured panel user. A missing or empty cookie
    is never valid.
    """
    username = os.getenv("WEB_PANEL_USER", "admin")
    token = request.cookies.get("tg_watchbot_session", "")
    if not token:
        return False
    expected = session_token(username)
    # compare_digest accepts str only when both sides are ASCII; a cookie with
    # non-ASCII characters would raise TypeError, so compare encoded bytes.
    return secrets.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
def panel_auth(request: Request) -> str:
    """Return the configured panel user name (``WEB_PANEL_USER``, default ``admin``)."""
    # Actual redirect is handled by middleware; dependencies cannot reliably return redirects.
    panel_user = os.getenv("WEB_PANEL_USER", "admin")
    return panel_user
def login_page(error: str = "") -> str:
    """Return the login page text, embedding the escaped *error* when given."""
    # NOTE(review): the markup inside these literals appears to have been
    # stripped in this view of the file; the strings are left exactly as found.
    err = f"{html_escape(error)}
" if error else ""
    return f"""
登录 · tg-watchbot
tg-watchbot 登录后管理 Telegram 机器人、关键词监控和提醒。
{err} """
def env_values() -> dict[str, str]:
    """Reload the env file and return the bot/panel settings, with defaults applied."""
    load_dotenv(ENV_PATH, override=True)
    defaults = (
        ("TELEGRAM_BOT_TOKEN", ""),
        ("ADMIN_CHAT_ID", ""),
        ("LOG_LEVEL", "INFO"),
        ("WEB_PANEL_ENABLED", "true"),
        ("WEB_PANEL_HOST", "127.0.0.1"),
        ("WEB_PANEL_PORT", "8765"),
        ("WEB_PANEL_USER", "admin"),
        ("WEB_PANEL_PASSWORD", "admin"),
        ("WEB_PANEL_SESSION_SECRET", ""),
        ("TG_API_ID", ""),
        ("TG_API_HASH", ""),
        ("TG_API_SESSION", ""),
        ("TG_PROXY", ""),
    )
    return {key: os.getenv(key, fallback) for key, fallback in defaults}
def write_env_values(values: dict[str, str]) -> None:
    """Rewrite the env file from *values* and reload it into the process.

    The session secret falls back to the one already in the file when
    *values* does not carry it. The file is restricted to mode ``0600``
    after writing.

    Values are written unquoted, one per line, so any line-break characters
    inside a value are removed first; otherwise a value containing a newline
    would inject additional variables into the file.
    """

    def one_line(raw: object) -> str:
        # splitlines() covers the same separators used when this file is read back.
        return "".join(str(raw).splitlines())

    def value_of(key: str, default: str = "") -> str:
        return one_line(values.get(key, default))

    existing = {}
    if ENV_PATH.exists():
        for line in ENV_PATH.read_text(encoding="utf-8", errors="replace").splitlines():
            if "=" in line and not line.lstrip().startswith("#"):
                key, value = line.split("=", 1)
                existing[key.strip()] = value.strip()
    session_value = one_line(
        values.get("WEB_PANEL_SESSION_SECRET") or existing.get("WEB_PANEL_SESSION_SECRET", "")
    )
    lines = [
        "# tg-watchbot environment",
        f"TELEGRAM_BOT_TOKEN={value_of('TELEGRAM_BOT_TOKEN')}",
        f"ADMIN_CHAT_ID={value_of('ADMIN_CHAT_ID')}",
        f"LOG_LEVEL={value_of('LOG_LEVEL', 'INFO')}",
        "",
        "# Web 管理面板;默认只监听本机,建议用 SSH 隧道或反代再暴露",
        f"WEB_PANEL_ENABLED={value_of('WEB_PANEL_ENABLED', 'true')}",
        f"WEB_PANEL_HOST={value_of('WEB_PANEL_HOST', '127.0.0.1')}",
        f"WEB_PANEL_PORT={value_of('WEB_PANEL_PORT', '8765')}",
        f"WEB_PANEL_USER={value_of('WEB_PANEL_USER', 'admin')}",
        f"WEB_PANEL_PASSWORD={value_of('WEB_PANEL_PASSWORD', 'admin')}",
        f"WEB_PANEL_SESSION_SECRET={session_value}",
        f"TG_API_ID={value_of('TG_API_ID')}",
        f"TG_API_HASH={value_of('TG_API_HASH')}",
        f"TG_API_SESSION={value_of('TG_API_SESSION')}",
        f"TG_PROXY={value_of('TG_PROXY')}",
        "",
    ]
    ENV_PATH.write_text("\n".join(lines), encoding="utf-8")
    ENV_PATH.chmod(0o600)
    load_dotenv(ENV_PATH, override=True)
def cfg_load_fresh() -> dict[str, Any]:
    """Return the configuration as produced by a new ``load_config()`` call."""
    fresh_cfg = load_config()
    return fresh_cfg
def cfg_save(new_cfg: dict[str, Any]) -> None:
    """Validate and normalise *new_cfg*, persist it as YAML, and apply it.

    Monitor intervals below ``MIN_INTERVAL_SECONDS`` are raised to the
    minimum. Every group monitor gets defaults and normalised values for its
    listen source, summary mode and AI settings. The result is written to
    ``CONFIG_PATH``, becomes the module-level ``config``, and the scheduler
    jobs are reloaded.

    Raises:
        ValueError: if the root is not a dict, ``monitors``/``group_monitors``
            is not a list, or an entry of either list is not a dict.
    """
    if not isinstance(new_cfg, dict):
        raise ValueError("配置根节点必须是对象")
    monitors = new_cfg.setdefault("monitors", [])
    if not isinstance(monitors, list):
        raise ValueError("monitors 必须是列表")
    for m in monitors:
        if not isinstance(m, dict):
            raise ValueError("每个 monitor 必须是对象")
        if int(m.get("interval_seconds", MIN_INTERVAL_SECONDS)) < MIN_INTERVAL_SECONDS:
            m["interval_seconds"] = MIN_INTERVAL_SECONDS
    group_monitor_rows = new_cfg.get("group_monitors") or []
    if group_monitor_rows is not None and not isinstance(group_monitor_rows, list):
        raise ValueError("group_monitors 必须是列表")
    for gm in group_monitor_rows:
        if not isinstance(gm, dict):
            raise ValueError("每个 group_monitor 必须是对象")
        if "chat_id" in gm:
            gm["chat_id"] = safe_int(gm["chat_id"], 0)
        gm.setdefault("enabled", True)
        gm.setdefault("keywords", [])
        gm.setdefault("exclude_keywords", [])
        gm.setdefault("notify_telegram", True)
        # Unknown listen sources fall back to "bot".
        listen_source = str(gm.get("listen_source") or "bot").strip().lower() or "bot"
        if listen_source not in {"bot", "user_session"}:
            listen_source = "bot"
        gm["listen_source"] = listen_source
        # Unknown summary modes fall back to "template".
        summary_mode = str(gm.get("summary_mode") or "template").strip().lower() or "template"
        if summary_mode not in {"template", "ai"}:
            summary_mode = "template"
        gm["summary_mode"] = summary_mode
        gm["ai_base_url"] = str(gm.get("ai_base_url") or "").strip()
        gm["ai_api_key"] = str(gm.get("ai_api_key") or "").strip()
        gm["ai_model"] = str(gm.get("ai_model") or "gpt-4o-mini").strip()
        # Unknown AI interfaces fall back to "responses".
        ai_interface = str(gm.get("ai_interface") or "responses").strip().lower() or "responses"
        if ai_interface not in {"responses", "chat"}:
            ai_interface = "responses"
        gm["ai_interface"] = ai_interface
        gm["ai_temperature"] = safe_float(gm.get("ai_temperature", 0.2), 0.2)
        gm["ai_timeout_seconds"] = max(1, safe_int(gm.get("ai_timeout_seconds", 30), 30))
        gm["ai_prompt"] = str(gm.get("ai_prompt") or "").strip()
        # Both AI throttling values are clamped to be non-negative.
        gm["ai_min_interval_seconds"] = max(
            0,
            safe_int(gm.get("ai_min_interval_seconds", DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS), DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS),
        )
        gm["ai_dedupe_window_seconds"] = max(
            0,
            safe_int(gm.get("ai_dedupe_window_seconds", DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS), DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS),
        )
    CONFIG_PATH.write_text(yaml.safe_dump(new_cfg, allow_unicode=True, sort_keys=False), encoding="utf-8")
    global config
    config = new_cfg
    # Scheduled monitor jobs are rebuilt from the new configuration.
    reload_scheduler_jobs()
def reload_scheduler_jobs() -> None:
    """Drop all ``monitor:`` jobs from the scheduler and schedule them again.

    Does nothing when no scheduler reference is set.
    """
    if not scheduler_ref:
        return
    stale_job_ids = [job.id for job in scheduler_ref.get_jobs() if job.id.startswith("monitor:")]
    for job_id in stale_job_ids:
        scheduler_ref.remove_job(job_id)
    schedule_monitors(scheduler_ref)
def parse_lines(text: str) -> list[str]:
    """Split *text* into stripped, non-empty lines; empty/None input gives ``[]``."""
    cleaned: list[str] = []
    for raw_line in (text or "").splitlines():
        if raw_line.strip():
            cleaned.append(raw_line.strip())
    return cleaned
def safe_int(value: Any, default: int) -> int:
    """Convert *value* to ``int``, returning *default* when it cannot be converted.

    Besides wrong types and malformed strings, infinite floats are covered:
    ``int(float("inf"))`` raises ``OverflowError``.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default
def safe_float(value: Any, default: float) -> float:
    """Convert *value* to ``float``, returning *default* when it cannot be converted.

    Integers too large for a float raise ``OverflowError``; that case falls
    back to *default* as well.
    """
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return default
def app_meta_get(key: str) -> str:
    """Return the ``app_meta`` value stored under *key*, or ``""`` when absent or NULL."""
    with closing(db()) as conn:
        row = conn.execute("SELECT meta_value FROM app_meta WHERE meta_key=?", (key,)).fetchone()
    if not row or row["meta_value"] is None:
        return ""
    return str(row["meta_value"])
def app_meta_set(key: str, value: str) -> None:
    """Insert or update the ``app_meta`` row for *key*, stamping ``updated_at``."""
    upsert_sql = """
            INSERT INTO app_meta(meta_key, meta_value, updated_at)
            VALUES(?,?,?)
            ON CONFLICT(meta_key) DO UPDATE SET
            meta_value=excluded.meta_value,
            updated_at=excluded.updated_at
            """
    with closing(db()) as conn:
        conn.execute(upsert_sql, (key, value, now_iso()))
        conn.commit()
def git_run(repo_dir: Path, args: list[str], check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run ``git -C <repo_dir> <args...>`` and return the completed process.

    Output is captured as text; with ``check=True`` a non-zero exit raises
    ``subprocess.CalledProcessError``.
    """
    command = ["git", "-C", str(repo_dir)]
    command.extend(args)
    return subprocess.run(command, capture_output=True, text=True, check=check)
def current_git_branch(repo_dir: Path) -> str:
    """Return the checked-out branch name, or ``"main"`` when it cannot be determined."""
    try:
        result = git_run(repo_dir, ["rev-parse", "--abbrev-ref", "HEAD"])
        branch = result.stdout.strip()
    except Exception:
        return "main"
    return branch or "main"
def git_update_status(repo_dir: Path, branch: str, fetch_remote: bool = True) -> dict[str, Any]:
    """Compare the local HEAD with ``origin/<branch>``.

    Optionally fetches the branch first. Returns a dict with the branch, both
    commit hashes, the ahead/behind counts and whether the working tree has
    uncommitted changes.
    """
    if fetch_remote:
        git_run(repo_dir, ["fetch", "origin", branch], check=True)
    remote_ref = f"origin/{branch}"
    local_head = git_run(repo_dir, ["rev-parse", "HEAD"]).stdout.strip()
    remote_head = git_run(repo_dir, ["rev-parse", remote_ref]).stdout.strip()
    rev_list = git_run(repo_dir, ["rev-list", "--left-right", "--count", f"HEAD...{remote_ref}"])
    counts = rev_list.stdout.strip().split()
    # Missing counts are treated as zero.
    ahead_raw, behind_raw = (counts + [0, 0])[:2]
    porcelain = git_run(repo_dir, ["status", "--porcelain"], check=True).stdout.strip()
    return {
        "branch": branch,
        "head": local_head,
        "remote_head": remote_head,
        "ahead": safe_int(ahead_raw, 0),
        "behind": safe_int(behind_raw, 0),
        "dirty": bool(porcelain),
    }
def rollback_to_commit(repo_dir: Path, commit: str) -> None:
    """Hard-reset the repository to *commit* after verifying that it is a commit object."""
    verify_args = ["cat-file", "-e", f"{commit}^{{commit}}"]
    reset_args = ["reset", "--hard", commit]
    for git_args in (verify_args, reset_args):
        git_run(repo_dir, git_args, check=True)
def monitor_from_form(
    original_index: int | None,
    name: str,
    mtype: str,
    url: str,
    interval_seconds: int,
    keywords: str,
    item_selector: str,
    title_selector: str,
    link_selector: str,
    price_selector: str,
    stock_selector: str,
    keyword_match: bool,
    new_item: bool,
    price_change: bool,
    stock_change: bool,
    notify_telegram: bool = True,
) -> dict[str, Any]:
    """Build a monitor config dict from submitted form fields.

    The interval is clamped to ``MIN_INTERVAL_SECONDS``. ``rss`` monitors are
    flagged as forums; ``web`` monitors get a ``selectors`` dict with defaults
    for item/title/link and optional price/stock selectors.

    Raises:
        ValueError: if the name or URL is empty after stripping.
    """
    clean_name = name.strip()
    clean_url = url.strip()
    interval = max(int(interval_seconds or MIN_INTERVAL_SECONDS), MIN_INTERVAL_SECONDS)
    m: dict[str, Any] = {
        "name": clean_name,
        "type": mtype,
        "url": clean_url,
        "interval_seconds": interval,
        "keywords": parse_lines(keywords),
        "notify_telegram": notify_telegram,
        "notify_on": {
            "keyword_match": keyword_match,
            "new_item": new_item,
            "price_change": price_change,
            "stock_change": stock_change,
        },
    }
    if mtype == "rss":
        m["forum"] = True
    elif mtype == "web":
        selectors = {
            "item": item_selector.strip() or "article, .thread, .post, li",
            "title": title_selector.strip() or "h1, h2, h3, a",
            "link": link_selector.strip() or "a",
        }
        for field_name, raw_selector in (("price", price_selector), ("stock", stock_selector)):
            trimmed = raw_selector.strip()
            if trimmed:
                selectors[field_name] = trimmed
        m["selectors"] = selectors
    if not (clean_name and clean_url):
        raise ValueError("名称和 URL 必填")
    return m
def layout(title: str, body: str) -> str:
    """Wrap *body* in the shared page template, with *title* HTML-escaped.

    *body* is inserted as-is, so callers must pass already-escaped content.
    """
    # NOTE(review): the template markup appears to have been stripped in this
    # view of the file; the literal is left exactly as found.
    return f"""
{html_escape(title)} · tg-watchbot
{html_escape(title)} WatchBot Panel
{body} """
def monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str:
    """Build the create/edit form text for a monitor.

    With no monitor given, a default ``web`` monitor with all notify flags on
    is used. ``idx`` selects the save action (edit) over the create action.
    """
    # NOTE(review): the returned template is empty in this view of the file;
    # its markup appears to have been stripped. The code is left as found.
    m = m or {"type": "web", "interval_seconds": 60, "notify_on": {"keyword_match": True, "new_item": True, "price_change": True, "stock_change": True}, "selectors": {}}
    selectors = m.get("selectors") or {}
    no = m.get("notify_on") or {}
    keywords = "\n".join(m.get("keywords") or [])
    action = "/monitor/save" if idx is not None else "/monitor/create"
    hidden = f" " if idx is not None else ""
    def checked(k: str) -> str:
        # "checked" when the notify_on flag *k* is truthy, else an empty string.
        return "checked" if no.get(k, False) else ""
    return f""""""
def group_monitor_form_html(m: dict[str, Any] | None = None, idx: int | None = None) -> str:
    """Build the create/edit form text for a group monitor.

    With no monitor given, a default enabled monitor using the template
    summary mode and the ``bot`` listen source is used. ``idx`` selects the
    save action (edit) over the create action.
    """
    # NOTE(review): the form markup appears to have been stripped in this view
    # of the file; the template literal is left exactly as found.
    m = m or {
        "enabled": True,
        "keywords": [],
        "exclude_keywords": [],
        "notify_telegram": True,
        "summary_mode": "template",
        "ai_base_url": "",
        "ai_api_key": "",
        "ai_model": "gpt-4o-mini",
        "ai_interface": "responses",
        "ai_temperature": 0.2,
        "ai_timeout_seconds": 30,
        "ai_prompt": "",
        "ai_min_interval_seconds": DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS,
        "ai_dedupe_window_seconds": DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS,
        "listen_source": "bot",
    }
    action = "/group-monitors/save" if idx is not None else "/group-monitors/create"
    hidden = f" " if idx is not None else ""
    # Keyword lists are shown one entry per line.
    keywords = "\n".join(m.get("keywords") or [])
    exclude_keywords = "\n".join(m.get("exclude_keywords") or [])
    return f"""{hidden}
启用监听
推送管理员
总结模式 模板 AI
AI 接口 Responses Chat Completions
监听来源选“用户会话”时,需要在“Bot / 面板设置”填写 TG_API_ID、TG_API_HASH、TG_API_SESSION,并重启。
AI 总结提示词(可选) {html_escape(m.get('ai_prompt',''))}
关键词(一行一个) {html_escape(keywords)}
排除词(一行一个) {html_escape(exclude_keywords)}
"""
def create_panel_app() -> FastAPI:
app = FastAPI(title="tg-watchbot Panel")
@app.middleware("http")
async def require_login_middleware(request: Request, call_next):
public_paths = {"/login", "/health", "/favicon.ico"}
if request.url.path in public_paths or is_logged_in(request):
return await call_next(request)
return RedirectResponse("/login", status_code=303)
@app.get("/login", response_class=HTMLResponse)
async def login_get(request: Request):
if is_logged_in(request):
return RedirectResponse("/", status_code=303)
return HTMLResponse(login_page())
@app.post("/login")
async def login_post(username: str = Form(""), password: str = Form("")):
expected_user = os.getenv("WEB_PANEL_USER", "admin")
expected_pass = os.getenv("WEB_PANEL_PASSWORD", "admin")
if secrets.compare_digest(username, expected_user) and secrets.compare_digest(password, expected_pass):
resp = RedirectResponse("/", status_code=303)
resp.set_cookie("tg_watchbot_session", session_token(expected_user), httponly=True, secure=True, samesite="lax", max_age=60 * 60 * 24 * 14)
return resp
return HTMLResponse(login_page("用户名或密码错误"), status_code=401)
@app.get("/logout")
async def logout() -> RedirectResponse:
resp = RedirectResponse("/login", status_code=303)
resp.delete_cookie("tg_watchbot_session")
return resp
@app.get("/", response_class=HTMLResponse)
async def index(_: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
statuses = list_monitor_runtime_status()
rows = []
for i, m in enumerate(cfg.get("monitors") or []):
tg = "TG" if m.get("notify_telegram", True) else "仅 Web"
name = str(m.get("name", ""))
st = statuses.get(name)
st_badge = get_monitor_status_badge(st)
st_line = "-"
if st:
st_line = (
f"{html_escape(st_badge)} · 推送 {st.get('last_sent_count', 0)} · "
f"{st.get('last_duration_ms', 0)}ms成功: {html_escape(st.get('last_success_at') or '-')} / 失败: {html_escape(st.get('last_error_at') or '-')} "
)
if st.get("last_error"):
st_line += f"{html_escape(str(st.get('last_error'))[:100])} "
rows.append(f"""{html_escape(m.get('type','web'))} {html_escape(name)} {html_escape(m.get('url',''))} {html_escape(m.get('interval_seconds',60))}s{tg} {html_escape(', '.join(m.get('keywords') or []))} {st_line} 编辑 预览 检查 删除 """)
body = f"""类型 目标 间隔/通知 关键词 运行状态 操作 """ + "".join(rows) + "
"
return layout("监控", body)
@app.get("/monitor/new", response_class=HTMLResponse)
async def new_monitor(_: str = Depends(panel_auth)) -> str:
return layout("新增监控", "这里是新增单个监控。要一次加多个网站,用左侧/首页的「批量新增」。
" + monitor_form_html())
# TG group-monitor overview page.
# Renders two tables: the configured group monitors (config key
# "group_monitors") and the group chats returned by
# list_discovered_group_chats(). Also shows a warning when any monitor uses
# the "user_session" listen source but telethon is missing or
# user_session_ready() is false.
@app.get("/group-monitors", response_class=HTMLResponse)
async def group_monitors_page(_: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
rows = cfg.get("group_monitors") or []
discovered = list_discovered_group_chats()
trs = []
for i, gm in enumerate(rows):
# Skip malformed (non-mapping) entries; `i` still reflects the config index.
if not isinstance(gm, dict):
continue
enabled = "启用" if gm.get("enabled", True) else "关闭"
notify = "推送 TG" if gm.get("notify_telegram", True) else "仅记录"
# A missing or empty listen_source is treated as "bot".
source = "Bot" if str(gm.get("listen_source") or "bot") == "bot" else "用户会话"
kws = ", ".join([str(x) for x in (gm.get("keywords") or [])]) or "-"
exs = ", ".join([str(x) for x in (gm.get("exclude_keywords") or [])]) or "-"
trs.append(
f"""{i+1} {html_escape(gm.get('name') or gm.get('chat_id') or '-')} {html_escape(gm.get('chat_id') or '-')} {enabled}{notify} · {source} {html_escape(kws)} {html_escape(exs)} 编辑 删除 """
)
discovered_rows = []
for row in discovered:
title = row["title"]
chat_id = row["chat_id"]
username = f"@{row['username']}" if row["username"] else "-"
# NOTE(review): quote_plus() requires str/bytes; assumes title is never NULL — confirm.
create_link = f"/group-monitors/new?chat_id={chat_id}&name={quote_plus(title)}"
discovered_rows.append(
f"""{html_escape(title)} {html_escape(username)} {chat_id}{html_escape(row['last_seen_at'])} 用此群创建监听 """
)
use_user_session = any(
isinstance(gm, dict) and str(gm.get("listen_source") or "bot") == "user_session"
for gm in rows
)
user_session_notice = ""
if use_user_session:
if TelegramClient is None:
user_session_notice = "检测到“用户会话”监听,但未安装 telethon;该来源不会生效。
"
elif not user_session_ready():
user_session_notice = "检测到“用户会话”监听,但 TG_API_ID / TG_API_HASH / TG_API_SESSION 未完整填写;该来源不会生效。
"
body = (
""
+ user_session_notice
+
"
# 监听 状态 关键词 排除词 操作 "
+ "".join(trs) + "
"
+ "已发现群聊 Bot 在群里收到消息后会自动记录群信息。可直接选择群聊创建监听。
"
+ "
群聊 chat_id 最近活跃 操作 "
+ ("".join(discovered_rows) if discovered_rows else "暂无已发现群聊。先把 Bot 拉进群并发送一条消息。 ")
+ "
"
)
return layout("TG 群监听", body)
@app.get("/group-monitors/new", response_class=HTMLResponse)
async def group_monitor_new(
    _: str = Depends(panel_auth),
    chat_id: str = "",
    name: str = "",
) -> str:
    """Render the "new group monitor" form.

    When either ``chat_id`` or ``name`` is supplied in the query string, the
    form is pre-filled from a preset built around those values; otherwise
    the form helper receives ``None``.
    """
    clean_chat_id = chat_id.strip()
    clean_name = name.strip()
    if not (clean_chat_id or clean_name):
        return layout("新增 TG 群监听", group_monitor_form_html(None))

    prefilled = {
        "enabled": True,
        "chat_id": clean_chat_id,
        "name": clean_name,
        "keywords": [],
        "exclude_keywords": [],
        "notify_telegram": True,
        "summary_mode": "template",
        "ai_base_url": "",
        "ai_api_key": "",
        "ai_model": "gpt-4o-mini",
        "ai_interface": "responses",
        "ai_temperature": 0.2,
        "ai_timeout_seconds": 30,
        "ai_prompt": "",
        "ai_min_interval_seconds": DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS,
        "ai_dedupe_window_seconds": DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS,
    }
    return layout("新增 TG 群监听", group_monitor_form_html(prefilled))
@app.get("/group-monitors/{idx}/edit", response_class=HTMLResponse)
async def group_monitor_edit(idx: int, _: str = Depends(panel_auth)) -> str:
    """Render the edit form for the group monitor at config index ``idx``.

    Raises:
        HTTPException: 404 when ``idx`` is out of range or the entry at that
            position is not a mapping.
    """
    entries = cfg_load_fresh().get("group_monitors") or []
    in_range = 0 <= idx < len(entries)
    if not in_range or not isinstance(entries[idx], dict):
        raise HTTPException(404, "group monitor not found")
    target = entries[idx]
    return layout("编辑 TG 群监听", group_monitor_form_html(target, idx))
# Shared save path for the group-monitor create and save handlers.
# Builds a normalized monitor dict from raw form strings and either appends
# it (original_index is None) or replaces the entry at original_index, then
# persists the config and redirects to /group-monitors.
# Returns a 400 error page when building the item raises (for example a
# non-numeric chat_id); raises HTTPException(404) when original_index is out
# of range.
async def save_group_monitor_common(
original_index: int | None,
name: str,
chat_id: str,
keywords: str,
exclude_keywords: str,
enabled: str | None,
notify_telegram: str | None,
listen_source: str,
summary_mode: str,
ai_base_url: str,
ai_api_key: str,
ai_model: str,
ai_interface: str,
ai_temperature: str,
ai_timeout_seconds: str,
ai_prompt: str,
ai_min_interval_seconds: str,
ai_dedupe_window_seconds: str,
) -> Response:
cfg = cfg_load_fresh()
rows = cfg.setdefault("group_monitors", [])
# The key may exist with a non-list value (e.g. None); replace it with a list.
if not isinstance(rows, list):
rows = []
cfg["group_monitors"] = rows
try:
# Unknown enum-like values silently fall back to their defaults.
parsed_summary_mode = (summary_mode or "template").strip().lower() or "template"
if parsed_summary_mode not in {"template", "ai"}:
parsed_summary_mode = "template"
parsed_listen_source = (listen_source or "bot").strip().lower() or "bot"
if parsed_listen_source not in {"bot", "user_session"}:
parsed_listen_source = "bot"
parsed_ai_interface = (ai_interface or "responses").strip().lower() or "responses"
if parsed_ai_interface not in {"responses", "chat"}:
parsed_ai_interface = "responses"
item = {
"name": name.strip() or chat_id.strip(),
# Checkbox-style fields: any non-empty submitted value counts as on.
"enabled": bool(enabled),
# int() raises ValueError for an empty or non-numeric chat_id.
"chat_id": int(chat_id.strip()),
"keywords": parse_lines(keywords),
"exclude_keywords": parse_lines(exclude_keywords),
"notify_telegram": bool(notify_telegram),
"listen_source": parsed_listen_source,
"summary_mode": parsed_summary_mode,
"ai_base_url": ai_base_url.strip(),
"ai_api_key": ai_api_key.strip(),
"ai_model": ai_model.strip() or "gpt-4o-mini",
"ai_interface": parsed_ai_interface,
"ai_temperature": safe_float(ai_temperature, 0.2),
"ai_timeout_seconds": max(1, safe_int(ai_timeout_seconds, 30)),
"ai_prompt": ai_prompt.strip(),
"ai_min_interval_seconds": max(0, safe_int(ai_min_interval_seconds, DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)),
"ai_dedupe_window_seconds": max(0, safe_int(ai_dedupe_window_seconds, DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)),
}
except Exception as e:
return HTMLResponse(layout("保存失败", f"返回
"), status_code=400)
if original_index is None:
rows.append(item)
else:
if original_index < 0 or original_index >= len(rows):
raise HTTPException(404, "group monitor not found")
rows[original_index] = item
cfg_save(cfg)
return RedirectResponse("/group-monitors", status_code=303)
@app.post("/group-monitors/create")
async def group_monitor_create(
    _: str = Depends(panel_auth),
    name: str = Form(""),
    chat_id: str = Form(""),
    keywords: str = Form(""),
    exclude_keywords: str = Form(""),
    enabled: str | None = Form(None),
    notify_telegram: str | None = Form(None),
    summary_mode: str = Form("template"),
    listen_source: str = Form("bot"),
    ai_base_url: str = Form(""),
    ai_api_key: str = Form(""),
    ai_model: str = Form("gpt-4o-mini"),
    ai_interface: str = Form("responses"),
    ai_temperature: str = Form("0.2"),
    ai_timeout_seconds: str = Form("30"),
    ai_prompt: str = Form(""),
    ai_min_interval_seconds: str = Form(str(DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)),
    ai_dedupe_window_seconds: str = Form(str(DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)),
) -> Response:
    """Create a group monitor from the submitted form.

    Forwards every field to ``save_group_monitor_common`` with
    ``original_index=None`` so the new entry is appended.
    """
    return await save_group_monitor_common(
        original_index=None,
        name=name,
        chat_id=chat_id,
        keywords=keywords,
        exclude_keywords=exclude_keywords,
        enabled=enabled,
        notify_telegram=notify_telegram,
        listen_source=listen_source,
        summary_mode=summary_mode,
        ai_base_url=ai_base_url,
        ai_api_key=ai_api_key,
        ai_model=ai_model,
        ai_interface=ai_interface,
        ai_temperature=ai_temperature,
        ai_timeout_seconds=ai_timeout_seconds,
        ai_prompt=ai_prompt,
        ai_min_interval_seconds=ai_min_interval_seconds,
        ai_dedupe_window_seconds=ai_dedupe_window_seconds,
    )
@app.post("/group-monitors/save")
async def group_monitor_save(
    _: str = Depends(panel_auth),
    original_index: int = Form(...),
    name: str = Form(""),
    chat_id: str = Form(""),
    keywords: str = Form(""),
    exclude_keywords: str = Form(""),
    enabled: str | None = Form(None),
    notify_telegram: str | None = Form(None),
    summary_mode: str = Form("template"),
    listen_source: str = Form("bot"),
    ai_base_url: str = Form(""),
    ai_api_key: str = Form(""),
    ai_model: str = Form("gpt-4o-mini"),
    ai_interface: str = Form("responses"),
    ai_temperature: str = Form("0.2"),
    ai_timeout_seconds: str = Form("30"),
    ai_prompt: str = Form(""),
    ai_min_interval_seconds: str = Form(str(DEFAULT_GROUP_AI_MIN_INTERVAL_SECONDS)),
    ai_dedupe_window_seconds: str = Form(str(DEFAULT_GROUP_AI_DEDUPE_WINDOW_SECONDS)),
) -> Response:
    """Update an existing group monitor from the submitted form.

    Forwards every field to ``save_group_monitor_common``, which replaces
    the entry at ``original_index``.
    """
    return await save_group_monitor_common(
        original_index=original_index,
        name=name,
        chat_id=chat_id,
        keywords=keywords,
        exclude_keywords=exclude_keywords,
        enabled=enabled,
        notify_telegram=notify_telegram,
        listen_source=listen_source,
        summary_mode=summary_mode,
        ai_base_url=ai_base_url,
        ai_api_key=ai_api_key,
        ai_model=ai_model,
        ai_interface=ai_interface,
        ai_temperature=ai_temperature,
        ai_timeout_seconds=ai_timeout_seconds,
        ai_prompt=ai_prompt,
        ai_min_interval_seconds=ai_min_interval_seconds,
        ai_dedupe_window_seconds=ai_dedupe_window_seconds,
    )
@app.get("/group-monitors/{idx}/delete")
async def group_monitor_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Remove the group monitor at config index ``idx`` and go back to the list.

    An out-of-range index is ignored: nothing is saved and the redirect is
    still returned.
    """
    back_to_list = RedirectResponse("/group-monitors", status_code=303)
    cfg = cfg_load_fresh()
    entries = cfg.get("group_monitors") or []
    if not (0 <= idx < len(entries)):
        return back_to_list
    del entries[idx]
    cfg_save(cfg)
    return back_to_list
# Static page introducing the forum monitor templates.
@app.get("/monitor/templates", response_class=HTMLResponse)
async def monitor_templates(_: str = Depends(panel_auth)) -> str:
body = """论坛监控模板 NodeSeek / Linux.do 建议用 RSS,不抓网页 HTML,抗 Cloudflare 更稳。
"""
return layout("监控模板", body)
# Render the monitor form pre-filled from one of the built-in RSS templates.
# `kind` must be one of the keys of `templates` below; any other value
# raises HTTPException(404). Nothing is saved by this handler.
@app.get("/monitor/template/{kind}", response_class=HTMLResponse)
async def monitor_template(kind: str, _: str = Depends(panel_auth)) -> str:
templates = {
"nodeseek": {"name": "NodeSeek 新帖", "type": "rss", "url": "https://rss.nodeseek.com/", "interval_seconds": 60, "keywords": ["NAT", "优惠", "补货", "VPS", "免费"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}},
"linuxdo": {"name": "Linux.do 最新", "type": "rss", "url": "https://linux.do/latest.rss", "interval_seconds": 60, "keywords": ["Claude", "Codex", "API", "VPS", "NAT"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}},
"linuxdo-resource": {"name": "Linux.do 资源荟萃", "type": "rss", "url": "https://linux.do/c/resource/14.rss", "interval_seconds": 60, "keywords": ["免费", "开源", "API", "Claude"], "forum": True, "notify_on": {"keyword_match": True, "new_item": True, "price_change": False, "stock_change": False}},
}
m = templates.get(kind)
if not m:
raise HTTPException(404, "template not found")
return layout("使用模板新增", "这是预设模板,保存即可加入监控;也可以先调整关键词。
" + monitor_form_html(m))
# Bulk-add page. `sample` shows the expected input format, one monitor per
# line as "name|url|comma-separated keywords".
# NOTE(review): in the text visible here `sample` is not referenced by
# `body`; presumably the form markup embeds it — confirm.
@app.get("/monitor/bulk", response_class=HTMLResponse)
async def bulk_monitor(_: str = Depends(panel_auth)) -> str:
sample = """NodeSeek|https://www.nodeseek.com/|免费鸡,优惠码,NAT
Linux.do|https://linux.do|公益,codex,claude
HostLoc|https://hostloc.com|VPS,补货,优惠"""
body = f""""""
return layout("批量新增", body)
# Bulk-create monitors from a multi-line text field.
# Each non-empty line that does not start with '#' is split on '|' into
# name, url and optional comma-separated keywords. All lines share the
# submitted type, interval and notify flags, plus the fixed default
# selectors passed to monitor_from_form below.
# Bad lines are collected in `errors` and do not stop the batch; the config
# is saved once after the loop. Redirects to "/" when every line succeeded,
# otherwise returns a summary page; a failing cfg_save yields a 500 page.
# NOTE(review): cfg.setdefault("monitors", []) returns the existing value,
# which would be None for an empty YAML key — confirm the config loader
# normalizes that.
@app.post("/monitor/bulk")
async def bulk_monitor_save(_: str = Depends(panel_auth), items: str = Form(""), mtype: str = Form("web"), interval_seconds: int = Form(300), keyword_match: str | None = Form(None), new_item: str | None = Form(None), price_change: str | None = Form(None), stock_change: str | None = Form(None), notify_telegram: str | None = Form("on")):
cfg = cfg_load_fresh()
monitors = cfg.setdefault("monitors", [])
added = 0
errors = []
# Line numbers are 1-based and count blank/comment lines too.
for line_no, raw in enumerate(items.splitlines(), 1):
line = raw.strip()
if not line or line.startswith('#'):
continue
parts = [x.strip() for x in line.split('|')]
if len(parts) < 2:
errors.append(f"第 {line_no} 行格式错误:{html_escape(line)}")
continue
name, url = parts[0], parts[1]
keywords = parts[2] if len(parts) >= 3 else ""
try:
# Full-width commas are converted to newlines before the keywords are passed on.
monitors.append(monitor_from_form(None, name, mtype, url, interval_seconds, keywords.replace(',', '\n'), "article, .thread, .post, li", "h1, h2, h3, a", "a", "", "", bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram)))
added += 1
except Exception as e:
errors.append(f"第 {line_no} 行失败:{html_escape(e)}")
try:
cfg_save(cfg)
except Exception as e:
logger.exception("bulk save failed")
return HTMLResponse(layout("批量新增失败", f""), status_code=500)
if errors:
return HTMLResponse(layout("批量新增完成", f"已新增 {added} 个,部分行有问题:
返回
"))
return RedirectResponse("/", status_code=303)
# Edit form for the monitor at config index `idx`.
# Raises HTTPException(404) when the index is out of range.
@app.get("/monitor/{idx}/edit", response_class=HTMLResponse)
async def edit_monitor(idx: int, _: str = Depends(panel_auth)) -> str:
monitors = cfg_load_fresh().get("monitors") or []
if idx < 0 or idx >= len(monitors):
raise HTTPException(404, "monitor not found")
return layout("编辑监控", "编辑监控 " + monitor_form_html(monitors[idx], idx))
# Shared save path for the monitor create and save handlers.
# Builds the monitor via monitor_from_form and appends it (original_index is
# None) or overwrites the entry at original_index, then saves the config and
# redirects to "/". A failing cfg_save is logged and reported as a 500 page.
# NOTE(review): original_index is not range-checked here, unlike
# save_group_monitor_common. An index past the end raises IndexError and a
# negative one overwrites an entry counted from the end of the list.
async def save_form_common(
original_index: int | None,
name: str,
mtype: str,
url: str,
interval_seconds: int,
keywords: str,
item_selector: str,
title_selector: str,
link_selector: str,
price_selector: str,
stock_selector: str,
keyword_match: str | None,
new_item: str | None,
price_change: str | None,
stock_change: str | None,
notify_telegram: str | None,
) -> RedirectResponse:
cfg = cfg_load_fresh()
monitors = cfg.setdefault("monitors", [])
# Checkbox-style fields are converted to booleans: any non-empty value is on.
m = monitor_from_form(original_index, name, mtype, url, interval_seconds, keywords, item_selector, title_selector, link_selector, price_selector, stock_selector, bool(keyword_match), bool(new_item), bool(price_change), bool(stock_change), bool(notify_telegram))
if original_index is None:
monitors.append(m)
else:
monitors[original_index] = m
try:
cfg_save(cfg)
except Exception as e:
logger.exception("save monitor failed")
return HTMLResponse(layout("保存失败", f"返回
"), status_code=500)
return RedirectResponse("/", status_code=303)
@app.post("/monitor/create")
async def create_monitor(
    _: str = Depends(panel_auth),
    name: str = Form(...),
    mtype: str = Form(...),
    url: str = Form(...),
    interval_seconds: int = Form(300),
    keywords: str = Form(""),
    item_selector: str = Form(""),
    title_selector: str = Form(""),
    link_selector: str = Form(""),
    price_selector: str = Form(""),
    stock_selector: str = Form(""),
    keyword_match: str | None = Form(None),
    new_item: str | None = Form(None),
    price_change: str | None = Form(None),
    stock_change: str | None = Form(None),
    notify_telegram: str | None = Form(None),
) -> RedirectResponse:
    """Create a monitor from the submitted form.

    Delegates to ``save_form_common`` with ``original_index=None`` so the
    new monitor is appended.
    """
    return await save_form_common(
        original_index=None,
        name=name,
        mtype=mtype,
        url=url,
        interval_seconds=interval_seconds,
        keywords=keywords,
        item_selector=item_selector,
        title_selector=title_selector,
        link_selector=link_selector,
        price_selector=price_selector,
        stock_selector=stock_selector,
        keyword_match=keyword_match,
        new_item=new_item,
        price_change=price_change,
        stock_change=stock_change,
        notify_telegram=notify_telegram,
    )
@app.post("/monitor/save")
async def save_monitor(
    _: str = Depends(panel_auth),
    original_index: int = Form(...),
    name: str = Form(...),
    mtype: str = Form(...),
    url: str = Form(...),
    interval_seconds: int = Form(300),
    keywords: str = Form(""),
    item_selector: str = Form(""),
    title_selector: str = Form(""),
    link_selector: str = Form(""),
    price_selector: str = Form(""),
    stock_selector: str = Form(""),
    keyword_match: str | None = Form(None),
    new_item: str | None = Form(None),
    price_change: str | None = Form(None),
    stock_change: str | None = Form(None),
    notify_telegram: str | None = Form(None),
) -> RedirectResponse:
    """Update an existing monitor from the submitted form.

    Delegates to ``save_form_common``, which overwrites the entry at
    ``original_index``.
    """
    return await save_form_common(
        original_index=original_index,
        name=name,
        mtype=mtype,
        url=url,
        interval_seconds=interval_seconds,
        keywords=keywords,
        item_selector=item_selector,
        title_selector=title_selector,
        link_selector=link_selector,
        price_selector=price_selector,
        stock_selector=stock_selector,
        keyword_match=keyword_match,
        new_item=new_item,
        price_change=price_change,
        stock_change=stock_change,
        notify_telegram=notify_telegram,
    )
@app.get("/monitor/{idx}/delete")
async def delete_monitor(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Remove the monitor at config index ``idx`` and redirect to the home page.

    An out-of-range index is ignored and nothing is saved.
    """
    home = RedirectResponse("/", status_code=303)
    cfg = cfg_load_fresh()
    entries = cfg.get("monitors") or []
    if not (0 <= idx < len(entries)):
        return home
    del entries[idx]
    cfg_save(cfg)
    return home
# Dry-run preview for the monitor at config index `idx`.
# Fetches the monitor URL with the configured HTTP timeout and User-Agent,
# parses it as RSS or as a web page depending on the monitor type, and lists
# the first 15 items with their keyword hits and whether item_blocked()
# would skip them.
# Raises HTTPException(404) for an out-of-range index; any fetch or parse
# error is reported as a 500 page.
@app.get("/monitor/{idx}/preview", response_class=HTMLResponse)
async def monitor_preview(idx: int, _: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or []
if idx < 0 or idx >= len(monitors):
raise HTTPException(404, "monitor not found")
m = monitors[idx]
# HTTP settings come from the "http" config section, defaulting to 20 s and DEFAULT_UA.
timeout = int((cfg.get("http") or {}).get("timeout_seconds", 20))
ua = (cfg.get("http") or {}).get("user_agent") or DEFAULT_UA
headers = {"User-Agent": ua}
try:
async with httpx.AsyncClient(timeout=timeout, headers=headers) as client:
body = await fetch_url(client, m.get("url"))
items = parse_rss_items(m, body) if m.get("type") == "rss" else parse_web_items(m, body)
rows=[]
for it in items[:15]:
blocked, br = item_blocked(it, m)
hits = keyword_hits(f"{it.title} {it.text}", m.get("keywords") or [])
rows.append(f"""{html_escape(it.title)}{html_escape(it.link)} {html_escape(it.author or '-')} {html_escape(it.category or '-')} {html_escape(', '.join(hits) or '-')} {'跳过: '+html_escape(br) if blocked else '可推送/记录'} """)
body_html = "抓取预览 只预览最近 15 条,不写入去重状态、不推送。
标题/链接 作者 分类 命中 状态 " + "".join(rows) + "
"
return layout("抓取预览", body_html)
except Exception as e:
return HTMLResponse(layout("抓取预览失败", f""), status_code=500)
# Run the monitor at config index `idx` immediately and report the count
# returned by run_monitor().
# Raises HTTPException(404) for an out-of-range index.
@app.get("/monitor/{idx}/run", response_class=HTMLResponse)
async def run_monitor_now(idx: int, _: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh(); monitors = cfg.get("monitors") or []
if idx < 0 or idx >= len(monitors):
raise HTTPException(404, "monitor not found")
count = await run_monitor(monitors[idx])
return layout("检查完成", f"已手动检查:{html_escape(monitors[idx].get('name'))},推送 {count} 条。
返回
")
# Run every monitor once by awaiting run_all_monitors_once(), then show a
# confirmation page. The response is sent only after that call completes.
@app.get("/run-once", response_class=HTMLResponse)
async def run_once_page(_: str = Depends(panel_auth)) -> str:
await run_all_monitors_once()
return layout("手动检查完成", "已执行全部监控检查。具体结果看日志/Telegram 推送。
返回
")
# Raw YAML editor page: shows the HTML-escaped text of CONFIG_PATH.
# NOTE(review): read_text() raises if the file does not exist; config_export
# guards with CONFIG_PATH.exists() — confirm the file is always present here.
@app.get("/yaml", response_class=HTMLResponse)
async def yaml_edit(_: str = Depends(panel_auth)) -> str:
text = CONFIG_PATH.read_text(encoding="utf-8")
return layout("YAML 高级编辑", f"YAML 高级编辑 {html_escape(text)} 保存 YAML
")
# Save the YAML submitted from the raw editor.
# Parses with yaml.safe_load (empty input becomes {}) and passes the result
# to cfg_save; any exception produces a failure page instead of propagating.
# NOTE(review): the parsed value is not checked to be a mapping before
# cfg_save — confirm cfg_save validates its input.
@app.post("/yaml", response_class=HTMLResponse)
async def yaml_save(_: str = Depends(panel_auth), content: str = Form(...)) -> str:
try:
data = yaml.safe_load(content) or {}
cfg_save(data)
return layout("已保存", "YAML 已保存并重载。
返回
")
except Exception as e:
return layout("保存失败", f"返回
")
# Settings wizard page.
# Reads the current env values and the "cleanup" config section, shows a
# warning when the bot token or admin chat id is empty, and summarizes the
# Telegram user-session login state.
@app.get("/settings", response_class=HTMLResponse)
async def settings(_: str = Depends(panel_auth)) -> str:
v = env_values()
cleanup = (cfg_load_fresh().get("cleanup") or {})
# The bot counts as configured only when both token and admin chat id are non-blank.
bot_ready = bool(v["TELEGRAM_BOT_TOKEN"].strip() and v["ADMIN_CHAT_ID"].strip())
status = "" if bot_ready else "未填写 Token 或管理员 ID;网页可用,但 Bot 和监控推送不可用。
"
login_row = telegram_login_status_row()
env_session = v.get("TG_API_SESSION", "").strip()
# Login state precedence: a stored authorized login, then a manually
# entered TG_API_SESSION, otherwise "not logged in".
if login_row.get("status") == "authorized":
login_status = "已登录"
login_user = login_row.get("username") or login_row.get("phone") or login_row.get("user_id") or "-"
elif env_session:
login_status = "已配置(手动填入)"
# Only a 16-character prefix of the session string is used for this label.
login_user = env_session[:16] + "..."
else:
login_status = "未登录"
login_user = "-"
body = f"""设置向导 {status}
1 Bot 基础配置
先保证 Bot 能给管理员发通知。
2 TG 用户会话登录
用于频道媒体转发。先填 TG_API_ID / TG_API_HASH 并保存,再点二维码登录。
TG_API_SESSION(可选,二维码登录会自动生成保存) {html_escape(v['TG_API_SESSION'])}
TG 代理(可选,国内服务器需要,例如 socks5://127.0.0.1:1080 或 http://127.0.0.1:7890)
登录状态:{html_escape(login_status)} · 账号:{html_escape(login_user)}
二维码登录 登出会话
QR 扫码登录 Telegram
正在生成二维码...
保存设置
改 Token、管理员 ID、端口或 TG_API_ID / TG_API_HASH 后需要保存并重启。
"""
return layout("设置", body)
def save_panel_settings(
    values: dict[str, Any],
    cleanup_interval_minutes: int,
    cleanup_message_delete_after_minutes: int,
    cleanup_retention_minutes: int,
) -> None:
    """Persist panel settings to the env file and the cleanup config section.

    ``values`` is handed to ``write_env_values`` unchanged. The three
    minute counts are coerced with ``int`` and clamped to at least 1, and
    the resulting cleanup section is always written with ``enabled`` true.
    """
    write_env_values(values)
    config = cfg_load_fresh()
    minute_fields = (
        ("interval_minutes", cleanup_interval_minutes),
        ("monitor_message_delete_after_minutes", cleanup_message_delete_after_minutes),
        ("monitor_retention_minutes", cleanup_retention_minutes),
    )
    cleanup_section: dict[str, Any] = {"enabled": True}
    for key, raw_minutes in minute_fields:
        cleanup_section[key] = max(1, int(raw_minutes))
    config["cleanup"] = cleanup_section
    cfg_save(config)
# Save the settings wizard form through save_panel_settings.
# locals() is evaluated on the first line, so the mapping passed on contains
# every handler parameter, including `_` and the three CLEANUP_* integers.
# NOTE(review): confirm write_env_values ignores keys it does not manage.
@app.post("/settings", response_class=HTMLResponse)
async def settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), TG_API_ID: str = Form(""), TG_API_HASH: str = Form(""), TG_API_SESSION: str = Form(""), TG_PROXY: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin"), CLEANUP_INTERVAL_MINUTES: int = Form(60), CLEANUP_MESSAGE_DELETE_AFTER_MINUTES: int = Form(60), CLEANUP_RETENTION_MINUTES: int = Form(1440)) -> str:
save_panel_settings(locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED}, CLEANUP_INTERVAL_MINUTES, CLEANUP_MESSAGE_DELETE_AFTER_MINUTES, CLEANUP_RETENTION_MINUTES)
return layout("已保存", "已保存,不会自动重启;修改 Token/管理员 ID 后请重启。
返回 重启机器人
")
# "Send a message" page.
# Lists the 200 most recently updated users as selectable options; the
# optional ?user_id= query parameter pre-selects the matching user.
@app.get("/send", response_class=HTMLResponse)
async def send_page(request: Request, _: str = Depends(panel_auth)) -> str:
selected_user_id = request.query_params.get("user_id", "")
with closing(db()) as conn:
users = conn.execute(
"SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 200"
).fetchall()
options = []
for u in users:
blocked = "(已封禁)" if u["blocked"] else ""
username = f"@{u['username']}" if u["username"] else ""
label = f"{u['full_name'] or u['user_id']} {username} · {u['user_id']} {blocked}"
# The query parameter is a string, so the id is compared in string form.
selected = "selected" if str(u["user_id"]) == selected_user_id else ""
options.append(f"{html_escape(label)} ")
body = f""""""
return layout("主动发消息", body)
# Send a text message to a user from the panel.
# A manually typed id (manual_user_id) takes precedence over the selected
# one (user_id). The send is refused when the id or text is empty, the user
# is unknown to get_user(), or the user is blocked. On success the admin is
# notified through admin_send(). Every failure, including a non-numeric id,
# is rendered as a page rather than raised.
@app.post("/send", response_class=HTMLResponse)
async def send_save(_: str = Depends(panel_auth), user_id: str = Form(""), manual_user_id: str = Form(""), text: str = Form("")) -> str:
raw_uid = (manual_user_id or user_id or "").strip()
if not raw_uid:
return layout("发送失败", "返回
")
if not text.strip():
return layout("发送失败", "返回
")
try:
# int() raising ValueError here is handled by the generic except below.
uid = int(raw_uid)
if not get_user(uid):
return layout("发送失败", f"返回
")
if is_blocked(uid):
return layout("发送失败", f"用户 {uid} 已被封禁,请先 /unblock。 返回
")
message_id = await send_text_to_user(uid, text.strip(), "web:send")
await admin_send(f"[主动发送成功]\nuser_id: {uid}\nmessage_id: {message_id}\n时间:{html_escape(now_iso())}")
return layout("发送成功", f"已发送给用户 {uid},message_id={message_id}。Bot 也已给管理员发送确认提醒。
继续发送 收件箱
")
except TelegramAPIError as e:
logger.exception("panel send failed")
return layout("发送失败", f"返回
")
except Exception as e:
logger.exception("panel send failed")
return layout("发送失败", f"返回
")
# Inbox page: the 200 newest rows of inbox_messages, newest first.
# Status per row: outgoing rows are "replied"; rows whose error starts with
# "spam:" are "intercepted"; all others show forwarded / not forwarded.
# Incoming rows that were not forwarded get an extra retry action.
@app.get("/inbox", response_class=HTMLResponse)
async def inbox_page(_: str = Depends(panel_auth)) -> str:
with closing(db()) as conn:
rows = conn.execute("SELECT * FROM inbox_messages ORDER BY id DESC LIMIT 200").fetchall()
trs = []
for r in rows:
# Rows lacking these columns default to an incoming user message.
direction = r["direction"] if "direction" in r.keys() else "in"
source = r["source"] if "source" in r.keys() else "user"
if direction == "out":
status_txt, status_cls = "已回复", "ok"
elif (r["error"] or "").startswith("spam:"):
status_txt, status_cls = "已拦截", "danger"
else:
status_txt = "已转发" if r["forwarded"] else "未转发"
status_cls = "ok" if r["forwarded"] else "danger"
content = html_escape(r["text"] or "(非文本/媒体消息)")
flow = "用户 -> 管理员" if direction == "in" else "管理员 -> 用户"
actions = f"回复 "
if direction == "in" and not r["forwarded"]:
actions += f" 重试转发 "
trs.append(f"""#{r['id']}{status_txt} {html_escape(r['full_name'])} {r['user_id']} @{html_escape(r['username'] or '')} {html_escape(flow)}{html_escape(source)} · {html_escape(r['created_at'])} {content}{html_escape(r['error'] or '')} {actions} """)
body = "收件箱 这里显示双向机器人对话记录:用户发来的消息、Web 回复、TG 管理员回复都会记录。转发失败的入站消息可重试。
ID/状态 用户 方向/来源 内容/错误 操作 " + "".join(trs) + "
"
return layout("收件箱", body)
@app.get("/inbox/{msg_id}/retry")
async def retry_inbox(msg_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Mark an inbox message as not forwarded and flush pending messages.

    Clears the ``forwarded`` flag and ``error`` text of row ``msg_id``,
    awaits ``flush_pending_inbox()`` and redirects back to the inbox.
    """
    reset_sql = "UPDATE inbox_messages SET forwarded=0, error=NULL WHERE id=?"
    with closing(db()) as connection:
        connection.execute(reset_sql, (msg_id,))
        connection.commit()
    await flush_pending_inbox()
    return RedirectResponse("/inbox", status_code=303)
# Reply form for one inbox message, with quick-reply templates from
# list_quick_replies() offered as options.
# Raises HTTPException(404) when the message does not exist.
@app.get("/inbox/{msg_id}/reply", response_class=HTMLResponse)
async def inbox_reply_page(msg_id: int, _: str = Depends(panel_auth)) -> str:
row = get_inbox_message(msg_id)
if not row:
raise HTTPException(404, "message not found")
options = "".join(
f"{html_escape(r.get('title',''))} "
for r in list_quick_replies()
)
body = f"""回复用户 #{row['id']} · {html_escape(row['full_name'])} · {row['user_id']}
{html_escape(row['text'] or '(非文本/媒体消息)')}
快捷模板 选择模板 {options}
回复内容
"""
return layout("回复用户", body)
# Send a panel reply to the author of an inbox message and notify the admin.
# Raises HTTPException(404) when the message does not exist; any send error
# is rendered as a failure page.
# NOTE(review): unlike the /send handler, `text` is neither stripped nor
# checked for emptiness before sending — confirm send_text_to_user handles
# that.
@app.post("/inbox/{msg_id}/reply", response_class=HTMLResponse)
async def inbox_reply_save(msg_id: int, _: str = Depends(panel_auth), text: str = Form("")) -> str:
row = get_inbox_message(msg_id)
if not row:
raise HTTPException(404, "message not found")
try:
message_id = await send_text_to_user(int(row["user_id"]), text, "web:inbox")
await admin_send(f"[Web 回复成功]\nuser_id: {row['user_id']}\nmessage_id: {message_id}")
return layout("回复成功", f"已回复用户 {row['user_id']}。
返回收件箱
")
except Exception as e:
return layout("回复失败", f"返回
")
# User management page: the 300 most recently updated users with their
# block status and note, preceded by a settings card.
# NOTE(review): `v`, `action` and `action_txt` are not referenced in the
# text visible here; presumably the row and card markup uses them — confirm.
@app.get("/users", response_class=HTMLResponse)
async def users_page(_: str = Depends(panel_auth)) -> str:
v = env_values()
with closing(db()) as conn:
rows = conn.execute("SELECT user_id, username, full_name, blocked, note, updated_at FROM users ORDER BY updated_at DESC LIMIT 300").fetchall()
trs = []
for u in rows:
status_txt = "封禁" if u["blocked"] else "正常"
# The offered action is the opposite of the current state.
action = "unblock" if u["blocked"] else "block"
action_txt = "解封" if u["blocked"] else "封禁"
trs.append(f"""{html_escape(u['full_name'] or u['user_id'])} {u['user_id']} @{html_escape(u['username'] or '')} {status_txt} {html_escape(u['updated_at'])} {html_escape(u['note'] or '')} 备注 """)
settings_card = f""""""
body = settings_card + "用户管理 用户 状态 备注 操作 " + "".join(trs) + "
"
return layout("用户管理", body)
# Save the settings card shown on the users page.
# The form carries no cleanup fields, so the cleanup values already in the
# config are read back and passed through unchanged (with the same defaults
# of 60 / 60 / 1440 minutes when absent).
# locals() is evaluated after `cleanup` is assigned, so the mapping passed
# to save_panel_settings also contains `cleanup` and `_`.
# NOTE(review): confirm write_env_values ignores keys it does not manage.
@app.post("/users/settings", response_class=HTMLResponse)
async def users_settings_save(_: str = Depends(panel_auth), TELEGRAM_BOT_TOKEN: str = Form(""), ADMIN_CHAT_ID: str = Form(""), TG_API_ID: str = Form(""), TG_API_HASH: str = Form(""), TG_API_SESSION: str = Form(""), TG_PROXY: str = Form(""), LOG_LEVEL: str = Form("INFO"), WEB_PANEL_ENABLED: str = Form("true"), WEB_PANEL_HOST: str = Form("127.0.0.1"), WEB_PANEL_PORT: str = Form("8765"), WEB_PANEL_USER: str = Form("admin"), WEB_PANEL_PASSWORD: str = Form("admin")) -> str:
cleanup = (cfg_load_fresh().get("cleanup") or {})
save_panel_settings(
locals() | {"WEB_PANEL_ENABLED": WEB_PANEL_ENABLED},
int(cleanup.get("interval_minutes", 60)),
int(cleanup.get("monitor_message_delete_after_minutes", 60)),
int(cleanup.get("monitor_retention_minutes", 1440)),
)
return layout("已保存", "已保存,不会自动重启;修改 Token、管理员 ID、端口、账号或密码后请重启。
返回用户管理 重启机器人
")
@app.post("/api/tg-login/qr")
async def api_tg_login_qr(_: str = Depends(panel_auth)) -> dict[str, Any]:
    """Start a Telegram QR login and return the QR image to the panel.

    Prepares the login through ``telegram_login_prepare_qr`` (using the
    TG_PROXY environment variable), registers a pending entry in
    ``telegram_qr_logins`` under a fresh random id, and starts a background
    task that waits for the login to complete and records the outcome on
    that entry.

    Returns:
        ``{"ok": False, "error": ...}`` when preparation fails, otherwise
        ``{"ok": True, "login_id": ..., "qr_png": ...}``.
    """
    result = await telegram_login_prepare_qr(proxy=os.getenv("TG_PROXY", "").strip())
    if not result.get("ok"):
        return {"ok": False, "error": result.get("error", "failed")}

    login_id = secrets.token_urlsafe(8)
    entry: dict[str, Any] = {
        "client": result["client"],
        "login": result["login"],
        "created_at": time.time(),
        "status": "pending",
    }
    telegram_qr_logins[login_id] = entry

    async def waiter() -> None:
        # Write to the captured entry rather than looking it up again, so
        # the task cannot fail with KeyError if the registry entry has been
        # removed in the meantime.
        try:
            data = await telegram_login_complete(result["client"], result["login"])
        except Exception as e:
            entry["status"] = "error"
            entry["error"] = str(e)
        else:
            entry["status"] = "authorized"
            entry["result"] = data

    # The event loop keeps only a weak reference to tasks; hold a strong one
    # on the entry so the waiter is not garbage-collected before it finishes.
    entry["task"] = asyncio.create_task(waiter())
    return {"ok": True, "login_id": login_id, "qr_png": result["qr_png"]}
@app.get("/api/tg-login/status")
async def api_tg_login_status(_: str = Depends(panel_auth), login_id: str = "") -> dict[str, Any]:
    """Report the state of a QR login started by the QR endpoint.

    Returns:
        ``{"ok": False, "status": "missing"}`` for an unknown ``login_id``;
        otherwise a dict with ``status``, ``error`` and the account fields
        (``username``, ``phone``, ``user_id``) taken from the stored result,
        each defaulting to an empty string.
    """
    item = telegram_qr_logins.get(login_id)
    if not item:
        return {"ok": False, "status": "missing"}

    status = item.get("status", "pending")
    # Only a login that is still pending can expire. A finished login keeps
    # its "authorized" or "error" outcome even when polled more than 120
    # seconds after it was created.
    if status == "pending" and time.time() - float(item.get("created_at", 0)) > 120:
        item["status"] = "expired"
        status = "expired"

    result = item.get("result") or {}
    return {
        "ok": True,
        "status": status,
        "error": item.get("error", ""),
        "username": result.get("username", ""),
        "phone": result.get("phone", ""),
        "user_id": result.get("user_id", ""),
    }
@app.post("/api/tg-login/logout")
async def api_tg_login_logout(_: str = Depends(panel_auth)) -> dict[str, Any]:
    """Clear the stored Telegram login session and acknowledge with ``ok``."""
    clear_telegram_login_session()
    acknowledgement: dict[str, Any] = {"ok": True}
    return acknowledgement
@app.post("/users/{user_id}/note")
async def user_note_save(user_id: int, _: str = Depends(panel_auth), note: str = Form("")) -> RedirectResponse:
    """Store the whitespace-trimmed note for ``user_id`` and return to the user list."""
    trimmed_note = note.strip()
    set_note(user_id, trimmed_note)
    return RedirectResponse("/users", status_code=303)
@app.get("/users/{user_id}/block")
async def user_block(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Mark ``user_id`` as blocked and return to the user list."""
    set_block(user_id, True)
    back_to_users = RedirectResponse("/users", status_code=303)
    return back_to_users
@app.get("/users/{user_id}/unblock")
async def user_unblock(user_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Clear the blocked flag of ``user_id`` and return to the user list."""
    set_block(user_id, False)
    back_to_users = RedirectResponse("/users", status_code=303)
    return back_to_users
# Spam-filter rules page. Reads bot.spam_filter from the config, tolerating
# a missing or empty "bot" section, and joins the keywords one per line.
# NOTE(review): `keywords` is not referenced by `body` in the text visible
# here; presumably the form markup embeds it — confirm.
@app.get("/rules", response_class=HTMLResponse)
async def rules_page(_: str = Depends(panel_auth)) -> str:
cfg = cfg_load_fresh()
spam = (cfg.get("bot") or {}).get("spam_filter") or {}
keywords = "\n".join(spam.get("keywords") or [])
body = f""""""
return layout("拦截规则", body)
@app.post("/rules")
async def rules_save(_: str = Depends(panel_auth), enabled: str | None = Form(None), auto_block: str | None = Form(None), keywords: str = Form("")) -> RedirectResponse:
    """Replace the spam-filter settings with the submitted form values.

    ``enabled`` and ``auto_block`` are checkbox-style fields: any non-empty
    value counts as on. ``keywords`` is split with ``parse_lines``. The
    whole ``bot.spam_filter`` section is overwritten, then the browser is
    redirected back to /rules.
    """
    cfg = cfg_load_fresh()
    # setdefault() would hand back an existing None (an empty "bot:" key in
    # the YAML) and the assignment below would then fail, so normalize the
    # section to a dict first.
    bot_cfg = cfg.get("bot")
    if not isinstance(bot_cfg, dict):
        bot_cfg = {}
        cfg["bot"] = bot_cfg
    bot_cfg["spam_filter"] = {
        "enabled": bool(enabled),
        "auto_block": bool(auto_block),
        "keywords": parse_lines(keywords),
    }
    cfg_save(cfg)
    return RedirectResponse("/rules", status_code=303)
# Quick-reply management page: one numbered row per entry returned by
# list_quick_replies(), showing its escaped title and text.
@app.get("/replies", response_class=HTMLResponse)
async def replies_page(_: str = Depends(panel_auth)) -> str:
replies = list_quick_replies()
rows = []
for i, r in enumerate(replies):
rows.append(f"""{i + 1} {html_escape(r.get('title',''))} {html_escape(r.get('text',''))} 删除 """)
body = """快捷回复 # 标题 内容 操作 """ + "".join(rows) + """
"""
return layout("快捷回复", body)
@app.post("/replies")
async def replies_save(_: str = Depends(panel_auth), title: str = Form(""), text: str = Form("")) -> RedirectResponse:
    """Append a quick reply (trimmed title and text) to ``bot.quick_replies``.

    Saves the config and redirects back to /replies.
    """
    cfg = cfg_load_fresh()
    # setdefault() would return an existing None (an empty YAML key) and the
    # following attribute access would fail, so normalize both levels to the
    # expected container types first.
    bot_cfg = cfg.get("bot")
    if not isinstance(bot_cfg, dict):
        bot_cfg = {}
        cfg["bot"] = bot_cfg
    replies = bot_cfg.get("quick_replies")
    if not isinstance(replies, list):
        replies = []
        bot_cfg["quick_replies"] = replies
    replies.append({"title": title.strip(), "text": text.strip()})
    cfg_save(cfg)
    return RedirectResponse("/replies", status_code=303)
@app.get("/replies/{idx}/delete")
async def replies_delete(idx: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Delete the quick reply at position ``idx`` and return to the list.

    An out-of-range index is ignored and nothing is saved.
    """
    back_to_list = RedirectResponse("/replies", status_code=303)
    cfg = cfg_load_fresh()
    bot_section = cfg.get("bot") or {}
    quick_replies = bot_section.get("quick_replies") or []
    if not (0 <= idx < len(quick_replies)):
        return back_to_list
    del quick_replies[idx]
    cfg_save(cfg)
    return back_to_list
# Push history page: the 300 newest rows of monitor_events, newest first,
# each labelled as pushed to Telegram or Web-only.
@app.get("/monitor/events", response_class=HTMLResponse)
async def monitor_events(_: str = Depends(panel_auth)) -> str:
with closing(db()) as conn:
rows = conn.execute("SELECT * FROM monitor_events ORDER BY id DESC LIMIT 300").fetchall()
trs = []
for r in rows:
status_txt = "已推 TG" if r["pushed"] else "仅 Web"
trs.append(f"""#{r['id']}{status_txt} {html_escape(r['monitor_name'])} {html_escape(r['created_at'])} {html_escape(r['title'])}{html_escape(r['link'])} {html_escape(r['reasons'])} """)
body = "监控推送历史 ID/状态 监控 条目 原因 " + "".join(trs) + "
"
return layout("推送历史", body)
# Export/import page: shows the HTML-escaped text of CONFIG_PATH, or an
# empty string when the file does not exist.
@app.get("/config/export", response_class=HTMLResponse)
async def config_export(_: str = Depends(panel_auth)) -> str:
content = CONFIG_PATH.read_text(encoding="utf-8") if CONFIG_PATH.exists() else ""
body = f"""导出 / 导入配置 只处理 config.yaml,不包含 Token、密码和 Session Secret。
{html_escape(content)} """
return layout("导出配置", body)
# Import a pasted YAML config.
# Parses with yaml.safe_load (empty input becomes {}) and passes the result
# to cfg_save; any exception is rendered as a failure page.
# NOTE(review): the parsed value is not checked to be a mapping before
# cfg_save — confirm cfg_save validates its input.
@app.post("/config/import", response_class=HTMLResponse)
async def config_import(_: str = Depends(panel_auth), content: str = Form("")) -> str:
try:
data = yaml.safe_load(content) or {}
cfg_save(data)
return layout("导入完成", "配置已导入并重载。
返回
")
except Exception as e:
return layout("导入失败", f"返回
")
# Static confirmation page shown before restarting the bot; performs no
# action itself.
@app.get("/restart", response_class=HTMLResponse)
async def restart_page(_: str = Depends(panel_auth)) -> str:
body = """重启机器人 用于修改 Token、管理员 ID、面板设置等需要重启生效的配置。
确认重启 tg-watchbot """
return layout("重启机器人", body)
# Code update page.
# Shows the git status of BASE_DIR for the current branch (local and remote
# heads, ahead/behind counts, dirty flag) and the stored rollback commit,
# followed by the update and rollback actions.
# git_update_status is called with fetch_remote=True. If gathering the
# status raises, the escaped error text is shown in place of the status.
@app.get("/update", response_class=HTMLResponse)
async def update_page(_: str = Depends(panel_auth)) -> str:
repo_dir = BASE_DIR
branch = current_git_branch(repo_dir)
status_html = ""
try:
st = git_update_status(repo_dir, branch, fetch_remote=True)
rollback = app_meta_get("last_update_rollback")
# Commit hashes are shortened to 12 characters for display.
status_html = (
"更新状态 "
f"
分支:{html_escape(st['branch'])}
"
f"
本地:{html_escape(st['head'][:12])} "
f"远端:{html_escape(st['remote_head'][:12])} "
f"ahead: {st['ahead']} / behind: {st['behind']} "
f"工作区:{'有未提交改动' if st['dirty'] else '干净'}
"
f"
上次回滚点:{html_escape((rollback or '-')[:12])}
"
"
"
)
except Exception as e:
status_html = f"更新状态 {html_escape(str(e))} "
actions = (
"更新操作 只允许快进更新(ff-only)。若工作区有本地改动,将拒绝更新。
"
"
"
"
更新并重启 "
"回滚上次更新 "
" "
)
return layout("更新代码", status_html + actions)
# Pull the latest code with a fast-forward-only `git pull` and restart.
# Refuses with a 400 page when the working tree is dirty, and returns a
# "nothing to do" page when the branch is not behind its remote. When HEAD
# changes, the previous HEAD is stored as "last_update_rollback". Failures
# are logged and rendered as a 500 page.
# After a successful pull, a background task exits the process with status
# 1 one second later.
# NOTE(review): the task returned by asyncio.create_task is not retained —
# confirm it cannot be collected before it runs.
@app.post("/update")
async def update_post(_: str = Depends(panel_auth)) -> HTMLResponse:
repo_dir = BASE_DIR
branch = current_git_branch(repo_dir)
try:
st = git_update_status(repo_dir, branch, fetch_remote=True)
if st["dirty"]:
return HTMLResponse(
layout(
"更新被拒绝",
"检测到本地未提交改动,已拒绝自动更新。请先提交或清理本地改动再试。
返回
",
),
status_code=400,
)
if int(st["behind"]) <= 0:
return HTMLResponse(layout("无需更新", "当前已是最新版本,无需重启。
返回
"))
old_head = st["head"]
pull = git_run(repo_dir, ["pull", "--ff-only", "origin", branch], check=True)
new_head = git_run(repo_dir, ["rev-parse", "HEAD"], check=True).stdout.strip()
# Record the rollback point only when the pull actually moved HEAD.
if new_head != old_head:
app_meta_set("last_update_rollback", old_head)
logger.info("update applied branch=%s old=%s new=%s out=%s", branch, old_head, new_head, pull.stdout.strip())
except subprocess.CalledProcessError as e:
logger.exception("update failed")
return HTMLResponse(
layout(
"更新失败",
f"{html_escape((e.stderr or e.stdout or str(e))[:4000])} 返回
",
),
status_code=500,
)
except Exception as e:
logger.exception("update failed")
return HTMLResponse(layout("更新失败", f"返回
"), status_code=500)
async def delayed_restart():
# Delay the exit so this response can be sent first.
await asyncio.sleep(1.0)
os._exit(1)
asyncio.create_task(delayed_restart())
return HTMLResponse(layout("更新完成", "已拉取最新代码,正在重启。
返回首页
"))
# POST /update/rollback: move the checkout in BASE_DIR back to the commit stored under
# "last_update_rollback" and then restart the process.
# Responds 400 when no rollback point is stored or the work tree is dirty, 500 when the rollback fails.
@app.post("/update/rollback")
async def update_rollback_post(_: str = Depends(panel_auth)) -> HTMLResponse:
repo_dir = BASE_DIR
rollback = app_meta_get("last_update_rollback")
# Without a stored rollback commit there is nothing to go back to.
if not rollback:
return HTMLResponse(layout("回滚失败", "返回
"), status_code=400)
try:
# Any output from `git status --porcelain` means there are local changes; refuse in that case.
if git_run(repo_dir, ["status", "--porcelain"], check=True).stdout.strip():
return HTMLResponse(
layout("回滚被拒绝", "检测到本地未提交改动,已拒绝回滚。请先处理本地改动再试。
返回
"),
status_code=400,
)
rollback_to_commit(repo_dir, rollback)
logger.info("rollback applied commit=%s", rollback)
# Every failure (git or otherwise) is logged with its traceback and reported as a 500 page.
except Exception as e:
logger.exception("rollback failed")
return HTMLResponse(layout("回滚失败", f"返回
"), status_code=500)
# Terminate the process one second after scheduling, with exit status 1.
async def delayed_restart():
await asyncio.sleep(1.0)
# os._exit ends the process immediately with status 1.
os._exit(1)
# NOTE(review): the task returned by create_task() is not stored anywhere.
asyncio.create_task(delayed_restart())
return HTMLResponse(layout("回滚完成", "已回滚并准备重启。
返回首页
"))
# POST /restart: schedule a process exit and answer with a "restarting" page (requires panel_auth).
@app.post("/restart")
async def restart_post(_: str = Depends(panel_auth)) -> HTMLResponse:
async def delayed_restart():
# Wait one second first; presumably so this response can still be delivered — confirm.
await asyncio.sleep(1.0)
# Exit with failure so systemd Restart=on-failure brings the service back up.
os._exit(1)
# NOTE(review): the task returned by create_task() is not stored anywhere.
asyncio.create_task(delayed_restart())
return HTMLResponse(layout("正在重启", "已发送重启命令,约 5-10 秒后刷新页面。
返回首页
"))
@app.get("/logs", response_class=HTMLResponse)
async def logs(_: str = Depends(panel_auth)) -> str:
    """Render the tail of the application log inside the panel layout.

    Shows the final 20000 characters of ``LOG_PATH`` (decoded as UTF-8 with
    undecodable bytes replaced), or a placeholder text when the file does
    not exist. The log text is HTML-escaped before it is embedded.
    """
    if LOG_PATH.exists():
        # The complete file is read first and then cut down to its tail.
        full_log = LOG_PATH.read_text(encoding="utf-8", errors="replace")
        log_tail = full_log[-20000:]
    else:
        log_tail = "暂无日志"
    page_body = f"最近应用日志 {html_escape(log_tail)} "
    return layout("日志", page_body)
@app.get("/health", response_class=PlainTextResponse)
async def health() -> str:
    """Liveness probe: always answers with the plain-text body ``ok``.

    Unlike the other panel routes this one declares no ``panel_auth``
    dependency.
    """
    alive_marker = "ok"
    return alive_marker
# ---- Channel Media Monitoring Routes ----
# Build the HTML body of the channel-media page: an availability notice, one card per
# configured monitor, and the "add monitor" section.
# NOTE(review): in this view the returned template does not reference notice / cards_html /
# pause_btn — confirm the final markup that consumes them.
def channel_media_page_html() -> str:
monitors = channel_media_monitors_all()
session_ok = user_session_ready()
# Explain why the feature cannot work: telethon missing takes precedence over an incomplete session.
notice = ""
if TelegramClient is None:
notice = "未安装 telethon,频道媒体功能不可用。
"
elif not session_ok:
notice = "TG_API_ID / TG_API_HASH / TG_API_SESSION 未完整填写,请先在「设置」中配置。
"
# One card fragment is appended per monitor.
cards_html = ""
for m in monitors:
# A monitor without a "status" key is treated as active.
status = m.get("status", "active")
# Status badge: running / paused / stopped (anything else).
if status == "active":
badge_html = '运行中 '
elif status == "paused":
badge_html = '已暂停 '
else:
badge_html = '已停止 '
# "@name" only when a channel username is set.
username = f"@{m.get('channel_username', '')}" if m.get("channel_username") else ""
# Toggle button: pause for active monitors, resume for paused ones, none otherwise.
pause_btn = ""
if status == "active":
pause_btn = f"暂停 "
elif status == "paused":
pause_btn = f"恢复 "
# Proxy suffix is added only when a proxy is configured; its value is HTML-escaped.
proxy_info = " · 代理: " + html_escape(m.get("proxy", "")) if m.get("proxy") else ""
# Title and username are escaped; channel_id and the forward flag are inserted as-is.
cards_html += f"""
{badge_html} {html_escape(m.get('channel_title',''))} {html_escape(username)}
chat_id: {m.get('channel_id','')} · 转发: {'开启' if m.get('forward_mode') else '关闭'}{proxy_info}
"""
return f"""
添加频道/群组监控
搜索并选择你已加入的频道或群组。
已选择:
添加监控
取消
"""
@app.get("/channel-media", response_class=HTMLResponse)
async def channel_media_page(_: str = Depends(panel_auth)) -> str:
    """Serve the channel-media overview wrapped in the shared panel layout."""
    page_body = channel_media_page_html()
    return layout("频道媒体", page_body)
# GET /api/groups: JSON list of dialogs obtained through the Telegram user session (requires panel_auth).
#   q     - optional search text; when non-blank the search helper is used instead of the plain listing.
#   limit - requested maximum; capped at 200 for searches and 500 for listings.
# Returns {"groups": [...]}; when the user session is not ready the list is empty and an "error" key is added.
@app.get("/api/groups")
async def api_groups(_: str = Depends(panel_auth), q: str = "", limit: int = 500) -> dict[str, Any]:
if not user_session_ready():
return {"groups": [], "error": "TG user session not configured"}
# NOTE(review): only an upper bound is applied to limit; zero or negative values are passed through.
if q.strip():
groups = await telethon_search_dialogs(q.strip(), limit=min(limit, 200))
else:
groups = await telethon_list_dialogs(limit=min(limit, 500))
# Keep only entries whose "type" is "group" or "channel".
groups = [g for g in groups if g.get("type") in ("group", "channel")]
return {"groups": groups}
@app.post("/api/monitors/create")
async def api_monitor_create(
    _: str = Depends(panel_auth),
    channel_id: str = Form(...),
    channel_title: str = Form(""),
    channel_username: str = Form(""),
    media_types: str = Form("video,document"),
    keywords: str = Form(""),
    max_file_size_mb: int = Form(2000),
    download_dir: str = Form(""),
    notify_telegram: str | None = Form(None),
    proxy: str = Form(""),
    date_from: str = Form(""),
    date_to: str = Form(""),
    max_concurrent: int = Form(3),
    forward_mode: str | None = Form(None),
    forward_to: str = Form("admin"),
) -> RedirectResponse:
    """Create a channel-media monitor from the submitted form and redirect.

    Text fields are stripped of surrounding whitespace. Blank values fall
    back to: the numeric id for the title, ``"video,document"`` for the
    media types and ``"admin"`` for the forward target. The size limit is
    raised to at least 1 and the concurrency is clamped to 1..10. The two
    optional flag fields count as enabled when any non-empty value is sent.

    Raises:
        HTTPException: 400 when ``channel_id`` is not an integer.
    """
    try:
        monitor_chat_id = int(channel_id.strip())
    except (ValueError, TypeError):
        raise HTTPException(400, "invalid channel_id")

    # Normalise every field up front so the create call reads as a plain argument list.
    title = channel_title.strip() or str(monitor_chat_id)
    username = channel_username.strip()
    wanted_types = media_types.strip() or "video,document"
    keyword_filter = keywords.strip()
    size_cap_mb = max(1, max_file_size_mb)
    target_dir = download_dir.strip()
    notify_enabled = bool(notify_telegram)
    proxy_url = proxy.strip()
    range_start = date_from.strip()
    range_end = date_to.strip()
    concurrency = max(1, min(10, max_concurrent))
    forward_enabled = bool(forward_mode)
    forward_target = forward_to.strip() or "admin"

    channel_media_monitor_create(
        monitor_chat_id,
        title,
        username,
        wanted_types,
        keyword_filter,
        size_cap_mb,
        target_dir,
        notify_enabled,
        proxy_url,
        range_start,
        range_end,
        concurrency,
        forward_enabled,
        forward_target,
    )
    return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/pause")
async def channel_media_pause(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Set the monitor's status to ``paused`` and go back to the overview (303)."""
    channel_media_monitor_update(monitor_id, status="paused")
    back_to_overview = RedirectResponse("/channel-media", status_code=303)
    return back_to_overview
@app.get("/channel-media/{monitor_id}/resume")
async def channel_media_resume(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Set the monitor's status to ``active`` and go back to the overview (303)."""
    channel_media_monitor_update(monitor_id, status="active")
    back_to_overview = RedirectResponse("/channel-media", status_code=303)
    return back_to_overview
@app.get("/channel-media/{monitor_id}/delete")
async def channel_media_delete_route(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
    """Delete the given monitor and go back to the overview (303)."""
    channel_media_monitor_delete(monitor_id)
    back_to_overview = RedirectResponse("/channel-media", status_code=303)
    return back_to_overview
# GET /channel-media/{monitor_id}/check: run a download pass for one monitor right away and
# report how many files were added (requires panel_auth).
@app.get("/channel-media/{monitor_id}/check", response_class=HTMLResponse)
async def channel_media_check(monitor_id: int, _: str = Depends(panel_auth)) -> str:
# The download runs before the monitor row is looked up.
count = await telethon_download_from_channel(monitor_id)
monitor = channel_media_monitor_get(monitor_id)
# An unknown monitor id yields an empty channel name rather than an error.
name = monitor.get("channel_title", "") if monitor else ""
return layout("检查完成", f"已检查频道 {html_escape(name)},新增 {count} 个文件。
返回
")
@app.get("/channel-media/{monitor_id}/download", response_class=HTMLResponse)
async def channel_media_download_history(monitor_id: int, _: str = Depends(panel_auth)) -> str:
    """Render the download history (up to 200 entries) of one monitor.

    Every text field of a download record is treated as optional: a missing
    or ``None`` value is rendered as an empty string. In particular a
    ``None`` caption no longer fails on slicing. Captions are cut to 100
    characters and all text is HTML-escaped.

    Raises:
        HTTPException: 404 when no monitor with ``monitor_id`` exists.
    """
    monitor = channel_media_monitor_get(monitor_id)
    if not monitor:
        raise HTTPException(404)
    downloads = channel_media_downloads_list(monitor_id, limit=200)

    row_parts: list[str] = []
    for d in downloads:
        # Numeric and text fields alike may be stored as None; fall back before use.
        entry_size = f"{(d.get('file_size', 0) or 0) / 1024 / 1024:.1f} MB"
        caption = (d.get('caption') or '')[:100]
        row_parts.append(f"{d.get('id')} {html_escape(d.get('media_type') or '')} ")
        row_parts.append(f"{html_escape(d.get('file_name') or '')}{html_escape(caption)} ")
        row_parts.append(f"{entry_size} {html_escape(d.get('created_at') or '')} ")
    # NOTE(review): in this view the template below does not interpolate `rows`;
    # confirm the table markup that is meant to consume it.
    rows = "".join(row_parts)

    total = monitor.get("total_downloaded", 0)
    # Whole megabytes (integer division) for the summary line.
    size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024
    body = f"""下载记录 - {html_escape(monitor.get('channel_title') or '')}
累计:{total} 个文件,{size_mb} MB
"""
    return layout("下载记录", body)
return app
async def start_panel_server() -> uvicorn.Server | None:
    """Start the web panel as a background task on the running event loop.

    The bind address comes from ``WEB_PANEL_HOST`` (default ``127.0.0.1``)
    and ``WEB_PANEL_PORT`` (default ``8765``).

    Returns:
        The ``uvicorn.Server`` instance whose ``serve()`` coroutine was
        scheduled, or ``None`` when the panel is disabled.
    """
    if not panel_enabled():
        logger.info("web panel disabled")
        return None

    bind_host = os.getenv("WEB_PANEL_HOST", "127.0.0.1")
    bind_port = int(os.getenv("WEB_PANEL_PORT", "8765"))
    panel_config = uvicorn.Config(create_panel_app(), host=bind_host, port=bind_port, log_level="info")
    server = uvicorn.Server(panel_config)
    # serve() is scheduled, not awaited, so this function returns immediately;
    # the task handle itself is not kept.
    asyncio.create_task(server.serve())
    logger.info("web panel listening on http://%s:%s", bind_host, bind_port)
    return server
def validate_env() -> tuple[str, int]:
    """Load the env file and return the bot token with the first admin chat id.

    Returns:
        ``(token, admin_id)`` where ``admin_id`` is the first id produced by
        ``parse_admin_chat_ids`` for ``ADMIN_CHAT_ID``.

    Raises:
        RuntimeError: When ``TELEGRAM_BOT_TOKEN`` or ``ADMIN_CHAT_ID`` is
            missing or blank, or when ``ADMIN_CHAT_ID`` yields no ids.
    """
    load_dotenv(ENV_PATH)
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
    raw_admin = os.getenv("ADMIN_CHAT_ID", "").strip()

    # The token is checked first, so its error wins when both are absent.
    if bot_token == "":
        raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}")
    if raw_admin == "":
        raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}")

    parsed_ids = parse_admin_chat_ids(raw_admin)
    if not parsed_ids:
        raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}")
    primary_admin = parsed_ids[0]
    return bot_token, primary_admin
def bot_env_configured() -> bool:
    """Report whether both Telegram settings are present and non-blank.

    The env file is re-read with ``override=True`` first, so values already
    in the process environment are replaced by the file's values.
    """
    load_dotenv(ENV_PATH, override=True)
    required_keys = ("TELEGRAM_BOT_TOKEN", "ADMIN_CHAT_ID")
    return all(os.getenv(key, "").strip() for key in required_keys)
async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
    """Bootstrap the service and run it in one of three modes.

    Common start-up for every mode: reload ``ENV_PATH`` into the environment
    (overriding existing values), load the configuration, configure logging
    from ``LOG_LEVEL`` and initialise the database.

    Args:
        run_once: Run every monitor a single time and return. A bot is only
            created when the Telegram settings validate; otherwise a warning
            is logged and the run proceeds without one.
        panel_only: Start only the web panel and then idle forever. This flag
            is checked before ``run_once``.

    With neither flag set the panel is started, then (if the bot settings are
    present) the bot, dispatcher, scheduler and background loops are brought
    up and long polling runs until it ends. Without bot settings the function
    logs a warning and idles so the panel stays reachable.

    Raises:
        RuntimeError: Propagated from ``validate_env()`` in normal mode.
    """
    global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task
    load_dotenv(ENV_PATH, override=True)
    config = load_config()
    setup_logging(os.getenv("LOG_LEVEL", "INFO"))
    init_db()

    if panel_only:
        await start_panel_server()
        logger.info("panel-only mode start")
        # Nothing else to do in this mode; keep the loop (and the panel task) alive.
        while True:
            await asyncio.sleep(3600)

    if run_once:
        try:
            token, admin_chat_id = validate_env()
            admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
            bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
        except Exception as e:
            # Deliberate best-effort: monitors still run, just without a bot.
            logger.warning("run-once without Telegram notification: %s", e)
        try:
            await run_all_monitors_once()
        finally:
            # Close the HTTP session even when the monitor run raises, so the
            # client session is not leaked on the error path.
            if bot:
                await bot.session.close()
        return

    await start_panel_server()
    if not bot_env_configured():
        logger.warning(
            "Telegram bot is not configured. Web panel is available, but Telegram polling, monitor notifications, and admin/user messaging will not work until TELEGRAM_BOT_TOKEN and ADMIN_CHAT_ID are saved, then the service is restarted."
        )
        # Stay alive so the panel can be used to enter the missing settings.
        while True:
            await asyncio.sleep(3600)

    token, admin_chat_id = validate_env()
    admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
    bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
    dp = Dispatcher()
    dp.include_router(router)

    scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    scheduler_ref = scheduler
    schedule_monitors(scheduler)
    scheduler.start()

    # The event loop only keeps weak references to tasks; holding them in a
    # local keeps them referenced for as long as this coroutine is running.
    _background_tasks = [
        asyncio.create_task(flush_pending_loop()),
        asyncio.create_task(cleanup_monitor_loop()),
    ]

    if group_monitors_need_user_session():
        if TelegramClient is None:
            logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed")
        elif not user_session_ready():
            logger.warning(
                "group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete"
            )
        else:
            user_session_listener_task = asyncio.create_task(run_user_session_group_listener())

    await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}")
    logger.info("bot polling start")
    await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
def main() -> None:
    """Command-line entry point: parse the flags and run ``main_async``.

    A ``KeyboardInterrupt`` ends the program quietly. Any other exception is
    logged as a fatal error and re-raised.
    """
    cli = argparse.ArgumentParser()
    flag_help = (
        ("--run-once", "run all monitors once and exit; does not need Telegram token unless notification is sent"),
        ("--panel-only", "start only the web admin panel, useful before Telegram token is configured"),
    )
    for flag, help_text in flag_help:
        cli.add_argument(flag, action="store_true", help=help_text)
    options = cli.parse_args()

    try:
        asyncio.run(main_async(run_once=options.run_once, panel_only=options.panel_only))
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the service; exit without a traceback.
        return
    except Exception:
        logger.exception("fatal error")
        raise
# Script entry point: call main() only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
main()