total = monitor.get("total_downloaded", 0) size_mb = (monitor.get("total_size_bytes", 0) or 0) // 1024 // 1024 body = f"""
""" return layout("下载记录", body) return app async def start_panel_server() -> uvicorn.Server | None: if not panel_enabled(): logger.info("web panel disabled") return None host = os.getenv("WEB_PANEL_HOST", "127.0.0.1") port = int(os.getenv("WEB_PANEL_PORT", "8765")) server = uvicorn.Server(uvicorn.Config(create_panel_app(), host=host, port=port, log_level="info")) asyncio.create_task(server.serve()) logger.info("web panel listening on http://%s:%s", host, port) return server def validate_env() -> tuple[str, int]: load_dotenv(ENV_PATH) token = os.getenv("TELEGRAM_BOT_TOKEN", "").strip() admin = os.getenv("ADMIN_CHAT_ID", "").strip() if not token: raise RuntimeError(f"TELEGRAM_BOT_TOKEN is missing in {ENV_PATH}") if not admin: raise RuntimeError(f"ADMIN_CHAT_ID is missing in {ENV_PATH}") ids = parse_admin_chat_ids(admin) if not ids: raise RuntimeError(f"ADMIN_CHAT_ID is invalid in {ENV_PATH}") return token, ids[0] def bot_env_configured() -> bool: load_dotenv(ENV_PATH, override=True) return bool(os.getenv("TELEGRAM_BOT_TOKEN", "").strip() and os.getenv("ADMIN_CHAT_ID", "").strip()) async def main_async(run_once: bool = False, panel_only: bool = False) -> None: global bot, admin_chat_id, admin_chat_ids, config, scheduler_ref, user_session_listener_task load_dotenv(ENV_PATH, override=True) config = load_config() setup_logging(os.getenv("LOG_LEVEL", "INFO")) init_db() if panel_only: await start_panel_server() logger.info("panel-only mode start") while True: await asyncio.sleep(3600) if run_once: try: token, admin_chat_id = validate_env() admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) except Exception as e: logger.warning("run-once without Telegram notification: %s", e) await run_all_monitors_once() if bot: await bot.session.close() return await start_panel_server() if not bot_env_configured(): logger.warning( "Telegram bot is not configured. Web panel is available, but Telegram polling, monitor notifications, and admin/user messaging will not work until TELEGRAM_BOT_TOKEN and ADMIN_CHAT_ID are saved, then the service is restarted." ) while True: await asyncio.sleep(3600) token, admin_chat_id = validate_env() admin_chat_ids = parse_admin_chat_ids(os.getenv("ADMIN_CHAT_ID", "")) bot = Bot(token=token, default=DefaultBotProperties(parse_mode=ParseMode.HTML)) dp = Dispatcher() dp.include_router(router) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") scheduler_ref = scheduler schedule_monitors(scheduler) scheduler.start() asyncio.create_task(flush_pending_loop()) asyncio.create_task(cleanup_monitor_loop()) if group_monitors_need_user_session(): if TelegramClient is None: logger.warning("group monitor with listen_source=user_session detected, but telethon is not installed") elif not user_session_ready(): logger.warning( "group monitor with listen_source=user_session detected, but TG_API_ID/TG_API_HASH/TG_API_SESSION is not complete" ) else: user_session_listener_task = asyncio.create_task(run_user_session_group_listener()) if TelegramClient is not None and user_session_ready(): asyncio.create_task(channel_media_monitor_loop()) asyncio.create_task(channel_media_forward_listener()) logger.info("channel media download + forward listeners started") else: logger.info("channel media listeners skipped: telethon not installed or user session not configured") await admin_send(f"tg-watchbot 已启动\n时间:{now_iso()}") logger.info("bot polling start") await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--run-once", action="store_true", help="run all monitors once and exit; does not need Telegram token unless notification is sent") parser.add_argument("--panel-only", action="store_true", help="start only the web admin panel, useful before Telegram token is configured") args = parser.parse_args() try: asyncio.run(main_async(run_once=args.run_once, panel_only=args.panel_only)) except KeyboardInterrupt: pass except Exception: logger.exception("fatal error") raise if __name__ == "__main__": main()