928f54e923
自身群监控
120 lines
5.2 KiB
Plaintext
120 lines
5.2 KiB
Plaintext
total = monitor.get("total_downloaded", 0)
|
|
size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024
|
|
body = f"""<div class=card><h2>下载记录 - {html_escape(monitor.get('channel_title',''))}</h2>
|
|
<p class=muted>累计:{total} 个文件,{size_mb} MB</p>
|
|
<div class=actions style='margin-bottom:16px'>
|
|
<a class='btn ok' href='/channel-media/{monitor_id}/check'>立即下载新内容</a>
|
|
<a class='btn' href='/channel-media'>返回列表</a></div>
|
|
<table><tr><th>ID</th><th>类型</th><th>文件名/说明</th><th>大小</th><th>时间</th></tr>{rows}</table></div>"""
|
|
return layout("下载记录", body)
|
|
|
|
return app
|
|
|
|
|
|
async def start_panel_server() -> uvicorn.Server | None:
|
|
if not panel_enabled():
|
|
logger.info("web panel disabled")
|
|
return None
|
|
host = os.getenv("WEB_PANEL_HOST", "127.0.0.1")
|
|
port = int(os.getenv("WEB_PANEL_PORT", "8765"))
|
|
server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info"))
|
|
asyncio.create_task(server.serve())
|
|
logger.info("web panel listening on http://%s:%s", host, port)
|
|
return server
|
|
|
|
def validate_env() -> tuple[str, int]:
|
|
load_dotenv(ENV_PATH)
|
|
token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
|
|
admin = os.getenv("ADMIN_CHAT_ID", "").strip()
|
|
if not token:
|
|
raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}")
|
|
if not admin:
|
|
raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}")
|
|
ids = parse_admin_chat_ids(admin)
|
|
if not ids:
|
|
raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}")
|
|
return token, ids[0]
|
|
|
|
|
|
def bot_env_configured() -> bool:
|
|
load_dotenv(ENV_PATH, override=True)
|
|
return bool(os.getenv("TELEGRAM_BOT_TOKEN", "").strip() and os.getenv("ADMIN_CHAT_ID", "").strip())
|
|
|
|
|
|
async def main_async(run_once: bool = False, panel_only: bool = False) -> None:
|
|
global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task
|
|
load_dotenv(ENV_PATH, override=True)
|
|
config = load_config()
|
|
setup_logging(os.getenv("LOG_LEVEL", "INFO"))
|
|
init_db()
|
|
if panel_only:
|
|
await start_panel_server()
|
|
logger.info("panel-only mode start")
|
|
while True:
|
|
await asyncio.sleep(3600)
|
|
if run_once:
|
|
try:
|
|
token, admin_chat_id = validate_env()
|
|
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
|
|
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
|
except Exception as e:
|
|
logger.warning("run-once without Telegram notification: %s", e)
|
|
await run_all_monitors_once()
|
|
if bot:
|
|
await bot.session.close()
|
|
return
|
|
await start_panel_server()
|
|
if not bot_env_configured():
|
|
logger.warning(
|
|
"Telegram bot is not configured. Web panel is available, but Telegram polling, monitor notifications, and admin/user messaging will not work until TELEGRAM_BOT_TOKEN and ADMIN_CHAT_ID are saved, then the service is restarted."
|
|
)
|
|
while True:
|
|
await asyncio.sleep(3600)
|
|
token, admin_chat_id = validate_env()
|
|
admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", ""))
|
|
bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
|
|
dp = Dispatcher()
|
|
dp.include_router(router)
|
|
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
|
|
scheduler_ref = scheduler
|
|
schedule_monitors(scheduler)
|
|
scheduler.start()
|
|
asyncio.create_task(flush_pending_loop())
|
|
asyncio.create_task(cleanup_monitor_loop())
|
|
if group_monitors_need_user_session():
|
|
if TelegramClient is None:
|
|
logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed")
|
|
elif not user_session_ready():
|
|
logger.warning(
|
|
"group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete"
|
|
)
|
|
else:
|
|
user_session_listener_task = asyncio.create_task(run_user_session_group_listener())
|
|
if TelegramClient is not None and user_session_ready():
|
|
asyncio.create_task(channel_media_monitor_loop())
|
|
asyncio.create_task(channel_media_forward_listener())
|
|
logger.info("channel media download + forward listeners started")
|
|
else:
|
|
logger.info("channel media listeners skipped: telethon not installed or user session not configured")
|
|
await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}")
|
|
logger.info("bot polling start")
|
|
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent")
|
|
parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured")
|
|
args = parser.parse_args()
|
|
try:
|
|
asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except Exception:
|
|
logger.exception("fatal error")
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|