merge origin/main: 自身群监控

This commit is contained in:
GongyiChuren
2026-05-31 10:07:44 +08:00
5 changed files with 1226 additions and 5 deletions
+35 -3
View File
@@ -1,7 +1,7 @@
<div align="center">
<h1>tg-watchbot</h1>
<p>Telegram 双向客服机器人 + Web/RSS 监控推送 + 可视化管理面板</p>
<p>双向对话 · 关键词监控 · 私聊广告拦截 · 多管理员 · 配置导入导出</p>
<p>Telegram 双向客服机器人 + Web/RSS 监控推送 + 频道媒体下载 + 可视化管理面板</p>
<p>双向对话 · 关键词监控 · 频道媒体下载 · 私聊广告拦截 · 多管理员 · 配置导入导出</p>
<p>
<a href="#ai-one-line-install">AI 一句话安装</a> ·
<a href="#docker-install">Docker 安装</a> ·
@@ -13,11 +13,12 @@
</div>
## 简介:
tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人****Web/RSS 监控推送** 合在一起:
tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器人****Web/RSS 监控推送** 和 **频道媒体下载** 合在一起:
- 普通用户私聊 Bot,消息会转发给管理员;
- 管理员可以直接回复、主动发文字/图片、封禁/备注用户;
- 后台定时监控 RSS 或网页,命中关键词、新条目、价格/库存变化后推送给管理员;
- 使用 Telethon 用户账号自动下载频道/群组中的视频、文档等媒体文件;
- 自带一个 Web 管理面板,可配置监控目标、编辑 YAML、查看收件箱和日志。
项目为单文件应用,适合个人服务器、NAT 小鸡、轻量 VPS 直接用 systemd 跑。
@@ -29,6 +30,17 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
```
## 更新日志
### 2026-05-28 更新
- 新增「频道媒体转发」:使用 Telethon 用户账号登录 TG,实时转发群组/频道消息到你的 Telegram。
- 面板新增「频道媒体」页面:搜索已加入群组,一键添加转发监控。
- 支持暂停/恢复监控(保留配置)、删除监控。
- 支持关键词过滤:只转发包含特定关键词的消息,留空则转发全部。
- 支持媒体类型过滤:可选视频、文档、图片、音频。
- 支持 SOCKS5/HTTP 代理,适合国内服务器。
- 内置下载到服务器、断点续传、并发下载等功能,后续可通过配置开启。
- 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION` 后使用。
### 2026-05-22 更新
- TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。
@@ -101,6 +113,21 @@ tg-watchbot 是一个轻量级 Python 服务,把 **Telegram 双向客服机器
![示例图片](https://pic.gongyichuren.de/file/1779287170665_17b7c8b4040d6334ea62a108d08db644.png)
### 频道媒体下载
- 使用 Telethon 用户账号(非 Bot)登录 Telegram,可访问已加入的所有频道和群组。
- 面板「频道媒体」页面支持搜索已加入的群组/频道,一键添加监控。
- 支持暂停/恢复监控(保留配置)、删除监控。
- 支持实时自动下载新消息中的媒体,也支持手动触发下载历史媒体。
- 支持断点续传:大文件下载中断后自动续传,不重复下载。
- 支持并发下载控制:可设置同时下载数(1-10,默认 3)。
- 支持 SOCKS5/HTTP 代理,适合国内服务器使用。
- 支持按日期范围过滤:只下载指定时间段内的消息。
- 支持关键词过滤、媒体类型选择(视频/文档/图片/音频)、文件大小限制。
- 支持实时转发模式:群消息匹配后直接转发到你的 Telegram(含视频/文档原文),无需下载到服务器。
- 下载完成可自动推送 Telegram 通知给管理员。
- 需要在设置页填写 `TG_API_ID`、`TG_API_HASH`、`TG_API_SESSION`。
### Web 管理面板
- 登录页 + HttpOnly session cookie,不使用丑陋的浏览器 Basic Auth。
@@ -490,6 +517,11 @@ monitors:
| `/rules` | 私聊广告拦截规则 |
| `/replies` | 快捷回复模板 |
| `/monitor/events` | 监控推送历史 |
| `/channel-media` | 频道媒体监控 |
| `/channel-media/{id}/pause` | 暂停频道监控 |
| `/channel-media/{id}/resume` | 恢复频道监控 |
| `/channel-media/{id}/check` | 手动下载频道媒体 |
| `/channel-media/{id}/download` | 查看下载记录 |
| `/config/export` | 导出 / 导入 `config.yaml` |
| `/logs` | 日志 |
| `/health` | 健康检查 |
+820 -2
View File
@@ -27,6 +27,7 @@ from pathlib import Path
from types import SimpleNamespace
from typing import Any
from urllib.parse import quote_plus, urljoin
import os.path as ospath
import feedparser
import httpx
@@ -79,6 +80,7 @@ pending_sendpic: dict[int, dict[str, Any]] = {}
scheduler_ref: AsyncIOScheduler | None = None
user_session_listener_task: asyncio.Task | None = None
user_session_client: Any = None
channel_media_clients: dict[str, Any] = {}
GROUP_SUMMARY_MAX_CHARS = 800
@@ -230,11 +232,55 @@ def init_db() -> None:
last_seen_at TEXT NOT NULL,
active INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS channel_media_monitors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_id INTEGER NOT NULL,
channel_title TEXT NOT NULL,
channel_username TEXT DEFAULT '',
status TEXT DEFAULT 'active',
media_types TEXT DEFAULT 'video,document',
keywords TEXT DEFAULT '',
max_file_size_mb INTEGER DEFAULT 2000,
download_dir TEXT DEFAULT '',
last_message_id INTEGER DEFAULT 0,
total_downloaded INTEGER DEFAULT 0,
total_size_bytes INTEGER DEFAULT 0,
notify_telegram INTEGER DEFAULT 1,
proxy TEXT DEFAULT '',
date_from TEXT DEFAULT '',
date_to TEXT DEFAULT '',
max_concurrent INTEGER DEFAULT 3,
forward_mode INTEGER DEFAULT 0,
forward_to TEXT DEFAULT 'admin',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS channel_media_downloads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
monitor_id INTEGER NOT NULL,
channel_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
media_type TEXT NOT NULL,
file_name TEXT DEFAULT '',
file_path TEXT DEFAULT '',
file_size INTEGER DEFAULT 0,
caption TEXT DEFAULT '',
sender_id INTEGER DEFAULT 0,
status TEXT DEFAULT 'completed',
created_at TEXT NOT NULL,
FOREIGN KEY (monitor_id) REFERENCES channel_media_monitors(id)
);
"""
)
for sql in [
"ALTER TABLE inbox_messages ADD COLUMN direction TEXT DEFAULT 'in'",
"ALTER TABLE inbox_messages ADD COLUMN source TEXT DEFAULT 'user'",
"ALTER TABLE channel_media_monitors ADD COLUMN proxy TEXT DEFAULT ''",
"ALTER TABLE channel_media_monitors ADD COLUMN date_from TEXT DEFAULT ''",
"ALTER TABLE channel_media_monitors ADD COLUMN date_to TEXT DEFAULT ''",
"ALTER TABLE channel_media_monitors ADD COLUMN max_concurrent INTEGER DEFAULT 3",
"ALTER TABLE channel_media_monitors ADD COLUMN forward_mode INTEGER DEFAULT 0",
"ALTER TABLE channel_media_monitors ADD COLUMN forward_to TEXT DEFAULT 'admin'",
]:
try:
conn.execute(sql)
@@ -538,6 +584,543 @@ def user_session_ready() -> bool:
return True
# ---- Channel Media Download (Telethon user session) ----
def get_or_create_channel_media_client(client_type: str = "channel_media", proxy: str = "") -> Any:
if TelegramClient is None or StringSession is None:
return None
if client_type in channel_media_clients:
c = channel_media_clients[client_type]
if c.is_connected():
return c
api_id, api_hash, session = user_session_config()
if not api_id or not api_hash or not session:
return None
try:
proxy_arg = None
if proxy and proxy.strip():
p = proxy.strip()
if p.startswith("socks5://") or p.startswith("socks4://"):
import socks
parts = p.replace("socks5://", "").replace("socks4://", "").split(":")
proxy_arg = (socks.SOCKS5 if "socks5" in p else socks.SOCKS4,
parts[0], int(parts[1]) if len(parts) > 1 else 1080)
elif p.startswith("http://") or p.startswith("https://"):
proxy_arg = p
else:
proxy_arg = p
client = TelegramClient(StringSession(session), int(api_id), api_hash, proxy=proxy_arg)
channel_media_clients[client_type] = client
return client
except Exception:
logger.exception("failed to create channel media client")
return None
async def disconnect_channel_media_client(client_type: str = "channel_media") -> None:
client = channel_media_clients.pop(client_type, None)
if client:
try:
await client.disconnect()
except Exception:
pass
def channel_media_monitors_all() -> list[dict[str, Any]]:
with closing(db()) as conn:
rows = conn.execute("SELECT * FROM channel_media_monitors ORDER BY id DESC").fetchall()
return [dict(r) for r in rows]
def channel_media_monitor_get(monitor_id: int) -> dict[str, Any] | None:
with closing(db()) as conn:
row = conn.execute("SELECT * FROM channel_media_monitors WHERE id=?", (monitor_id,)).fetchone()
return dict(row) if row else None
def channel_media_monitor_create(
channel_id: int,
channel_title: str,
channel_username: str = "",
media_types: str = "video,document",
keywords: str = "",
max_file_size_mb: int = 2000,
download_dir: str = "",
notify_telegram: bool = True,
proxy: str = "",
date_from: str = "",
date_to: str = "",
max_concurrent: int = 3,
forward_mode: bool = False,
forward_to: str = "admin",
) -> int:
ts = now_iso()
with closing(db()) as conn:
cur = conn.execute(
"""INSERT INTO channel_media_monitors
(channel_id, channel_title, channel_username, status, media_types, keywords,
max_file_size_mb, download_dir, notify_telegram, proxy, date_from, date_to,
max_concurrent, forward_mode, forward_to, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(channel_id, channel_title, channel_username, "active", media_types, keywords,
max_file_size_mb, download_dir, 1 if notify_telegram else 0,
proxy, date_from, date_to, max_concurrent, 1 if forward_mode else 0,
forward_to, ts, ts),
)
conn.commit()
return int(cur.lastrowid)
def channel_media_monitor_update(monitor_id: int, **kwargs: Any) -> None:
allowed = {"status", "media_types", "keywords", "max_file_size_mb", "download_dir",
"last_message_id", "notify_telegram", "total_downloaded", "total_size_bytes",
"proxy", "date_from", "date_to", "max_concurrent", "forward_mode", "forward_to"}
updates = []
values = []
for k, v in kwargs.items():
if k in allowed:
updates.append(f"{k}=?")
values.append(v)
if not updates:
return
updates.append("updated_at=?")
values.append(now_iso())
values.append(monitor_id)
with closing(db()) as conn:
conn.execute(f"UPDATE channel_media_monitors SET {', '.join(updates)} WHERE id=?", values)
conn.commit()
def channel_media_monitor_delete(monitor_id: int) -> None:
with closing(db()) as conn:
conn.execute("DELETE FROM channel_media_downloads WHERE monitor_id=?", (monitor_id,))
conn.execute("DELETE FROM channel_media_monitors WHERE id=?", (monitor_id,))
conn.commit()
def channel_media_download_record(
monitor_id: int, channel_id: int, message_id: int,
media_type: str, file_name: str, file_path: str,
file_size: int, caption: str, sender_id: int, status: str = "completed",
) -> int:
ts = now_iso()
with closing(db()) as conn:
cur = conn.execute(
"""INSERT INTO channel_media_downloads
(monitor_id, channel_id, message_id, media_type, file_name, file_path,
file_size, caption, sender_id, status, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
(monitor_id, channel_id, message_id, media_type, file_name, file_path,
file_size, caption, sender_id, status, ts),
)
conn.commit()
return int(cur.lastrowid)
def channel_media_downloads_list(monitor_id: int | None = None, limit: int = 100) -> list[dict[str, Any]]:
with closing(db()) as conn:
if monitor_id:
rows = conn.execute(
"SELECT * FROM channel_media_downloads WHERE monitor_id=? ORDER BY id DESC LIMIT ?",
(monitor_id, limit),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM channel_media_downloads ORDER BY id DESC LIMIT ?", (limit,)
).fetchall()
return [dict(r) for r in rows]
def channel_media_download_exists(channel_id: int, message_id: int) -> bool:
with closing(db()) as conn:
row = conn.execute(
"SELECT id FROM channel_media_downloads WHERE channel_id=? AND message_id=?",
(channel_id, message_id),
).fetchone()
return row is not None
async def telethon_list_dialogs(limit: int = 500) -> list[dict[str, Any]]:
client = get_or_create_channel_media_client()
if not client:
return []
try:
if not client.is_connected():
await client.connect()
if not await client.is_user_authorized():
return []
result = []
count = 0
async for dialog in client.iter_dialogs(limit=limit):
entity = dialog.entity
chat_type = "unknown"
if dialog.is_group:
chat_type = "group"
elif dialog.is_channel:
chat_type = "channel"
elif dialog.is_user:
chat_type = "user"
result.append({
"id": dialog.id,
"title": dialog.name or "",
"username": getattr(entity, "username", "") or "",
"type": chat_type,
"unread_count": dialog.unread_count,
})
count += 1
logger.info("listed %d dialogs", count)
return result
except Exception:
logger.exception("telethon_list_dialogs failed")
return []
async def telethon_search_dialogs(query: str, limit: int = 100) -> list[dict[str, Any]]:
client = get_or_create_channel_media_client()
if not client:
return []
try:
if not client.is_connected():
await client.connect()
if not await client.is_user_authorized():
return []
result = []
q = query.strip().lower()
async for dialog in client.iter_dialogs(limit=500):
name = (dialog.name or "").lower()
username = str(getattr(dialog.entity, "username", "") or "").lower()
if q in name or q in username or query.strip() in str(dialog.id):
entity = dialog.entity
chat_type = "group" if dialog.is_group else ("channel" if dialog.is_channel else ("user" if dialog.is_user else "unknown"))
result.append({
"id": dialog.id,
"title": dialog.name or "",
"username": getattr(entity, "username", "") or "",
"type": chat_type,
"unread_count": dialog.unread_count,
})
if len(result) >= limit:
break
return result
except Exception:
logger.exception("telethon_search_dialogs failed")
return []
def _detect_media_type(media: Any, allowed_types: set[str]) -> str:
if not media:
return ""
if hasattr(media, "video") or hasattr(media, "document')"):
pass
from telethon.tl.types import (
DocumentAttributeVideo,
DocumentAttributeAudio,
DocumentAttributeFilename,
)
if hasattr(media, "photo"):
if "photo" not in allowed_types:
return ""
return "photo"
if hasattr(media, "document"):
doc = media.document
if doc is None:
return ""
mime = str(getattr(doc, "mime_type", "") or "")
for attr in (getattr(doc, "attributes", None) or []):
if isinstance(attr, DocumentAttributeVideo):
if "video" not in allowed_types:
return ""
return "video"
if isinstance(attr, DocumentAttributeAudio):
if "audio" not in allowed_types:
return ""
return "audio"
if mime.startswith("video/"):
if "video" not in allowed_types:
return ""
return "video"
if mime.startswith("audio/"):
if "audio" not in allowed_types:
return ""
return "audio"
if "document" in allowed_types:
return "document"
return ""
if "document" in allowed_types:
return "document"
return ""
def _get_media_size(media: Any) -> int:
if hasattr(media, "document") and media.document:
return int(getattr(media.document, "size", 0) or 0)
if hasattr(media, "photo"):
return 0
return 0
async def channel_media_monitor_loop() -> None:
while True:
await asyncio.sleep(300)
try:
monitors = channel_media_monitors_all()
active_monitors = [m for m in monitors if m.get("status") == "active"]
if not active_monitors:
continue
for monitor in active_monitors:
try:
await telethon_download_from_channel(int(monitor["id"]))
except Exception:
logger.exception("channel media monitor failed id=%s", monitor.get("id"))
except Exception:
logger.exception("channel_media_monitor_loop error")
finally:
await disconnect_channel_media_client()
async def channel_media_forward_listener() -> None:
"""Real-time listener: forward messages from monitored groups to admin Telegram."""
if TelegramClient is None or StringSession is None:
logger.info("channel media forward listener skipped: telethon not installed")
return
api_id_raw, api_hash, session = user_session_config()
if not api_id_raw or not api_hash or not session:
logger.info("channel media forward listener skipped: user session not configured")
return
try:
api_id = int(api_id_raw)
except Exception:
return
try:
client = TelegramClient(StringSession(session), api_id, api_hash)
def _get_forward_monitors() -> dict[int, dict[str, Any]]:
monitors = channel_media_monitors_all()
result = {}
for m in monitors:
if m.get("status") != "active" or not m.get("forward_mode"):
continue
try:
cid = int(m["channel_id"])
result[cid] = m
except (TypeError, ValueError):
continue
return result
@client.on(events.NewMessage(incoming=True))
async def on_new_message_for_forward(event: Any) -> None:
try:
chat_id = getattr(event, "chat_id", None)
if chat_id is None:
return
fwd_monitors = _get_forward_monitors()
monitor = fwd_monitors.get(int(chat_id))
if not monitor:
return
msg = event.message
text = (msg.text or "") + " " + (msg.caption or "")
# Keyword filter
keywords_str = str(monitor.get("keywords") or "").strip()
if keywords_str:
keywords_list = [k.strip() for k in keywords_str.split(",") if k.strip()]
if keywords_list and not any(k.lower() in text.lower() for k in keywords_list):
return
# Media type filter
media_types_str = str(monitor.get("media_types") or "").strip()
if media_types_str:
allowed = {t.strip().lower() for t in media_types_str.split(",") if t.strip()}
if allowed and not msg.media:
has_text_only = bool(text.strip())
if not has_text_only:
return
# Determine forward target
forward_to = str(monitor.get("forward_to") or "admin").strip()
if forward_to == "admin":
targets = all_admin_chat_ids()
elif forward_to == "saved":
targets = ["me"]
else:
try:
targets = [int(forward_to)]
except (TypeError, ValueError):
targets = all_admin_chat_ids()
# Forward the message
for target in targets:
try:
await msg.forward_to(target)
except Exception:
logger.exception("forward failed target=%s msg_id=%s", target, msg.id)
logger.info("forwarded message from chat=%s msg_id=%s to %s", chat_id, msg.id, targets)
except Exception:
logger.exception("on_new_message_for_forward error")
await client.start()
logger.info("channel media forward listener started")
await client.run_until_disconnected()
except asyncio.CancelledError:
logger.info("channel media forward listener cancelled")
raise
except Exception:
logger.exception("channel media forward listener crashed")
async def telethon_download_from_channel(monitor_id: int, download_history: bool = False) -> int:
monitor = channel_media_monitor_get(monitor_id)
if not monitor:
return 0
proxy = str(monitor.get("proxy") or "").strip()
client = get_or_create_channel_media_client(proxy=proxy)
if not client:
logger.warning("channel media client not available")
return 0
try:
if not client.is_connected():
await client.connect()
if not await client.is_user_authorized():
logger.warning("channel media client not authorized")
return 0
channel_id = int(monitor["channel_id"])
try:
entity = await client.get_entity(channel_id)
except Exception:
logger.warning("cannot resolve channel entity id=%s", channel_id)
return 0
media_types_str = str(monitor.get("media_types") or "video,document")
allowed_types = {t.strip().lower() for t in media_types_str.split(",") if t.strip()}
keywords_str = str(monitor.get("keywords") or "").strip()
keywords_list = [k.strip() for k in keywords_str.split(",") if k.strip()] if keywords_str else []
max_size = int(monitor.get("max_file_size_mb") or 2000) * 1024 * 1024
base_dir = str(monitor.get("download_dir") or "").strip()
if not base_dir:
base_dir = str(BASE_DIR / "channel_downloads" / str(channel_id))
os.makedirs(base_dir, exist_ok=True)
last_msg_id = int(monitor.get("last_message_id") or 0)
offset_id = last_msg_id if not download_history else 0
# Date filtering
from datetime import datetime as dt_type
offset_date = None
date_from_str = str(monitor.get("date_from") or "").strip()
date_to_str = str(monitor.get("date_to") or "").strip()
if date_from_str:
try:
offset_date = dt_type.fromisoformat(date_from_str)
except Exception:
pass
# Collect messages to download
messages_to_download = []
async for message in client.iter_messages(
entity, limit=500, offset_id=offset_id,
offset_date=offset_date, reverse=bool(offset_date),
):
if offset_id > 0 and message.id <= offset_id and not offset_date:
break
# Date range check
if date_to_str and message.date:
try:
dt_to = dt_type.fromisoformat(date_to_str)
if message.date.replace(tzinfo=None) > dt_to:
continue
except Exception:
pass
if channel_media_download_exists(channel_id, message.id):
continue
if keywords_list:
msg_text = (message.text or "") + " " + (message.caption or "")
if not any(k.lower() in msg_text.lower() for k in keywords_list):
continue
media = message.media
if not media:
continue
media_type = _detect_media_type(media, allowed_types)
if not media_type:
continue
file_size = _get_media_size(media)
if file_size and file_size > max_size:
continue
messages_to_download.append((message, media_type, file_size))
# Concurrent download with semaphore
max_concurrent = max(1, int(monitor.get("max_concurrent") or 3))
semaphore = asyncio.Semaphore(max_concurrent)
count = 0
total_size_added = 0
async def download_one(msg: Any, mt: str, fs: int) -> tuple[int, int]:
nonlocal count, total_size_added
async with semaphore:
caption = (msg.text or msg.caption or "")[:500]
sender_id = msg.sender_id or 0
file_name = ""
for attr in (getattr(getattr(msg.media, "document", None), "attributes", None) or []):
if hasattr(attr, "file_name") and attr.file_name:
file_name = attr.file_name
break
if not file_name:
file_name = f"{channel_id}_{msg.id}"
ext_map = {"video": ".mp4", "photo": ".jpg", "audio": ".mp3", "document": ".bin"}
file_name += ext_map.get(mt, ".bin")
file_path = ospath.join(base_dir, file_name)
if ospath.exists(file_path):
file_path = ospath.join(base_dir, f"{channel_id}_{msg.id}_{file_name}")
# Resume: use .part file
part_path = file_path + ".part"
try:
# Check for existing partial download
existing_size = ospath.getsize(part_path) if ospath.exists(part_path) else 0
if existing_size > 0 and fs and existing_size >= fs:
# Already fully downloaded as .part, just rename
os.rename(part_path, file_path)
else:
await client.download_media(msg, file=part_path)
if ospath.exists(part_path):
os.rename(part_path, file_path)
else:
return (0, 0)
actual_size = ospath.getsize(file_path) if ospath.exists(file_path) else fs
channel_media_download_record(
monitor_id, channel_id, msg.id,
mt, file_name, file_path,
actual_size, caption, sender_id,
)
logger.info("downloaded channel media: channel=%s msg=%s type=%s", channel_id, msg.id, mt)
return (1, actual_size)
except Exception:
logger.exception("download failed channel=%s msg=%s", channel_id, msg.id)
return (0, 0)
# Run all downloads concurrently
tasks = [download_one(msg, mt, fs) for msg, mt, fs in messages_to_download]
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, tuple):
c, s = r
count += c
total_size_added += s
new_total = int(monitor.get("total_downloaded") or 0) + count
new_size = int(monitor.get("total_size_bytes") or 0) + total_size_added
update_kwargs: dict[str, Any] = {
"total_downloaded": new_total,
"total_size_bytes": new_size,
}
async for last_msg in client.iter_messages(entity, limit=1):
update_kwargs["last_message_id"] = last_msg.id
break
channel_media_monitor_update(monitor_id, **update_kwargs)
if count > 0 and monitor.get("notify_telegram"):
title = monitor.get("channel_title") or str(channel_id)
await admin_send(
f"[频道媒体下载] {html_escape(title)}\n"
f"新增 {count} 个文件,共 {total_size_added // 1024 // 1024} MB\n"
f"累计:{new_total} 个文件,{new_size // 1024 // 1024} MB"
)
return count
except Exception:
logger.exception("telethon_download_from_channel failed monitor_id=%s", monitor_id)
return 0
def group_message_text(message: Message) -> str:
parts = [message.text or "", message.caption or ""]
if getattr(message, "reply_to_message", None):
@@ -2259,7 +2842,7 @@ pre{{white-space:pre-wrap;background:#121212;color:#fff;padding:13px;border:4px
@media (prefers-reduced-motion: reduce){{
*,*::before,*::after{{animation:none!important;transition:none!important}}
}}
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><section><b>消息</b><a href='/inbox'>收件箱</a><a href='/users'>用户管理</a><a href='/send'>主动发消息</a><a href='/replies'>快捷回复</a><a href='/rules'>私聊广告拦截</a></section><section><b>监控</b><a href='/'>监控面板</a><a href='/monitor/new'>新增监控</a><a href='/group-monitors'>TG 群监听</a><a href='/monitor/events'>推送历史</a><a href='/run-once'>手动检查</a></section><section><b>配置</b><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/config/export'>导出配置</a></section><section><b>系统</b><a href='/update'>更新代码</a><a href='/logs'>运行日志</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></section></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
</style></head><body><div class=shell><aside><div class=brand><div class=mark><i></i></div><div><b>tg-watchbot</b><small>Telegram 自动化</small></div></div><nav><section><b>消息</b><a href='/inbox'>收件箱</a><a href='/users'>用户管理</a><a href='/send'>主动发消息</a><a href='/replies'>快捷回复</a><a href='/rules'>私聊广告拦截</a></section><section><b>监控</b><a href='/'>监控面板</a><a href='/monitor/new'>新增监控</a><a href='/group-monitors'>TG 群监听</a><a href='/channel-media'>频道媒体</a><a href='/monitor/events'>推送历史</a><a href='/run-once'>手动检查</a></section><section><b>配置</b><a href='/settings'>Bot / 面板设置</a><a href='/yaml'>YAML 高级编辑</a><a href='/config/export'>导出配置</a></section><section><b>系统</b><a href='/update'>更新代码</a><a href='/logs'>运行日志</a><a href='/restart' onclick='return confirm("确定重启机器人服务?")'>重启机器人</a><a class=logout href='/logout'>退出登录</a></section></nav></aside><main><div class=top><h1>{html_escape(title)}</h1><span class=badge>WatchBot Panel</span></div>
{body}<div class=friend-links><b>友链</b><a href='https://linux.do' target='_blank' rel='noopener noreferrer'>Linux.do</a><span>·</span><a href='https://www.nodeseek.com' target='_blank' rel='noopener noreferrer'>NodeSeek</a></div></main></div></body></html>"""
@@ -3179,6 +3762,237 @@ HostLoc|https://hostloc.com|VPS,补货,优惠"""
async def health() -> str:
return "ok"
# ---- Channel Media Monitoring Routes ----
def channel_media_page_html() -> str:
monitors = channel_media_monitors_all()
session_ok = user_session_ready()
notice = ""
if TelegramClient is None:
notice = "<div class=msg>未安装 telethon,频道媒体功能不可用。</div>"
elif not session_ok:
notice = "<div class=msg>TG_API_ID / TG_API_HASH / TG_API_SESSION 未完整填写,请先在「设置」中配置。</div>"
cards_html = ""
for m in monitors:
status = m.get("status", "active")
if status == "active":
badge_html = '<span class="badge" style="background:#22c55e">运行中</span>'
elif status == "paused":
badge_html = '<span class="badge" style="background:#eab308">已暂停</span>'
else:
badge_html = '<span class="badge" style="background:#6b7280">已停止</span>'
username = f"@{m.get('channel_username', '')}" if m.get("channel_username") else ""
pause_btn = ""
if status == "active":
pause_btn = f"<a class='btn' href='/channel-media/{m['id']}/pause'>暂停</a>"
elif status == "paused":
pause_btn = f"<a class='btn ok' href='/channel-media/{m['id']}/resume'>恢复</a>"
proxy_info = " · 代理: " + html_escape(m.get("proxy", "")) if m.get("proxy") else ""
cards_html += f"""<div class=card style='margin:12px 0'>
<div style='display:flex;justify-content:space-between;align-items:center;flex-wrap:wrap;gap:10px'>
<div><h3 style='margin:0 0 4px'>{badge_html} {html_escape(m.get('channel_title',''))} {html_escape(username)}</h3>
<small class=muted>chat_id: {m.get('channel_id','')} · 转发: {'开启' if m.get('forward_mode') else '关闭'}{proxy_info}</small></div>
<div class=actions>
{pause_btn}
<a class='btn danger' href='/channel-media/{m['id']}/delete' onclick='return confirm("确定删除该监控?")'>删除</a>
</div></div></div>"""
return f"""<div class=card>
<div class=toolbar><div><h2 style='margin:0 0 6px'>频道媒体转发</h2>
<p class=muted style='margin:0'>使用你的 TG 账号监听群组,消息实时转发到你的 Telegram。</p></div>
<div class=actions><button class='btn primary' onclick='document.getElementById("addModal").style.display="flex"'>添加频道/群组</button></div></div>
{notice}{cards_html}</div>
<div id='addModal' style='display:none;position:fixed;inset:0;background:rgba(0,0,0,.5);z-index:100;justify-content:center;align-items:flex-start;padding-top:60px'>
<div class=card style='max-width:640px;width:95%;max-height:80vh;overflow:auto;position:relative'>
<h2>添加频道/群组监控</h2>
<p class=muted>搜索并选择你已加入的频道或群组。</p>
<div style='margin-bottom:16px'>
<input id='groupSearch' placeholder='输入名称搜索...' oninput='filterGroups()' style='width:100%'>
</div>
<div id='groupList' style='max-height:300px;overflow:auto;border:3px solid var(--ink)'>
<div id='groupLoading' class=muted style='padding:16px;text-align:center'>正在加载群组列表...</div>
</div>
<div id='selectedGroup' style='display:none;margin-top:12px;padding:12px;background:var(--gray);border:3px solid var(--ink)'>
<b>已选择:</b> <span id='selectedName'></span>
<input type=hidden id='selChannelId' name='channel_id'>
<input type=hidden id='selChannelTitle' name='channel_title'>
<input type=hidden id='selChannelUsername' name='channel_username'>
</div>
<div style='margin-top:14px'>
<label>关键词过滤(逗号分隔,留空则转发所有消息)</label>
<input id='mediaKeywords' placeholder='留空则转发所有消息'>
<label>媒体类型过滤(逗号分隔,留空不限)</label>
<input id='mediaTypes' value='' placeholder='video,document,photo,audio(留空=所有类型)'>
<label>代理(留空不用,支持 socks5://host:port 或 http://host:port</label>
<input id='mediaProxy' placeholder='socks5://127.0.0.1:1080'>
<div class=check-row style='margin-top:8px'>
<label><input type=checkbox id='forwardMode' checked> 实时转发到 Telegram</label>
</div>
<div><label>转发目标</label>
<select id='forwardTo'><option value='admin'>管理员</option><option value='saved'>我的收藏(Saved Messages)</option></select></div>
</div>
<div class=form-actions>
<button class='btn primary' onclick='addMonitor()'>添加监控</button>
<button class='btn' onclick='document.getElementById("addModal").style.display="none"'>取消</button>
</div>
</div></div>
<script>
let allGroups = [];
async function loadGroups() {{
try {{
const resp = await fetch('/api/groups?limit=500');
const data = await resp.json();
allGroups = data.groups || [];
renderGroups(allGroups);
}} catch(e) {{
document.getElementById('groupLoading').textContent = '加载失败:' + e.message;
}}
}}
function renderGroups(groups) {{
const container = document.getElementById('groupList');
if (!groups.length) {{ container.innerHTML = '<div style="padding:16px" class=muted>未找到匹配项</div>'; return; }}
container.innerHTML = groups.map(g =>
`<div style="padding:10px 12px;border-bottom:1px solid #e0e0e0;cursor:pointer;display:flex;justify-content:space-between;align-items:center" onclick="selectGroup(${{g.id}},'${{g.title.replace(/'/g,"\\'")}}','${{(g.username||'').replace(/'/g,"\\'")}}')" onmouseover="this.style.background='#f0f0f0'" onmouseout="this.style.background='white'">
<div><b>${{g.title}}</b> ${{g.username ? '<small>@'+g.username+'</small>' : ''}}<br><small class=muted>${{g.type}} · id: ${{g.id}}</small></div>
<span class=badge>${{g.type}}</span></div>`
).join('');
}}
function filterGroups() {{
const q = document.getElementById('groupSearch').value.toLowerCase();
if (!q) {{ renderGroups(allGroups); return; }}
const filtered = allGroups.filter(g => g.title.toLowerCase().includes(q) || (g.username||'').toLowerCase().includes(q) || String(g.id).includes(q));
renderGroups(filtered);
}}
function selectGroup(id, title, username) {{
document.getElementById('selectedGroup').style.display = 'block';
document.getElementById('selectedName').textContent = title + (username ? ' @'+username : '') + ' (' + id + ')';
document.getElementById('selChannelId').value = id;
document.getElementById('selChannelTitle').value = title;
document.getElementById('selChannelUsername').value = username;
document.getElementById('addModal').querySelector('h2').textContent = '确认添加';
}}
async function addMonitor() {{
const channelId = document.getElementById('selChannelId').value;
if (!channelId) {{ alert('请先选择一个频道/群组'); return; }}
const body = new URLSearchParams({{
channel_id: channelId,
channel_title: document.getElementById('selChannelTitle').value,
channel_username: document.getElementById('selChannelUsername').value,
media_types: document.getElementById('mediaTypes').value,
keywords: document.getElementById('mediaKeywords').value,
proxy: document.getElementById('mediaProxy').value,
forward_mode: document.getElementById('forwardMode').checked ? 'on' : '',
forward_to: document.getElementById('forwardTo').value,
}});
try {{
const resp = await fetch('/api/monitors/create', {{ method: 'POST', headers: {{'Content-Type':'application/x-www-form-urlencoded'}}, body: body.toString() }});
if (resp.ok) {{ location.reload(); }}
else {{ const t = await resp.text(); alert('创建失败:' + t); }}
}} catch(e) {{ alert('创建失败:' + e.message); }}
}}
document.getElementById('addModal').addEventListener('click', function(e) {{ if (e.target === this) this.style.display='none'; }});
</script>"""
@app.get("/channel-media", response_class=HTMLResponse)
async def channel_media_page(_: str = Depends(panel_auth)) -> str:
return layout("频道媒体", channel_media_page_html())
@app.get("/api/groups")
async def api_groups(_: str = Depends(panel_auth), q: str = "", limit: int = 500) -> dict[str, Any]:
if not user_session_ready():
return {"groups": [], "error": "TG user session not configured"}
if q.strip():
groups = await telethon_search_dialogs(q.strip(), limit=min(limit, 200))
else:
groups = await telethon_list_dialogs(limit=min(limit, 500))
groups = [g for g in groups if g.get("type") in ("group", "channel")]
return {"groups": groups}
@app.post("/api/monitors/create")
async def api_monitor_create(
_: str = Depends(panel_auth),
channel_id: str = Form(...),
channel_title: str = Form(""),
channel_username: str = Form(""),
media_types: str = Form("video,document"),
keywords: str = Form(""),
max_file_size_mb: int = Form(2000),
download_dir: str = Form(""),
notify_telegram: str | None = Form(None),
proxy: str = Form(""),
date_from: str = Form(""),
date_to: str = Form(""),
max_concurrent: int = Form(3),
forward_mode: str | None = Form(None),
forward_to: str = Form("admin"),
) -> RedirectResponse:
try:
cid = int(channel_id.strip())
except (ValueError, TypeError):
raise HTTPException(400, "invalid channel_id")
channel_media_monitor_create(
cid,
channel_title.strip() or str(cid),
channel_username.strip(),
media_types.strip() or "video,document",
keywords.strip(),
max(1, max_file_size_mb),
download_dir.strip(),
bool(notify_telegram),
proxy.strip(),
date_from.strip(),
date_to.strip(),
max(1, min(10, max_concurrent)),
bool(forward_mode),
forward_to.strip() or "admin",
)
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/pause")
async def channel_media_pause(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_update(monitor_id, status="paused")
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/resume")
async def channel_media_resume(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_update(monitor_id, status="active")
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/delete")
async def channel_media_delete_route(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_delete(monitor_id)
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/check", response_class=HTMLResponse)
async def channel_media_check(monitor_id: int, _: str = Depends(panel_auth)) -> str:
count = await telethon_download_from_channel(monitor_id)
monitor = channel_media_monitor_get(monitor_id)
name = monitor.get("channel_title", "") if monitor else ""
return layout("检查完成", f"<div class=msg>已检查频道 {html_escape(name)},新增 {count} 个文件。</div><p><a class=btn href='/channel-media'>返回</a></p>")
@app.get("/channel-media/{monitor_id}/download", response_class=HTMLResponse)
async def channel_media_download_history(monitor_id: int, _: str = Depends(panel_auth)) -> str:
monitor = channel_media_monitor_get(monitor_id)
if not monitor:
raise HTTPException(404)
downloads = channel_media_downloads_list(monitor_id, limit=200)
rows = ""
for d in downloads:
size_mb = f"{(d.get('file_size', 0) or 0) / 1024 / 1024:.1f} MB"
rows += f"<tr><td>{d.get('id')}</td><td>{html_escape(d.get('media_type',''))}</td>"
rows += f"<td>{html_escape(d.get('file_name',''))}<br><small class=muted>{html_escape(d.get('caption','')[:100])}</small></td>"
rows += f"<td>{size_mb}</td><td><small>{html_escape(d.get('created_at',''))}</small></td></tr>"
total = monitor.get("total_downloaded", 0)
size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024
body = f"""<div class=card><h2>下载记录 - {html_escape(monitor.get('channel_title',''))}</h2>
<p class=muted>累计:{total} 个文件,{size_mb} MB</p>
<div class=actions style='margin-bottom:16px'>
<a class='btn ok' href='/channel-media/{monitor_id}/check'>立即下载新内容</a>
<a class='btn' href='/channel-media'>返回列表</a></div>
<table><tr><th>ID</th><th>类型</th><th>文件名/说明</th><th>大小</th><th>时间</th></tr>{rows}</table></div>"""
return layout("下载记录", body)
return app
@@ -3224,7 +4038,6 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
while True:
await asyncio.sleep(3600)
if run_once:
# If .env is filled, send notifications during manual test; otherwise just log.
try:
token, admin_chat_id = validate_env()
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
@@ -3262,6 +4075,11 @@ async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
)
else:
user_session_listener_task = asyncio.create_task(run_user_session_group_listener())
if TelegramClient is not None and user_session_ready():
asyncio.create_task(channel_media_forward_listener())
logger.info("channel media forward listener started")
else:
logger.info("channel media forward listener skipped: telethon not installed or user session not configured")
await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}")
logger.info("bot polling start")
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
+149
View File
@@ -0,0 +1,149 @@
# Telegram机器人:双向对话 Bot + 网页关键词推送 + 可视化面板 + tg群消息管理 + 广告屏蔽
## 资源荟萃
本帖使用社区开源推广,符合推广要求。我申明并遵循社区要求的以下内容:
- 我的帖子已经打上 `#开源推广` 标签:是
- 我的开源项目完整开源,无未开源部分:是
- 我的开源项目已链接认可 LINUX DO 社区:是
- 我帖子内的项目介绍,AI生成、润色内容部分已截图发出:是
- 以上选择我承诺是永久有效的,接受社区和佬友监督:是
以下为项目介绍正文内容,AI 生成、润色内容已使用截图方式发出。
## 更新日志
### 2026-05-22 更新
- TG 群监听功能增强:支持可视化配置监听规则、AI 总结参数与防刷屏策略。
- TG 群监听新增“已发现群聊”:自动显示 Bot 收到过消息的群聊 `chat_id`,可一键创建监听。
- 新增 `/update` 安全更新流程:显示本地/远端 commit、ahead/behind、工作区状态;仅允许 `ff-only` 更新。
- 更新前若检测到本地未提交改动,会拒绝更新;避免覆盖本地代码。
- 新增“回滚上次更新”按钮:更新前自动记录回滚点,可一键回滚并重启。
- TG 群监听 AI 总结新增可视化高级控制:`ai_prompt``ai_min_interval_seconds``ai_dedupe_window_seconds`
- TG 群监听增加限频和去重窗口,降低重复推送与 AI 调用成本;AI 失败时仍会回退模板摘要。
- 监控面板新增可观测状态:最近成功/失败时间、最近错误、耗时、推送数、连续失败次数。
### 2026-05-21 第二次更新
- Web 面板新增收件箱直接回复、用户管理、快捷回复、私聊广告拦截、监控推送历史、配置导入/导出。
- 收件箱改为完整双向对话记录:用户消息、Web 回复、TG 管理员回复都会显示。
- 用户管理页新增 Bot / 面板配置卡片,和设置页共用同一份配置;修改 Token、管理员 ID、端口、账号或密码后需要重启。
- `ADMIN_CHAT_ID` 支持最多 3 个管理员,用逗号分隔。
- 单个监控可关闭 Telegram 推送,只记录到 Web 推送历史。
### 2026-05-21
- 默认启动改为先启动 Web 面板:未填写 `TELEGRAM_BOT_TOKEN` / `ADMIN_CHAT_ID` 时,面板仍可打开,同时 Telegram 收发、监控推送不可用。
- 面板配置页可填写 Bot Token、管理员 ID、面板账号和清理策略;保存后需要重启服务让 Bot 配置生效。
- 修复到期消息无法删除的 bug:监控推送消息支持到期自动删除,默认 60 分钟。
- 保存配置时会保留 `WEB_PANEL_SESSION_SECRET`,避免保存后登录状态被重置。
- Web 面板界面和站点图标已更新优化。
(新版安装方法详见项目地址)
最近看 TG 上不少人说封号比较严重。再加上我作为一个刚入门的 MJJ,平时也想更快地获取信息,所以 vibe 了一个自用小工具:`tg-watchbot`
它是一个轻量级 Python 服务,把 Telegram 双向对话 Bot 和 Web/RSS 监控推送 合在一起,并提供网页端可视化管理面板。它适合放在 VPS、NAT 小鸡或者家里的小服务器上跑。
项目地址:
[GitHub - GongyiChuren/tg-watchbot](https://github.com/GongyiChuren/tg-watchbot)
## 为什么做这个
我最开始的需求很简单:
- 有一个自己的 Telegram 联系入口,别人私聊 Bot 后可以转发到管理员账号;
- 能实时获取 Linux.do、博客、商店页面等信息源;
- 命中关键词、新条目、价格变化或库存变化时,自动推送到 Telegram;
- 有网页面板,不用每次都手改配置文件。
另外,项目支持定时清理监控推送相关状态数据,避免网页监控信息过多,影响正常的双向对话使用。
## 主要功能
- 普通用户私聊 Bot,消息会转发给管理员
- 管理员可以直接回复用户,也可以主动发文字 / 图片
- 支持封禁、解封、备注、查看用户信息
- 支持 RSS / Atom 监控
- 支持网页 CSS selector 抓取
- 支持关键词、新条目、价格变化、库存变化提醒
- 内置 Web 管理面板
- 可以在面板里新增、编辑、删除监控
- 支持 RSS 模板
- 使用 SQLite 保存用户、消息、监控状态
- 支持 systemd 部署
## 展示
> 这里放截图
## AI 一句话安装
请克隆 `https://github.com/GongyiChuren/tg-watchbot.git` 到本机,默认使用 Docker 按 README 的 `Docker 安装(含自启)` 启动服务;若无 Docker,则按 `快速开始` 走 Python 和 systemd 部署直接安装。启动后打开 `http://127.0.0.1:8765`,安装完成后提醒用户记得在面板填写 `TELEGRAM_BOT_TOKEN``ADMIN_CHAT_ID` 并在之后保存并执行重启(Docker 用 `docker compose restart`,直接安装用重启进程)。
## Docker 安装(含自启)
```bash
git clone https://github.com/GongyiChuren/tg-watchbot.git tg-watchbot
cd tg-watchbot
cp .env.example .env
cp config.example.yaml config.yaml
touch tg-watchbot.sqlite3 tg-watchbot.log
docker compose up -d --build
```
Docker 会在容器内监听 `0.0.0.0:8765`,宿主机仍然打开 `http://127.0.0.1:8765`
查看状态与日志:
```bash
docker compose ps
docker compose logs -f
```
修改配置后重启:
```bash
docker compose restart
```
## systemd 部署
推荐部署到 `/opt/tg-watchbot`
```bash
sudo useradd --system --no-create-home --shell /usr/sbin/nologin tg-watchbot || true
sudo mkdir -p /opt/tg-watchbot
sudo chown -R "$USER:$USER" /opt/tg-watchbot
cd /opt/tg-watchbot
git clone https://github.com/GongyiChuren/tg-watchbot.git .
python3 -m venv .venv
./.venv/bin/pip install -U pip
./.venv/bin/pip install -r requirements.txt
cp .env.example .env
cp config.example.yaml config.yaml
# 先用前台模式打开面板,确认能登录和保存配置
./.venv/bin/python app.py
```
在服务器本机打开:
`http://127.0.0.1:8765`
默认账号来自 `.env.example`
- 用户名:admin
- 密码:change-me
## 注意事项
- Telegram Bot 不能主动私聊陌生人,对方必须先给 Bot 发过消息
- `.env` 里有 Token 和密码,不要提交到 GitHub
- Web 面板如果暴露到公网,建议套 Cloudflare Access / 反代鉴权
- RSS 监控建议 60 秒起步
- 网页监控建议更保守一点,避免对目标站造成压力
目前它还是一个自用小工具,目标是够轻、够直接、够容易部署。后续会继续修 bug 😋
可以的话点个 star 吧,谢谢佬们🙏🙏🙏
+119
View File
@@ -0,0 +1,119 @@
total = monitor.get("total_downloaded", 0)
size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024
body = f"""<div class=card><h2>下载记录 - {html_escape(monitor.get('channel_title',''))}</h2>
<p class=muted>累计:{total} 个文件,{size_mb} MB</p>
<div class=actions style='margin-bottom:16px'>
<a class='btn ok' href='/channel-media/{monitor_id}/check'>立即下载新内容</a>
<a class='btn' href='/channel-media'>返回列表</a></div>
<table><tr><th>ID</th><th>类型</th><th>文件名/说明</th><th>大小</th><th>时间</th></tr>{rows}</table></div>"""
return layout("下载记录", body)
return app
async def start_panel_server() -> uvicorn.Server | None:
if not panel_enabled():
logger.info("web panel disabled")
return None
host = os.getenv("WEB_PANEL_HOST", "127.0.0.1")
port = int(os.getenv("WEB_PANEL_PORT", "8765"))
server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info"))
asyncio.create_task(server.serve())
logger.info("web panel listening on http://%s:%s", host, port)
return server
def validate_env() -> tuple[str, int]:
load_dotenv(ENV_PATH)
token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
admin = os.getenv("ADMIN_CHAT_ID", "").strip()
if not token:
raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}")
if not admin:
raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}")
ids = parse_admin_chat_ids(admin)
if not ids:
raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}")
return token, ids[0]
def bot_env_configured() -> bool:
load_dotenv(ENV_PATH, override=True)
return bool(os.getenv("TELEGRAM_BOT_TOKEN", "").strip() and os.getenv("ADMIN_CHAT_ID", "").strip())
async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task
load_dotenv(ENV_PATH, override=True)
config = load_config()
setup_logging(os.getenv("LOG_LEVEL", "INFO"))
init_db()
if panel_only:
await start_panel_server()
logger.info("panel-only mode start")
while True:
await asyncio.sleep(3600)
if run_once:
try:
token, admin_chat_id = validate_env()
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
except Exception as e:
logger.warning("run-once without Telegram notification: %s", e)
await run_all_monitors_once()
if bot:
await bot.session.close()
return
await start_panel_server()
if not bot_env_configured():
logger.warning(
"Telegram bot is not configured. Web panel is available, but Telegram polling, monitor notifications, and admin/user messaging will not work until TELEGRAM_BOT_TOKEN and ADMIN_CHAT_ID are saved, then the service is restarted."
)
while True:
await asyncio.sleep(3600)
token, admin_chat_id = validate_env()
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
dp.include_router(router)
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
scheduler_ref = scheduler
schedule_monitors(scheduler)
scheduler.start()
asyncio.create_task(flush_pending_loop())
asyncio.create_task(cleanup_monitor_loop())
if group_monitors_need_user_session():
if TelegramClient is None:
logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed")
elif not user_session_ready():
logger.warning(
"group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete"
)
else:
user_session_listener_task = asyncio.create_task(run_user_session_group_listener())
if TelegramClient is not None and user_session_ready():
asyncio.create_task(channel_media_monitor_loop())
asyncio.create_task(channel_media_forward_listener())
logger.info("channel media download + forward listeners started")
else:
logger.info("channel media listeners skipped: telethon not installed or user session not configured")
await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}")
logger.info("bot polling start")
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent")
parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured")
args = parser.parse_args()
try:
asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only))
except KeyboardInterrupt:
pass
except Exception:
logger.exception("fatal error")
raise
if __name__ == "__main__":
main()
+103
View File
@@ -0,0 +1,103 @@
document.getElementById('addModal').addEventListener('click', function(e) { if (e.target === this) this.style.display='none'; });
</script>"""
@app.get("/channel-media", response_class=HTMLResponse)
async def channel_media_page(_: str = Depends(panel_auth)) -> str:
return layout("频道媒体", channel_media_page_html())
@app.get("/api/groups")
async def api_groups(_: str = Depends(panel_auth), q: str = "", limit: int = 500) -> dict[str, Any]:
if not user_session_ready():
return {"groups": [], "error": "TG user session not configured"}
if q.strip():
groups = await telethon_search_dialogs(q.strip(), limit=min(limit, 200))
else:
groups = await telethon_list_dialogs(limit=min(limit, 500))
groups = [g for g in groups if g.get("type") in ("group", "channel")]
return {"groups": groups}
@app.post("/api/monitors/create")
async def api_monitor_create(
_: str = Depends(panel_auth),
channel_id: str = Form(...),
channel_title: str = Form(""),
channel_username: str = Form(""),
media_types: str = Form("video,document"),
keywords: str = Form(""),
max_file_size_mb: int = Form(2000),
download_dir: str = Form(""),
notify_telegram: str | None = Form(None),
proxy: str = Form(""),
date_from: str = Form(""),
date_to: str = Form(""),
max_concurrent: int = Form(3),
forward_mode: str | None = Form(None),
forward_to: str = Form("admin"),
) -> RedirectResponse:
try:
cid = int(channel_id.strip())
except (ValueError, TypeError):
raise HTTPException(400, "invalid channel_id")
channel_media_monitor_create(
cid,
channel_title.strip() or str(cid),
channel_username.strip(),
media_types.strip() or "video,document",
keywords.strip(),
max(1, max_file_size_mb),
download_dir.strip(),
bool(notify_telegram),
proxy.strip(),
date_from.strip(),
date_to.strip(),
max(1, min(10, max_concurrent)),
bool(forward_mode),
forward_to.strip() or "admin",
)
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/pause")
async def channel_media_pause(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_update(monitor_id, status="paused")
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/resume")
async def channel_media_resume(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_update(monitor_id, status="active")
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/delete")
async def channel_media_delete_route(monitor_id: int, _: str = Depends(panel_auth)) -> RedirectResponse:
channel_media_monitor_delete(monitor_id)
return RedirectResponse("/channel-media", status_code=303)
@app.get("/channel-media/{monitor_id}/check", response_class=HTMLResponse)
async def channel_media_check(monitor_id: int, _: str = Depends(panel_auth)) -> str:
count = await telethon_download_from_channel(monitor_id)
monitor = channel_media_monitor_get(monitor_id)
name = monitor.get("channel_title", "") if monitor else ""
return layout("检查完成", f"<div class=msg>已检查频道 {html_escape(name)},新增 {count} 个文件。</div><p><a class=btn href='/channel-media'>返回</a></p>")
@app.get("/channel-media/{monitor_id}/download", response_class=HTMLResponse)
async def channel_media_download_history(monitor_id: int, _: str = Depends(panel_auth)) -> str:
monitor = channel_media_monitor_get(monitor_id)
if not monitor:
raise HTTPException(404)
downloads = channel_media_downloads_list(monitor_id, limit=200)
rows = ""
for d in downloads:
size_mb = f"{(d.get('file_size', 0) or 0) / 1024 / 1024:.1f} MB"
rows += f"<tr><td>{d.get('id')}</td><td>{html_escape(d.get('media_type',''))}</td>"
rows += f"<td>{html_escape(d.get('file_name',''))}<br><small class=muted>{html_escape(d.get('caption','')[:100])}</small></td>"
rows += f"<td>{size_mb}</td><td><small>{html_escape(d.get('created_at',''))}</small></td></tr>"
total = monitor.get("total_downloaded", 0)
size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024
body = f"""<div class=card><h2>下载记录 - {html_escape(monitor.get('channel_title',''))}</h2>
<p class=muted>累计:{total} 个文件,{size_mb} MB</p>
<div class=actions style='margin-bottom:16px'>
<a class='btn ok' href='/channel-media/{monitor_id}/check'>立即下载新内容</a>
<a class='btn' href='/channel-media'>返回列表</a></div>
<table><tr><th>ID</th><th>类型</th><th>文件名/说明</th><th>大小</th><th>时间</th></tr>{rows}</table></div>"""
return layout("下载记录", body)
return app