From 5c850ec65d014326f2880df4bca6a7542012db1b Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Thu, 28 May 2026 21:52:11 +0800 Subject: [PATCH] 528 528 --- README.md | 38 ++- app.py | 742 +++++++++++++++++++++++++++++++++++++++++++++++- linuxdo_post.md | 149 ++++++++++ 3 files changed, 925 insertions(+), 4 deletions(-) create mode 100644 linuxdo_post.md diff --git a/README.md b/README.md index 2ad6d01..978b83d 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@

tg-watchbot

-

Telegram 双向客服机器人 + Web/RSS 监控推送 + 可视化管理面板

-

双向对话 · 关键词监控 · 私聊广告拦截 · 多管理员 · 配置导入导出

+

Telegram 双向客服机器人 + Web/RSS 监控推送 + 频道媒体下载 + 可视化管理面板

+

双向对话 · 关键词监控 · 频道媒体下载 · 私聊广告拦截 · 多管理员 · 配置导入导出

AI 一句话安装 · Docker 安装 · @@ -13,11 +13,12 @@

## 简介: -tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人** 和 **Web/RSS 监控推送** 合在一起: +tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人**、**Web/RSS 监控推送** 和 **频道媒体下载** 合在一起: - 普通用户私聊 Bot,消息会转发给管理员; - 管理员可以直接回复、主动发文字/图片、封禁/备注用户; - 后台定时监控 RSS 或网页,命中关键词、新条目、价格/库存变化后推送给管理员; +- 使用 Telethon 用户账号自动下载频道/群组中的视频、文档等媒体文件; - 自带一个 Web 管理面板,可配置监控目标、编辑 YAML、查看收件箱和日志。 项目为单文件应用,适合个人服务器、NAT 小鸡、轻量 VPS 直接用 systemd 跑。 @@ -29,6 +30,18 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 ``` ## 更新日志 +### 2026-05-28 更新 + +- 新增「频道媒体下载」功能:使用 Telethon 用户账号登录 Telegram,可浏览已加入的群组/频道,选择性下载媒体文件。 +- 面板新增「频道媒体」页面:支持搜索群组、添加/暂停/恢复/删除监控、查看下载记录。 +- 支持断点续传:大文件下载中断后自动续传,不重复下载。 +- 支持并发下载控制:可设置同时下载数(默认 3),避免带宽打满。 +- 支持 SOCKS5/HTTP 代理:国内用户可配置代理访问 Telegram。 +- 支持按日期范围过滤:只下载指定时间段内的消息。 +- 支持关键词过滤、媒体类型过滤(视频/文档/图片/音频)、文件大小限制。 +- 下载完成可自动推送 Telegram 通知给管理员。 +- 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 后使用。 + ### 2026-05-22 更新 - TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。 @@ -101,6 +114,20 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 ![示例图片](https://pic.gongyichuren.de/file/1779287170665_17b7c8b4040d6334ea62a108d08db644.png) +### 频道媒体下载 + +- 使用 Telethon 用户账号(非 Bot)登录 Telegram,可访问已加入的所有频道和群组。 +- 面板「频道媒体」页面支持搜索已加入的群组/频道,一键添加监控。 +- 支持暂停/恢复监控(保留配置)、删除监控。 +- 支持实时自动下载新消息中的媒体,也支持手动触发下载历史媒体。 +- 支持断点续传:大文件下载中断后自动续传,不重复下载。 +- 支持并发下载控制:可设置同时下载数(1-10,默认 3)。 +- 支持 SOCKS5/HTTP 代理,适合国内服务器使用。 +- 支持按日期范围过滤:只下载指定时间段内的消息。 +- 支持关键词过滤、媒体类型选择(视频/文档/图片/音频)、文件大小限制。 +- 下载完成可自动推送 Telegram 通知给管理员。 +- 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION`。 + ### Web 管理面板 - 登录页 + HttpOnly session cookie,不使用丑陋的浏览器 Basic Auth。 @@ -490,6 +517,11 @@ monitors: | `/rules` | 私聊广告拦截规则 | | `/replies` | 快捷回复模板 | | `/monitor/events` | 监控推送历史 | +| `/channel-media` | 频道媒体监控 | +| `/channel-media/{id}/pause` | 暂停频道监控 | +| `/channel-media/{id}/resume` | 恢复频道监控 | +| `/channel-media/{id}/check` | 手动下载频道媒体 | +| `/channel-media/{id}/download` | 查看下载记录 | | `/config/export` | 导出 / 导入 `config.yaml` | | `/logs` | 日志 | | `/health` | 健康检查 | diff --git a/app.py b/app.py index 3711131..6a3e577 100644 --- a/app.py +++ b/app.py @@ -27,6 +27,7 @@ from pathlib import Path from types import SimpleNamespace from typing import Any from urllib.parse import quote_plus, urljoin +import os.path as ospath import feedparser import httpx @@ -79,6 +80,7 @@ pending_sendpic: dict[int, dict[str, Any]] = {} scheduler_ref: AsyncIOScheduler | None = None user_session_listener_task: asyncio.Task | None = None user_session_client: Any = None +channel_media_clients: dict[str, Any] = {} GROUP_SUMMARY_MAX_CHARS = 800 @@ -230,11 +232,51 @@ def init_db() -> None: last_seen_at TEXT NOT NULL, active INTEGER DEFAULT 1 ); + CREATE TABLE IF NOT EXISTS channel_media_monitors ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + channel_id INTEGER NOT NULL, + channel_title TEXT NOT NULL, + channel_username TEXT DEFAULT '', + status TEXT DEFAULT 'active', + media_types TEXT DEFAULT 'video,document', + keywords TEXT DEFAULT '', + max_file_size_mb INTEGER DEFAULT 2000, + download_dir TEXT DEFAULT '', + last_message_id INTEGER DEFAULT 0, + total_downloaded INTEGER DEFAULT 0, + total_size_bytes INTEGER DEFAULT 0, + notify_telegram INTEGER DEFAULT 1, + proxy TEXT DEFAULT '', + date_from TEXT DEFAULT '', + date_to TEXT DEFAULT '', + max_concurrent INTEGER DEFAULT 3, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS channel_media_downloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + monitor_id INTEGER NOT NULL, + channel_id INTEGER NOT NULL, + message_id INTEGER NOT NULL, + media_type TEXT NOT NULL, + file_name TEXT DEFAULT '', + file_path TEXT DEFAULT '', + file_size INTEGER DEFAULT 0, + caption TEXT DEFAULT '', + sender_id INTEGER DEFAULT 0, + status TEXT DEFAULT 'completed', + created_at TEXT NOT NULL, + FOREIGN KEY (monitor_id) REFERENCES channel_media_monitors(id) + ); """ ) for sql in [ "ALTER TABLE inbox_messages ADD COLUMN direction TEXT DEFAULT 'in'", "ALTER TABLE inbox_messages ADD COLUMN source TEXT DEFAULT 'user'", + "ALTER TABLE channel_media_monitors ADD COLUMN proxy TEXT DEFAULT ''", + "ALTER TABLE channel_media_monitors ADD COLUMN date_from TEXT DEFAULT ''", + "ALTER TABLE channel_media_monitors ADD COLUMN date_to TEXT DEFAULT ''", + "ALTER TABLE channel_media_monitors ADD COLUMN max_concurrent INTEGER DEFAULT 3", ]: try: conn.execute(sql) @@ -538,6 +580,454 @@ def user_session_ready() -> bool: return True +# ---- Channel Media Download (Telethon user session) ---- + +def get_or_create_channel_media_client(client_type: str = "channel_media", proxy: str = "") -> Any: + if TelegramClient is None or StringSession is None: + return None + if client_type in channel_media_clients: + c = channel_media_clients[client_type] + if c.is_connected(): + return c + api_id, api_hash, session = user_session_config() + if not api_id or not api_hash or not session: + return None + try: + proxy_arg = None + if proxy and proxy.strip(): + p = proxy.strip() + if p.startswith("socks5://") or p.startswith("socks4://"): + import socks + parts = p.replace("socks5://", "").replace("socks4://", "").split(":") + proxy_arg = (socks.SOCKS5 if "socks5" in p else socks.SOCKS4, + parts[0], int(parts[1]) if len(parts) > 1 else 1080) + elif p.startswith("http://") or p.startswith("https://"): + proxy_arg = p + else: + proxy_arg = p + client = TelegramClient(StringSession(session), int(api_id), api_hash, proxy=proxy_arg) + channel_media_clients[client_type] = client + return client + except Exception: + logger.exception("failed to create channel media client") + return None + + +async def disconnect_channel_media_client(client_type: str = "channel_media") -> None: + client = channel_media_clients.pop(client_type, None) + if client: + try: + await client.disconnect() + except Exception: + pass + + +def channel_media_monitors_all() -> list[dict[str, Any]]: + with closing(db()) as conn: + rows = conn.execute("SELECT * FROM channel_media_monitors ORDER BY id DESC").fetchall() + return [dict(r) for r in rows] + + +def channel_media_monitor_get(monitor_id: int) -> dict[str, Any] | None: + with closing(db()) as conn: + row = conn.execute("SELECT * FROM channel_media_monitors WHERE id=?", (monitor_id,)).fetchone() + return dict(row) if row else None + + +def channel_media_monitor_create( + channel_id: int, + channel_title: str, + channel_username: str = "", + media_types: str = "video,document", + keywords: str = "", + max_file_size_mb: int = 2000, + download_dir: str = "", + notify_telegram: bool = True, + proxy: str = "", + date_from: str = "", + date_to: str = "", + max_concurrent: int = 3, +) -> int: + ts = now_iso() + with closing(db()) as conn: + cur = conn.execute( + """INSERT INTO channel_media_monitors + (channel_id, channel_title, channel_username, status, media_types, keywords, + max_file_size_mb, download_dir, notify_telegram, proxy, date_from, date_to, + max_concurrent, created_at, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + (channel_id, channel_title, channel_username, "active", media_types, keywords, + max_file_size_mb, download_dir, 1 if notify_telegram else 0, + proxy, date_from, date_to, max_concurrent, ts, ts), + ) + conn.commit() + return int(cur.lastrowid) + + +def channel_media_monitor_update(monitor_id: int, **kwargs: Any) -> None: + allowed = {"status", "media_types", "keywords", "max_file_size_mb", "download_dir", + "last_message_id", "notify_telegram", "total_downloaded", "total_size_bytes", + "proxy", "date_from", "date_to", "max_concurrent"} + updates = [] + values = [] + for k, v in kwargs.items(): + if k in allowed: + updates.append(f"{k}=?") + values.append(v) + if not updates: + return + updates.append("updated_at=?") + values.append(now_iso()) + values.append(monitor_id) + with closing(db()) as conn: + conn.execute(f"UPDATE channel_media_monitors SET {', '.join(updates)} WHERE id=?", values) + conn.commit() + + +def channel_media_monitor_delete(monitor_id: int) -> None: + with closing(db()) as conn: + conn.execute("DELETE FROM channel_media_downloads WHERE monitor_id=?", (monitor_id,)) + conn.execute("DELETE FROM channel_media_monitors WHERE id=?", (monitor_id,)) + conn.commit() + + +def channel_media_download_record( + monitor_id: int, channel_id: int, message_id: int, + media_type: str, file_name: str, file_path: str, + file_size: int, caption: str, sender_id: int, status: str = "completed", +) -> int: + ts = now_iso() + with closing(db()) as conn: + cur = conn.execute( + """INSERT INTO channel_media_downloads + (monitor_id, channel_id, message_id, media_type, file_name, file_path, + file_size, caption, sender_id, status, created_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?)""", + (monitor_id, channel_id, message_id, media_type, file_name, file_path, + file_size, caption, sender_id, status, ts), + ) + conn.commit() + return int(cur.lastrowid) + + +def channel_media_downloads_list(monitor_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]: + with closing(db()) as conn: + if monitor_id: + rows = conn.execute( + "SELECT * FROM channel_media_downloads WHERE monitor_id=? ORDER BY id DESC LIMIT ?", + (monitor_id, limit), + ).fetchall() + else: + rows = conn.execute( + "SELECT * FROM channel_media_downloads ORDER BY id DESC LIMIT ?", (limit,) + ).fetchall() + return [dict(r) for r in rows] + + +def channel_media_download_exists(channel_id: int, message_id: int) -> bool: + with closing(db()) as conn: + row = conn.execute( + "SELECT id FROM channel_media_downloads WHERE channel_id=? AND message_id=?", + (channel_id, message_id), + ).fetchone() + return row is not None + + +async def telethon_list_dialogs(limit: int = 500) -> list[dict[str, Any]]: + client = get_or_create_channel_media_client() + if not client: + return [] + try: + if not client.is_connected(): + await client.connect() + if not await client.is_user_authorized(): + return [] + result = [] + count = 0 + async for dialog in client.iter_dialogs(limit=limit): + entity = dialog.entity + chat_type = "unknown" + if dialog.is_group: + chat_type = "group" + elif dialog.is_channel: + chat_type = "channel" + elif dialog.is_user: + chat_type = "user" + result.append({ + "id": dialog.id, + "title": dialog.name or "", + "username": getattr(entity, "username", "") or "", + "type": chat_type, + "unread_count": dialog.unread_count, + }) + count += 1 + logger.info("listed %d dialogs", count) + return result + except Exception: + logger.exception("telethon_list_dialogs failed") + return [] + + +async def telethon_search_dialogs(query: str, limit: int = 100) -> list[dict[str, Any]]: + client = get_or_create_channel_media_client() + if not client: + return [] + try: + if not client.is_connected(): + await client.connect() + if not await client.is_user_authorized(): + return [] + result = [] + q = query.strip().lower() + async for dialog in client.iter_dialogs(limit=500): + name = (dialog.name or "").lower() + username = str(getattr(dialog.entity, "username", "") or "").lower() + if q in name or q in username or query.strip() in str(dialog.id): + entity = dialog.entity + chat_type = "group" if dialog.is_group else ("channel" if dialog.is_channel else ("user" if dialog.is_user else "unknown")) + result.append({ + "id": dialog.id, + "title": dialog.name or "", + "username": getattr(entity, "username", "") or "", + "type": chat_type, + "unread_count": dialog.unread_count, + }) + if len(result) >= limit: + break + return result + except Exception: + logger.exception("telethon_search_dialogs failed") + return [] + + +def _detect_media_type(media: Any, allowed_types: set[str]) -> str: + if not media: + return "" + if hasattr(media, "video") or hasattr(media, "document')"): + pass + from telethon.tl.types import ( + DocumentAttributeVideo, + DocumentAttributeAudio, + DocumentAttributeFilename, + ) + if hasattr(media, "photo"): + if "photo" not in allowed_types: + return "" + return "photo" + if hasattr(media, "document"): + doc = media.document + if doc is None: + return "" + mime = str(getattr(doc, "mime_type", "") or "") + for attr in (getattr(doc, "attributes", None) or []): + if isinstance(attr, DocumentAttributeVideo): + if "video" not in allowed_types: + return "" + return "video" + if isinstance(attr, DocumentAttributeAudio): + if "audio" not in allowed_types: + return "" + return "audio" + if mime.startswith("video/"): + if "video" not in allowed_types: + return "" + return "video" + if mime.startswith("audio/"): + if "audio" not in allowed_types: + return "" + return "audio" + if "document" in allowed_types: + return "document" + return "" + if "document" in allowed_types: + return "document" + return "" + + +def _get_media_size(media: Any) -> int: + if hasattr(media, "document") and media.document: + return int(getattr(media.document, "size", 0) or 0) + if hasattr(media, "photo"): + return 0 + return 0 + + +async def channel_media_monitor_loop() -> None: + while True: + await asyncio.sleep(300) + try: + monitors = channel_media_monitors_all() + active_monitors = [m for m in monitors if m.get("status") == "active"] + if not active_monitors: + continue + for monitor in active_monitors: + try: + await telethon_download_from_channel(int(monitor["id"])) + except Exception: + logger.exception("channel media monitor failed id=%s", monitor.get("id")) + except Exception: + logger.exception("channel_media_monitor_loop error") + finally: + await disconnect_channel_media_client() + + +async def telethon_download_from_channel(monitor_id: int, download_history: bool = False) -> int: + monitor = channel_media_monitor_get(monitor_id) + if not monitor: + return 0 + proxy = str(monitor.get("proxy") or "").strip() + client = get_or_create_channel_media_client(proxy=proxy) + if not client: + logger.warning("channel media client not available") + return 0 + try: + if not client.is_connected(): + await client.connect() + if not await client.is_user_authorized(): + logger.warning("channel media client not authorized") + return 0 + channel_id = int(monitor["channel_id"]) + try: + entity = await client.get_entity(channel_id) + except Exception: + logger.warning("cannot resolve channel entity id=%s", channel_id) + return 0 + media_types_str = str(monitor.get("media_types") or "video,document") + allowed_types = {t.strip().lower() for t in media_types_str.split(",") if t.strip()} + keywords_str = str(monitor.get("keywords") or "").strip() + keywords_list = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else [] + max_size = int(monitor.get("max_file_size_mb") or 2000) * 1024 * 1024 + base_dir = str(monitor.get("download_dir") or "").strip() + if not base_dir: + base_dir = str(BASE_DIR / "channel_downloads" / str(channel_id)) + os.makedirs(base_dir, exist_ok=True) + last_msg_id = int(monitor.get("last_message_id") or 0) + offset_id = last_msg_id if not download_history else 0 + + # Date filtering + from datetime import datetime as dt_type + offset_date = None + date_from_str = str(monitor.get("date_from") or "").strip() + date_to_str = str(monitor.get("date_to") or "").strip() + if date_from_str: + try: + offset_date = dt_type.fromisoformat(date_from_str) + except Exception: + pass + # Collect messages to download + messages_to_download = [] + async for message in client.iter_messages( + entity, limit=500, offset_id=offset_id, + offset_date=offset_date, reverse=bool(offset_date), + ): + if offset_id > 0 and message.id <= offset_id and not offset_date: + break + # Date range check + if date_to_str and message.date: + try: + dt_to = dt_type.fromisoformat(date_to_str) + if message.date.replace(tzinfo=None) > dt_to: + continue + except Exception: + pass + if channel_media_download_exists(channel_id, message.id): + continue + if keywords_list: + msg_text = (message.text or "") + " " + (message.caption or "") + if not any(k.lower() in msg_text.lower() for k in keywords_list): + continue + media = message.media + if not media: + continue + media_type = _detect_media_type(media, allowed_types) + if not media_type: + continue + file_size = _get_media_size(media) + if file_size and file_size > max_size: + continue + messages_to_download.append((message, media_type, file_size)) + + # Concurrent download with semaphore + max_concurrent = max(1, int(monitor.get("max_concurrent") or 3)) + semaphore = asyncio.Semaphore(max_concurrent) + count = 0 + total_size_added = 0 + + async def download_one(msg: Any, mt: str, fs: int) -> tuple[int, int]: + nonlocal count, total_size_added + async with semaphore: + caption = (msg.text or msg.caption or "")[:500] + sender_id = msg.sender_id or 0 + file_name = "" + for attr in (getattr(getattr(msg.media, "document", None), "attributes", None) or []): + if hasattr(attr, "file_name") and attr.file_name: + file_name = attr.file_name + break + if not file_name: + file_name = f"{channel_id}_{msg.id}" + ext_map = {"video": ".mp4", "photo": ".jpg", "audio": ".mp3", "document": ".bin"} + file_name += ext_map.get(mt, ".bin") + file_path = ospath.join(base_dir, file_name) + if ospath.exists(file_path): + file_path = ospath.join(base_dir, f"{channel_id}_{msg.id}_{file_name}") + # Resume: use .part file + part_path = file_path + ".part" + try: + # Check for existing partial download + existing_size = ospath.getsize(part_path) if ospath.exists(part_path) else 0 + if existing_size > 0 and fs and existing_size >= fs: + # Already fully downloaded as .part, just rename + os.rename(part_path, file_path) + else: + await client.download_media(msg, file=part_path) + if ospath.exists(part_path): + os.rename(part_path, file_path) + else: + return (0, 0) + actual_size = ospath.getsize(file_path) if ospath.exists(file_path) else fs + channel_media_download_record( + monitor_id, channel_id, msg.id, + mt, file_name, file_path, + actual_size, caption, sender_id, + ) + logger.info("downloaded channel media: channel=%s msg=%s type=%s", channel_id, msg.id, mt) + return (1, actual_size) + except Exception: + logger.exception("download failed channel=%s msg=%s", channel_id, msg.id) + return (0, 0) + + # Run all downloads concurrently + tasks = [download_one(msg, mt, fs) for msg, mt, fs in messages_to_download] + results = await asyncio.gather(*tasks, return_exceptions=True) + for r in results: + if isinstance(r, tuple): + c, s = r + count += c + total_size_added += s + + new_total = int(monitor.get("total_downloaded") or 0) + count + new_size = int(monitor.get("total_size_bytes") or 0) + total_size_added + update_kwargs: dict[str, Any] = { + "total_downloaded": new_total, + "total_size_bytes": new_size, + } + async for last_msg in client.iter_messages(entity, limit=1): + update_kwargs["last_message_id"] = last_msg.id + break + channel_media_monitor_update(monitor_id, **update_kwargs) + if count > 0 and monitor.get("notify_telegram"): + title = monitor.get("channel_title") or str(channel_id) + await admin_send( + f"[频道媒体下载] {html_escape(title)}\n" + f"新增 {count} 个文件,共 {total_size_added // 1024 // 1024} MB\n" + f"累计:{new_total} 个文件,{new_size // 1024 // 1024} MB" + ) + return count + except Exception: + logger.exception("telethon_download_from_channel failed monitor_id=%s", monitor_id) + return 0 + + def group_message_text(message: Message) -> str: parts = [message.text or "", message.caption or ""] if getattr(message, "reply_to_message", None): @@ -2259,7 +2749,7 @@ pre{{white-space:pre-wrap;background:#121212;color:#fff;padding:13px;border:4px @media (prefers-reduced-motion: reduce){{ *,*::before,*::after{{animation:none!important;transition:none!important}} }} -

{html_escape(title)}

WatchBot Panel
+""" @@ -3179,6 +3669,252 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" async def health() -> str: return "ok" + # ---- Channel Media Monitoring Routes ---- + + def channel_media_page_html() -> str: + monitors = channel_media_monitors_all() + session_ok = user_session_ready() + notice = "" + if TelegramClient is None: + notice = "
未安装 telethon,频道媒体功能不可用。
" + elif not session_ok: + notice = "
TG_API_ID / TG_API_HASH / TG_API_SESSION 未完整填写,请先在「设置」中配置。
" + cards_html = "" + for m in monitors: + status = m.get("status", "active") + if status == "active": + badge_html = '运行中' + elif status == "paused": + badge_html = '已暂停' + else: + badge_html = '已停止' + total = m.get("total_downloaded", 0) + size_mb = (m.get("total_size_bytes", 0) or 0) // 1024 // 1024 + username = f"@{m.get('channel_username', '')}" if m.get("channel_username") else "" + pause_btn = "" + if status == "active": + pause_btn = f"暂停" + elif status == "paused": + pause_btn = f"恢复" + proxy_info = " · 代理: " + html_escape(m.get("proxy", "")) if m.get("proxy") else "" + date_info = "" + if m.get("date_from") or m.get("date_to"): + date_info = " · 日期: " + html_escape(m.get("date_from", "")) + "~" + html_escape(m.get("date_to", "")) + cards_html += f"""
+
+

{badge_html} {html_escape(m.get('channel_title',''))} {html_escape(username)}

+chat_id: {m.get('channel_id','')} · 媒体类型: {html_escape(m.get('media_types',''))} · 并发: {m.get('max_concurrent',3)} · 已下载: {total} 个 · {size_mb} MB{proxy_info}{date_info}
+
+{pause_btn} +下载历史 +立即检查 +删除 +
""" + return f"""
+

频道媒体监控

+

使用你的 TG 账号监控频道/群组,自动下载媒体文件。

+
+{notice}{cards_html}
+ + + +""" + + @app.get("/channel-media", response_class=HTMLResponse) + async def channel_media_page(_: str = Depends(panel_auth)) -> str: + return layout("频道媒体", channel_media_page_html()) + + @app.get("/api/groups") + async def api_groups(_: str = Depends(panel_auth), q: str = "", limit: int = 500) -> dict[str, Any]: + if not user_session_ready(): + return {"groups": [], "error": "TG user session not configured"} + if q.strip(): + groups = await telethon_search_dialogs(q.strip(), limit=min(limit, 200)) + else: + groups = await telethon_list_dialogs(limit=min(limit, 500)) + groups = [g for g in groups if g.get("type") in ("group", "channel")] + return {"groups": groups} + + @app.post("/api/monitors/create") + async def api_monitor_create( + _: str = Depends(panel_auth), + channel_id: str = Form(...), + channel_title: str = Form(""), + channel_username: str = Form(""), + media_types: str = Form("video,document"), + keywords: str = Form(""), + max_file_size_mb: int = Form(2000), + download_dir: str = Form(""), + notify_telegram: str | None = Form(None), + proxy: str = Form(""), + date_from: str = Form(""), + date_to: str = Form(""), + max_concurrent: int = Form(3), + ) -> RedirectResponse: + try: + cid = int(channel_id.strip()) + except (ValueError, TypeError): + raise HTTPException(400, "invalid channel_id") + channel_media_monitor_create( + cid, + channel_title.strip() or str(cid), + channel_username.strip(), + media_types.strip() or "video,document", + keywords.strip(), + max(1, max_file_size_mb), + download_dir.strip(), + bool(notify_telegram), + proxy.strip(), + date_from.strip(), + date_to.strip(), + max(1, min(10, max_concurrent)), + ) + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/pause") + async def channel_media_pause(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_update(monitor_id, status="paused") + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/resume") + async def channel_media_resume(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_update(monitor_id, status="active") + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/delete") + async def channel_media_delete_route(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_delete(monitor_id) + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/check", response_class=HTMLResponse) + async def channel_media_check(monitor_id: int, _: str = Depends(panel_auth)) -> str: + count = await telethon_download_from_channel(monitor_id) + monitor = channel_media_monitor_get(monitor_id) + name = monitor.get("channel_title", "") if monitor else "" + return layout("检查完成", f"
已检查频道 {html_escape(name)},新增 {count} 个文件。

返回

") + + @app.get("/channel-media/{monitor_id}/download", response_class=HTMLResponse) + async def channel_media_download_history(monitor_id: int, _: str = Depends(panel_auth)) -> str: + monitor = channel_media_monitor_get(monitor_id) + if not monitor: + raise HTTPException(404) + downloads = channel_media_downloads_list(monitor_id, limit=200) + rows = "" + for d in downloads: + size_mb = f"{(d.get('file_size', 0) or 0) / 1024 / 1024:.1f} MB" + rows += f"{d.get('id')}{html_escape(d.get('media_type',''))}" + rows += f"{html_escape(d.get('file_name',''))}
{html_escape(d.get('caption','')[:100])}" + rows += f"{size_mb}{html_escape(d.get('created_at',''))}" + total = monitor.get("total_downloaded", 0) + size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024 + body = f"""

下载记录 - {html_escape(monitor.get('channel_title',''))}

+

累计:{total} 个文件,{size_mb} MB

+ +{rows}
ID类型文件名/说明大小时间
""" + return layout("下载记录", body) + return app @@ -3262,6 +3998,10 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: ) else: user_session_listener_task = asyncio.create_task(run_user_session_group_listener()) + if TelegramClient is not None and user_session_ready(): + asyncio.create_task(channel_media_monitor_loop()) + else: + logger.info("channel media monitor loop skipped: telethon not installed or user session not configured") await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") logger.info("bot polling start") await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) diff --git a/linuxdo_post.md b/linuxdo_post.md new file mode 100644 index 0000000..82a2a50 --- /dev/null +++ b/linuxdo_post.md @@ -0,0 +1,149 @@ +# Telegram机器人:双向对话 Bot + 网页关键词推送 + 可视化面板 + tg群消息管理 + 广告屏蔽 +## 资源荟萃 + +本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容: + +- 我的帖子已经打上 `#开源推广` 标签:是 +- 我的开源项目完整开源,无未开源部分:是 +- 我的开源项目已链接认可 LINUX DO 社区:是 +- 我帖子内的项目介绍,AI生成、润色内容部分已截图发出:是 +- 以上选择我承诺是永久有效的,接受社区和佬友监督:是 + +以下为项目介绍正文内容,AI 生成、润色内容已使用截图方式发出。 + +## 更新日志 + +### 2026-05-22 更新 +- TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。 +- TG 群监听新增“已发现群聊”:自动显示 Bot 收到过消息的群聊 `chat_id`,可一键创建监听。 +- 新增 `/update` 安全更新流程:显示本地/远端 commit、ahead/behind、工作区状态;仅允许 `ff-only` 更新。 +- 更新前若检测到本地未提交改动,会拒绝更新;避免覆盖本地代码。 +- 新增“回滚上次更新”按钮:更新前自动记录回滚点,可一键回滚并重启。 +- TG 群监听 AI 总结新增可视化高级控制:`ai_prompt`、`ai_min_interval_seconds`、`ai_dedupe_window_seconds`。 +- TG 群监听增加限频和去重窗口,降低重复推送与 AI 调用成本;AI 失败时仍会回退模板摘要。 +- 监控面板新增可观测状态:最近成功/失败时间、最近错误、耗时、推送数、连续失败次数。 + +### 2026-05-21 第二次更新 +- Web 面板新增收件箱直接回复、用户管理、快捷回复、私聊广告拦截、监控推送历史、配置导入/导出。 +- 收件箱改为完整双向对话记录:用户消息、Web 回复、TG 管理员回复都会显示。 +- 用户管理页新增 Bot / 面板配置卡片,和设置页共用同一份配置;修改 Token、管理员 ID、端口、账号或密码后需要重启。 +- `ADMIN_CHAT_ID` 支持最多 3 个管理员,用逗号分隔。 +- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。 + +### 2026-05-21 +- 默认启动改为先启动 Web 面板:未填写 `TELEGRAM_BOT_TOKEN` / `ADMIN_CHAT_ID` 时,面板仍可打开,同时 Telegram 收发、监控推送不可用。 +- 面板配置页可填写 Bot Token、管理员 ID、面板账号和清理策略;保存后需要重启服务让 Bot 配置生效。 +- 修复到期消息无法删除的 bug:监控推送消息支持到期自动删除,默认 60 分钟。 +- 保存配置时会保留 `WEB_PANEL_SESSION_SECRET`,避免保存后登录状态被重置。 +- Web 面板界面和站点图标已更新优化。 + +(新版安装方法详见项目地址) + +最近看 TG 上不少人说封号比较严重。再加上我作为一个刚入门的 MJJ,平时也想更快地获取信息,所以 vibe 了一个自用小工具:`tg-watchbot`。 + +它是一个轻量级 Python 服务,把 Telegram 双向对话 Bot 和 Web/RSS 监控推送 合在一起,并提供网页端可视化管理面板。它适合放在 VPS、NAT 小鸡或者家里的小服务器上跑。 + +项目地址: + +[GitHub - GongyiChuren/tg-watchbot](https://github.com/GongyiChuren/tg-watchbot) + +## 为什么做这个 + +我最开始的需求很简单: + +- 有一个自己的 Telegram 联系入口,别人私聊 Bot 后可以转发到管理员账号; +- 能实时获取 Linux.do、博客、商店页面等信息源; +- 命中关键词、新条目、价格变化或库存变化时,自动推送到 Telegram; +- 有网页面板,不用每次都手改配置文件。 + +另外,项目支持定时清理监控推送相关状态数据,避免网页监控信息过多,影响正常的双向对话使用。 + +## 主要功能 + +- 普通用户私聊 Bot,消息会转发给管理员 +- 管理员可以直接回复用户,也可以主动发文字 / 图片 +- 支持封禁、解封、备注、查看用户信息 +- 支持 RSS / Atom 监控 +- 支持网页 CSS selector 抓取 +- 支持关键词、新条目、价格变化、库存变化提醒 +- 内置 Web 管理面板 +- 可以在面板里新增、编辑、删除监控 +- 支持 RSS 模板 +- 使用 SQLite 保存用户、消息、监控状态 +- 支持 systemd 部署 + +## 展示 + +> 这里放截图 + +## AI 一句话安装 + +请克隆 `https://github.com/GongyiChuren/tg-watchbot.git` 到本机,默认使用 Docker 按 README 的 `Docker 安装(含自启)` 启动服务;若无 Docker,则按 `快速开始` 走 Python 和 systemd 部署直接安装。启动后打开 `http://127.0.0.1:8765`,安装完成后提醒用户记得在面板填写 `TELEGRAM_BOT_TOKEN` 和 `ADMIN_CHAT_ID` 并在之后保存并执行重启(Docker 用 `docker compose restart`,直接安装用重启进程)。 + +## Docker 安装(含自启) + +```bash +git clone https://github.com/GongyiChuren/tg-watchbot.git tg-watchbot +cd tg-watchbot +cp .env.example .env +cp config.example.yaml config.yaml +touch tg-watchbot.sqlite3 tg-watchbot.log +docker compose up -d --build +``` + +Docker 会在容器内监听 `0.0.0.0:8765`,宿主机仍然打开 `http://127.0.0.1:8765`。 + +查看状态与日志: + +```bash +docker compose ps +docker compose logs -f +``` + +修改配置后重启: + +```bash +docker compose restart +``` + +## systemd 部署 + +推荐部署到 `/opt/tg-watchbot`: + +```bash +sudo useradd --system --no-create-home --shell /usr/sbin/nologin tg-watchbot || true +sudo mkdir -p /opt/tg-watchbot +sudo chown -R "$USER:$USER" /opt/tg-watchbot + +cd /opt/tg-watchbot +git clone https://github.com/GongyiChuren/tg-watchbot.git . +python3 -m venv .venv +./.venv/bin/pip install -U pip +./.venv/bin/pip install -r requirements.txt +cp .env.example .env +cp config.example.yaml config.yaml + +# 先用前台模式打开面板,确认能登录和保存配置 +./.venv/bin/python app.py +``` + +在服务器本机打开: + +`http://127.0.0.1:8765` + +默认账号来自 `.env.example`: + +- 用户名:admin +- 密码:change-me + +## 注意事项 + +- Telegram Bot 不能主动私聊陌生人,对方必须先给 Bot 发过消息 +- `.env` 里有 Token 和密码,不要提交到 GitHub +- Web 面板如果暴露到公网,建议套 Cloudflare Access / 反代鉴权 +- RSS 监控建议 60 秒起步 +- 网页监控建议更保守一点,避免对目标站造成压力 + +目前它还是一个自用小工具,目标是够轻、够直接、够容易部署。后续会继续修 bug 😋 + +可以的话点个 star 吧,谢谢佬们🙏🙏🙏