From 928f54e9232ed27af4bdde467b81c682b31dcc93 Mon Sep 17 00:00:00 2001 From: InfernoXuaI <1391197588@qq.com> Date: Thu, 28 May 2026 22:21:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E8=BA=AB=E7=BE=A4=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 自身群监控 --- README.md | 16 ++--- app.py | 154 +++++++++++++++++++++++++++++++++++++------------ patch_main.txt | 119 ++++++++++++++++++++++++++++++++++++++ patch_tail.txt | 103 +++++++++++++++++++++++++++++++++ 4 files changed, 346 insertions(+), 46 deletions(-) create mode 100644 patch_main.txt create mode 100644 patch_tail.txt diff --git a/README.md b/README.md index 978b83d..23c8122 100644 --- a/README.md +++ b/README.md @@ -32,14 +32,13 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 ### 2026-05-28 更新 -- 新增「频道媒体下载」功能:使用 Telethon 用户账号登录 Telegram,可浏览已加入的群组/频道,选择性下载媒体文件。 -- 面板新增「频道媒体」页面:支持搜索群组、添加/暂停/恢复/删除监控、查看下载记录。 -- 支持断点续传:大文件下载中断后自动续传,不重复下载。 -- 支持并发下载控制:可设置同时下载数(默认 3),避免带宽打满。 -- 支持 SOCKS5/HTTP 代理:国内用户可配置代理访问 Telegram。 -- 支持按日期范围过滤:只下载指定时间段内的消息。 -- 支持关键词过滤、媒体类型过滤(视频/文档/图片/音频)、文件大小限制。 -- 下载完成可自动推送 Telegram 通知给管理员。 +- 新增「频道媒体转发」:使用 Telethon 用户账号登录 TG,实时转发群组/频道消息到你的 Telegram。 +- 面板新增「频道媒体」页面:搜索已加入群组,一键添加转发监控。 +- 支持暂停/恢复监控(保留配置)、删除监控。 +- 支持关键词过滤:只转发包含特定关键词的消息,留空则转发全部。 +- 支持媒体类型过滤:可选视频、文档、图片、音频。 +- 支持 SOCKS5/HTTP 代理,适合国内服务器。 +- 内置下载到服务器、断点续传、并发下载等功能,后续可通过配置开启。 - 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 后使用。 ### 2026-05-22 更新 @@ -125,6 +124,7 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器 - 支持 SOCKS5/HTTP 代理,适合国内服务器使用。 - 支持按日期范围过滤:只下载指定时间段内的消息。 - 支持关键词过滤、媒体类型选择(视频/文档/图片/音频)、文件大小限制。 +- 支持实时转发模式:群消息匹配后直接转发到你的 Telegram(含视频/文档原文),无需下载到服务器。 - 下载完成可自动推送 Telegram 通知给管理员。 - 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION`。 diff --git a/app.py b/app.py index 6a3e577..0e1cb2c 100644 --- a/app.py +++ b/app.py @@ -250,6 +250,8 @@ def init_db() -> None: date_from TEXT DEFAULT '', date_to TEXT DEFAULT '', max_concurrent INTEGER DEFAULT 3, + forward_mode INTEGER DEFAULT 0, + forward_to TEXT DEFAULT 'admin', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); @@ -277,6 +279,8 @@ def init_db() -> None: "ALTER TABLE channel_media_monitors ADD COLUMN date_from TEXT DEFAULT ''", "ALTER TABLE channel_media_monitors ADD COLUMN date_to TEXT DEFAULT ''", "ALTER TABLE channel_media_monitors ADD COLUMN max_concurrent INTEGER DEFAULT 3", + "ALTER TABLE channel_media_monitors ADD COLUMN forward_mode INTEGER DEFAULT 0", + "ALTER TABLE channel_media_monitors ADD COLUMN forward_to TEXT DEFAULT 'admin'", ]: try: conn.execute(sql) @@ -647,6 +651,8 @@ def channel_media_monitor_create( date_from: str = "", date_to: str = "", max_concurrent: int = 3, + forward_mode: bool = False, + forward_to: str = "admin", ) -> int: ts = now_iso() with closing(db()) as conn: @@ -654,11 +660,12 @@ def channel_media_monitor_create( """INSERT INTO channel_media_monitors (channel_id, channel_title, channel_username, status, media_types, keywords, max_file_size_mb, download_dir, notify_telegram, proxy, date_from, date_to, - max_concurrent, created_at, updated_at) - VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + max_concurrent, forward_mode, forward_to, created_at, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", (channel_id, channel_title, channel_username, "active", media_types, keywords, max_file_size_mb, download_dir, 1 if notify_telegram else 0, - proxy, date_from, date_to, max_concurrent, ts, ts), + proxy, date_from, date_to, max_concurrent, 1 if forward_mode else 0, + forward_to, ts, ts), ) conn.commit() return int(cur.lastrowid) @@ -667,7 +674,7 @@ def channel_media_monitor_create( def channel_media_monitor_update(monitor_id: int, **kwargs: Any) -> None: allowed = {"status", "media_types", "keywords", "max_file_size_mb", "download_dir", "last_message_id", "notify_telegram", "total_downloaded", "total_size_bytes", - "proxy", "date_from", "date_to", "max_concurrent"} + "proxy", "date_from", "date_to", "max_concurrent", "forward_mode", "forward_to"} updates = [] values = [] for k, v in kwargs.items(): @@ -871,6 +878,92 @@ async def channel_media_monitor_loop() -> None: await disconnect_channel_media_client() +async def channel_media_forward_listener() -> None: + """Real-time listener: forward messages from monitored groups to admin Telegram.""" + if TelegramClient is None or StringSession is None: + logger.info("channel media forward listener skipped: telethon not installed") + return + api_id_raw, api_hash, session = user_session_config() + if not api_id_raw or not api_hash or not session: + logger.info("channel media forward listener skipped: user session not configured") + return + try: + api_id = int(api_id_raw) + except Exception: + return + try: + client = TelegramClient(StringSession(session), api_id, api_hash) + + def _get_forward_monitors() -> dict[int, dict[str, Any]]: + monitors = channel_media_monitors_all() + result = {} + for m in monitors: + if m.get("status") != "active" or not m.get("forward_mode"): + continue + try: + cid = int(m["channel_id"]) + result[cid] = m + except (TypeError, ValueError): + continue + return result + + @client.on(events.NewMessage(incoming=True)) + async def on_new_message_for_forward(event: Any) -> None: + try: + chat_id = getattr(event, "chat_id", None) + if chat_id is None: + return + fwd_monitors = _get_forward_monitors() + monitor = fwd_monitors.get(int(chat_id)) + if not monitor: + return + msg = event.message + text = (msg.text or "") + " " + (msg.caption or "") + # Keyword filter + keywords_str = str(monitor.get("keywords") or "").strip() + if keywords_str: + keywords_list = [k.strip() for k in keywords_str.split(",") if k.strip()] + if keywords_list and not any(k.lower() in text.lower() for k in keywords_list): + return + # Media type filter + media_types_str = str(monitor.get("media_types") or "").strip() + if media_types_str: + allowed = {t.strip().lower() for t in media_types_str.split(",") if t.strip()} + if allowed and not msg.media: + has_text_only = bool(text.strip()) + if not has_text_only: + return + # Determine forward target + forward_to = str(monitor.get("forward_to") or "admin").strip() + if forward_to == "admin": + targets = all_admin_chat_ids() + elif forward_to == "saved": + targets = ["me"] + else: + try: + targets = [int(forward_to)] + except (TypeError, ValueError): + targets = all_admin_chat_ids() + # Forward the message + for target in targets: + try: + await msg.forward_to(target) + except Exception: + logger.exception("forward failed target=%s msg_id=%s", target, msg.id) + logger.info("forwarded message from chat=%s msg_id=%s to %s", chat_id, msg.id, targets) + except Exception: + logger.exception("on_new_message_for_forward error") + + await client.start() + logger.info("channel media forward listener started") + await client.run_until_disconnected() + except asyncio.CancelledError: + logger.info("channel media forward listener cancelled") + raise + except Exception: + logger.exception("channel media forward listener crashed") + + async def telethon_download_from_channel(monitor_id: int, download_history: bool = False) -> int: monitor = channel_media_monitor_get(monitor_id) if not monitor: @@ -3688,8 +3781,6 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" badge_html = '已暂停' else: badge_html = '已停止' - total = m.get("total_downloaded", 0) - size_mb = (m.get("total_size_bytes", 0) or 0) // 1024 // 1024 username = f"@{m.get('channel_username', '')}" if m.get("channel_username") else "" pause_btn = "" if status == "active": @@ -3697,22 +3788,17 @@ HostLoc|https://hostloc.com|VPS,补货,优惠""" elif status == "paused": pause_btn = f"恢复" proxy_info = " · 代理: " + html_escape(m.get("proxy", "")) if m.get("proxy") else "" - date_info = "" - if m.get("date_from") or m.get("date_to"): - date_info = " · 日期: " + html_escape(m.get("date_from", "")) + "~" + html_escape(m.get("date_to", "")) cards_html += f"""

{badge_html} {html_escape(m.get('channel_title',''))} {html_escape(username)}

-chat_id: {m.get('channel_id','')} · 媒体类型: {html_escape(m.get('media_types',''))} · 并发: {m.get('max_concurrent',3)} · 已下载: {total} 个 · {size_mb} MB{proxy_info}{date_info}
+chat_id: {m.get('channel_id','')} · 转发: {'开启' if m.get('forward_mode') else '关闭'}{proxy_info}
{pause_btn} -下载历史 -立即检查 删除
""" return f"""
-

频道媒体监控

-

使用你的 TG 账号监控频道/群组,自动下载媒体文件。

+

频道媒体转发

+

使用你的 TG 账号监听群组,消息实时转发到你的 Telegram。

{notice}{cards_html}
@@ -3733,25 +3819,17 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
- - - - - - - - + + + + -
-
-
-
- -
- +
+
+
@@ -3803,13 +3881,9 @@ async function addMonitor() {{ channel_username: document.getElementById('selChannelUsername').value, media_types: document.getElementById('mediaTypes').value, keywords: document.getElementById('mediaKeywords').value, - max_file_size_mb: document.getElementById('maxFileSize').value, - download_dir: document.getElementById('downloadDir').value, proxy: document.getElementById('mediaProxy').value, - date_from: document.getElementById('dateFrom').value, - date_to: document.getElementById('dateTo').value, - max_concurrent: document.getElementById('maxConcurrent').value, - notify_telegram: document.getElementById('notifyTg').checked ? 'on' : '', + forward_mode: document.getElementById('forwardMode').checked ? 'on' : '', + forward_to: document.getElementById('forwardTo').value, }}); try {{ const resp = await fetch('/api/monitors/create', {{ method: 'POST', headers: {{'Content-Type':'application/x-www-form-urlencoded'}}, body: body.toString() }}); @@ -3850,6 +3924,8 @@ document.getElementById('addModal').addEventListener('click', function(e) {{ if date_from: str = Form(""), date_to: str = Form(""), max_concurrent: int = Form(3), + forward_mode: str | None = Form(None), + forward_to: str = Form("admin"), ) -> RedirectResponse: try: cid = int(channel_id.strip()) @@ -3868,6 +3944,8 @@ document.getElementById('addModal').addEventListener('click', function(e) {{ if date_from.strip(), date_to.strip(), max(1, min(10, max_concurrent)), + bool(forward_mode), + forward_to.strip() or "admin", ) return RedirectResponse("/channel-media", status_code=303) @@ -3960,7 +4038,6 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: while True: await asyncio.sleep(3600) if run_once: - # If .env is filled, send notifications during manual test; otherwise just log. try: token, admin_chat_id = validate_env() admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) @@ -3999,9 +4076,10 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None: else: user_session_listener_task = asyncio.create_task(run_user_session_group_listener()) if TelegramClient is not None and user_session_ready(): - asyncio.create_task(channel_media_monitor_loop()) + asyncio.create_task(channel_media_forward_listener()) + logger.info("channel media forward listener started") else: - logger.info("channel media monitor loop skipped: telethon not installed or user session not configured") + logger.info("channel media forward listener skipped: telethon not installed or user session not configured") await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") logger.info("bot polling start") await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) diff --git a/patch_main.txt b/patch_main.txt new file mode 100644 index 0000000..77dec8b --- /dev/null +++ b/patch_main.txt @@ -0,0 +1,119 @@ + total = monitor.get("total_downloaded", 0) + size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024 + body = f"""

下载记录 - {html_escape(monitor.get('channel_title',''))}

+

累计:{total} 个文件,{size_mb} MB

+ +{rows}
ID类型文件名/说明大小时间
""" + return layout("下载记录", body) + + return app + + +async def start_panel_server() -> uvicorn.Server | None: + if not panel_enabled(): + logger.info("web panel disabled") + return None + host = os.getenv("WEB_PANEL_HOST", "127.0.0.1") + port = int(os.getenv("WEB_PANEL_PORT", "8765")) + server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info")) + asyncio.create_task(server.serve()) + logger.info("web panel listening on http://%s:%s", host, port) + return server + +def validate_env() -> tuple[str, int]: + load_dotenv(ENV_PATH) + token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip() + admin = os.getenv("ADMIN_CHAT_ID", "").strip() + if not token: + raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}") + if not admin: + raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}") + ids = parse_admin_chat_ids(admin) + if not ids: + raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}") + return token, ids[0] + + +def bot_env_configured() -> bool: + load_dotenv(ENV_PATH, override=True) + return bool(os.getenv("TELEGRAM_BOT_TOKEN", "").strip() and os.getenv("ADMIN_CHAT_ID", "").strip()) + + +async def main_async(run_once: bool = False, panel_only: bool = False) -> None: + global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task + load_dotenv(ENV_PATH, override=True) + config = load_config() + setup_logging(os.getenv("LOG_LEVEL", "INFO")) + init_db() + if panel_only: + await start_panel_server() + logger.info("panel-only mode start") + while True: + await asyncio.sleep(3600) + if run_once: + try: + token, admin_chat_id = validate_env() + admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + except Exception as e: + logger.warning("run-once without Telegram notification: %s", e) + await run_all_monitors_once() + if bot: + await bot.session.close() + return + await start_panel_server() + if not bot_env_configured(): + logger.warning( + "Telegram bot is not configured. Web panel is available, but Telegram polling, monitor notifications, and admin/user messaging will not work until TELEGRAM_BOT_TOKEN and ADMIN_CHAT_ID are saved, then the service is restarted." + ) + while True: + await asyncio.sleep(3600) + token, admin_chat_id = validate_env() + admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) + bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) + dp = Dispatcher() + dp.include_router(router) + scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") + scheduler_ref = scheduler + schedule_monitors(scheduler) + scheduler.start() + asyncio.create_task(flush_pending_loop()) + asyncio.create_task(cleanup_monitor_loop()) + if group_monitors_need_user_session(): + if TelegramClient is None: + logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed") + elif not user_session_ready(): + logger.warning( + "group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete" + ) + else: + user_session_listener_task = asyncio.create_task(run_user_session_group_listener()) + if TelegramClient is not None and user_session_ready(): + asyncio.create_task(channel_media_monitor_loop()) + asyncio.create_task(channel_media_forward_listener()) + logger.info("channel media download + forward listeners started") + else: + logger.info("channel media listeners skipped: telethon not installed or user session not configured") + await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") + logger.info("bot polling start") + await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent") + parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured") + args = parser.parse_args() + try: + asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only)) + except KeyboardInterrupt: + pass + except Exception: + logger.exception("fatal error") + raise + + +if __name__ == "__main__": + main() diff --git a/patch_tail.txt b/patch_tail.txt new file mode 100644 index 0000000..3657466 --- /dev/null +++ b/patch_tail.txt @@ -0,0 +1,103 @@ +document.getElementById('addModal').addEventListener('click', function(e) { if (e.target === this) this.style.display='none'; }); +""" + + @app.get("/channel-media", response_class=HTMLResponse) + async def channel_media_page(_: str = Depends(panel_auth)) -> str: + return layout("频道媒体", channel_media_page_html()) + + @app.get("/api/groups") + async def api_groups(_: str = Depends(panel_auth), q: str = "", limit: int = 500) -> dict[str, Any]: + if not user_session_ready(): + return {"groups": [], "error": "TG user session not configured"} + if q.strip(): + groups = await telethon_search_dialogs(q.strip(), limit=min(limit, 200)) + else: + groups = await telethon_list_dialogs(limit=min(limit, 500)) + groups = [g for g in groups if g.get("type") in ("group", "channel")] + return {"groups": groups} + + @app.post("/api/monitors/create") + async def api_monitor_create( + _: str = Depends(panel_auth), + channel_id: str = Form(...), + channel_title: str = Form(""), + channel_username: str = Form(""), + media_types: str = Form("video,document"), + keywords: str = Form(""), + max_file_size_mb: int = Form(2000), + download_dir: str = Form(""), + notify_telegram: str | None = Form(None), + proxy: str = Form(""), + date_from: str = Form(""), + date_to: str = Form(""), + max_concurrent: int = Form(3), + forward_mode: str | None = Form(None), + forward_to: str = Form("admin"), + ) -> RedirectResponse: + try: + cid = int(channel_id.strip()) + except (ValueError, TypeError): + raise HTTPException(400, "invalid channel_id") + channel_media_monitor_create( + cid, + channel_title.strip() or str(cid), + channel_username.strip(), + media_types.strip() or "video,document", + keywords.strip(), + max(1, max_file_size_mb), + download_dir.strip(), + bool(notify_telegram), + proxy.strip(), + date_from.strip(), + date_to.strip(), + max(1, min(10, max_concurrent)), + bool(forward_mode), + forward_to.strip() or "admin", + ) + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/pause") + async def channel_media_pause(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_update(monitor_id, status="paused") + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/resume") + async def channel_media_resume(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_update(monitor_id, status="active") + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/delete") + async def channel_media_delete_route(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse: + channel_media_monitor_delete(monitor_id) + return RedirectResponse("/channel-media", status_code=303) + + @app.get("/channel-media/{monitor_id}/check", response_class=HTMLResponse) + async def channel_media_check(monitor_id: int, _: str = Depends(panel_auth)) -> str: + count = await telethon_download_from_channel(monitor_id) + monitor = channel_media_monitor_get(monitor_id) + name = monitor.get("channel_title", "") if monitor else "" + return layout("检查完成", f"
已检查频道 {html_escape(name)},新增 {count} 个文件。

返回

") + + @app.get("/channel-media/{monitor_id}/download", response_class=HTMLResponse) + async def channel_media_download_history(monitor_id: int, _: str = Depends(panel_auth)) -> str: + monitor = channel_media_monitor_get(monitor_id) + if not monitor: + raise HTTPException(404) + downloads = channel_media_downloads_list(monitor_id, limit=200) + rows = "" + for d in downloads: + size_mb = f"{(d.get('file_size', 0) or 0) / 1024 / 1024:.1f} MB" + rows += f"{d.get('id')}{html_escape(d.get('media_type',''))}" + rows += f"{html_escape(d.get('file_name',''))}
{html_escape(d.get('caption','')[:100])}" + rows += f"{size_mb}{html_escape(d.get('created_at',''))}" + total = monitor.get("total_downloaded", 0) + size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024 + body = f"""

下载记录 - {html_escape(monitor.get('channel_title',''))}

+

累计:{total} 个文件,{size_mb} MB

+ +{rows}
ID类型文件名/说明大小时间
""" + return layout("下载记录", body) + + return app