Update bot startup docs and monitor cleanup

This commit is contained in:
InfernoXuaI
2026-05-21 03:33:22 +08:00
parent b0c7099f9e
commit 73b9a954ad
4 changed files with 488 additions and 54 deletions
+225
View File
@@ -0,0 +1,225 @@
import asyncio
import os
import sqlite3
import sys
import tempfile
import unittest
from contextlib import closing
from pathlib import Path
from types import ModuleType, SimpleNamespace
def install_import_stubs() -> None:
class DummyRouter:
def message(self, *args, **kwargs):
def decorator(func):
return func
return decorator
def identity_factory(*args, **kwargs):
return object()
modules = {
"feedparser": ModuleType("feedparser"),
"httpx": ModuleType("httpx"),
"yaml": ModuleType("yaml"),
"uvicorn": ModuleType("uvicorn"),
"apscheduler": ModuleType("apscheduler"),
"apscheduler.schedulers": ModuleType("apscheduler.schedulers"),
"apscheduler.schedulers.asyncio": ModuleType("apscheduler.schedulers.asyncio"),
"bs4": ModuleType("bs4"),
"dotenv": ModuleType("dotenv"),
"aiogram": ModuleType("aiogram"),
"aiogram.enums": ModuleType("aiogram.enums"),
"aiogram.exceptions": ModuleType("aiogram.exceptions"),
"aiogram.filters": ModuleType("aiogram.filters"),
"aiogram.types": ModuleType("aiogram.types"),
"aiogram.client": ModuleType("aiogram.client"),
"aiogram.client.default": ModuleType("aiogram.client.default"),
"fastapi": ModuleType("fastapi"),
"fastapi.responses": ModuleType("fastapi.responses"),
}
modules["apscheduler.schedulers.asyncio"].AsyncIOScheduler = object
modules["bs4"].BeautifulSoup = object
modules["dotenv"].load_dotenv = lambda *args, **kwargs: None
modules["aiogram"].Bot = object
modules["aiogram"].Dispatcher = object
modules["aiogram"].F = object()
modules["aiogram"].Router = DummyRouter
modules["aiogram.enums"].ParseMode = SimpleNamespace(HTML="HTML")
modules["aiogram.exceptions"].TelegramAPIError = Exception
modules["aiogram.filters"].Command = identity_factory
modules["aiogram.filters"].CommandObject = object
modules["aiogram.types"].Message = object
modules["aiogram.client.default"].DefaultBotProperties = identity_factory
modules["fastapi"].Depends = identity_factory
modules["fastapi"].FastAPI = object
modules["fastapi"].Form = identity_factory
modules["fastapi"].HTTPException = Exception
modules["fastapi"].Request = object
modules["fastapi"].Response = object
modules["fastapi"].status = object()
modules["fastapi.responses"].HTMLResponse = object
modules["fastapi.responses"].RedirectResponse = object
modules["fastapi.responses"].PlainTextResponse = object
modules["uvicorn"].Server = object
modules["uvicorn"].Config = identity_factory
sys.modules.update({name: sys.modules.get(name, module) for name, module in modules.items()})
install_import_stubs()
import app
class FakeBot:
def __init__(self) -> None:
self.deleted: list[tuple[int, int]] = []
self.sent_texts: list[str] = []
async def delete_message(self, chat_id: int, message_id: int) -> None:
self.deleted.append((chat_id, message_id))
async def send_message(self, chat_id: int, text: str, disable_web_page_preview: bool = False):
self.sent_texts.append(text)
return SimpleNamespace(chat=SimpleNamespace(id=chat_id), message_id=3003)
class MonitorMessageCleanupTest(unittest.TestCase):
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory()
self.old_db_path = app.DB_PATH
app.DB_PATH = Path(self.temp_dir.name) / "test.sqlite3"
app.init_db()
def tearDown(self) -> None:
app.DB_PATH = self.old_db_path
self.temp_dir.cleanup()
def test_monitor_notification_send_is_recorded_for_later_deletion(self) -> None:
old_bot = app.bot
old_admin_chat_id = app.admin_chat_id
old_config = app.config
fake_bot = FakeBot()
app.bot = fake_bot
app.admin_chat_id = 1001
app.config = {"cleanup": {"monitor_message_delete_after_minutes": 1}}
try:
sent = asyncio.run(app.admin_send_monitor("monitor hit", "NodeSeek 新帖"))
self.assertTrue(sent)
self.assertEqual(["monitor hit"], fake_bot.sent_texts)
with closing(sqlite3.connect(app.DB_PATH)) as conn:
row = conn.execute(
"SELECT chat_id, message_id, monitor_name, delete_after_seconds FROM monitor_messages"
).fetchone()
self.assertEqual((1001, 3003, "NodeSeek 新帖", 60), row)
finally:
app.bot = old_bot
app.admin_chat_id = old_admin_chat_id
app.config = old_config
def test_expired_monitor_message_is_deleted_and_removed_from_queue(self) -> None:
sent_message = SimpleNamespace(chat=SimpleNamespace(id=1001), message_id=2002)
app.record_monitor_message(sent_message, "NodeSeek 新帖", delete_after_seconds=60, sent_at_ts=1000)
fake_bot = FakeBot()
deleted_count = asyncio.run(app.delete_expired_monitor_messages(fake_bot, now_ts=1061))
self.assertEqual(1, deleted_count)
self.assertEqual([(1001, 2002)], fake_bot.deleted)
with closing(sqlite3.connect(app.DB_PATH)) as conn:
remaining = conn.execute("SELECT COUNT(*) FROM monitor_messages").fetchone()[0]
self.assertEqual(0, remaining)
def test_unexpired_monitor_message_is_kept(self) -> None:
sent_message = SimpleNamespace(chat=SimpleNamespace(id=1001), message_id=2002)
app.record_monitor_message(sent_message, "NodeSeek 新帖", delete_after_seconds=60, sent_at_ts=1000)
fake_bot = FakeBot()
deleted_count = asyncio.run(app.delete_expired_monitor_messages(fake_bot, now_ts=1059))
self.assertEqual(0, deleted_count)
self.assertEqual([], fake_bot.deleted)
with closing(sqlite3.connect(app.DB_PATH)) as conn:
remaining = conn.execute("SELECT COUNT(*) FROM monitor_messages").fetchone()[0]
self.assertEqual(1, remaining)
class BotConfigurationTest(unittest.TestCase):
def test_bot_is_not_configured_without_token_or_admin_chat_id(self) -> None:
old_token = os.environ.pop("TELEGRAM_BOT_TOKEN", None)
old_admin = os.environ.pop("ADMIN_CHAT_ID", None)
try:
self.assertFalse(app.bot_env_configured())
finally:
if old_token is not None:
os.environ["TELEGRAM_BOT_TOKEN"] = old_token
if old_admin is not None:
os.environ["ADMIN_CHAT_ID"] = old_admin
def test_bot_is_configured_with_token_and_admin_chat_id(self) -> None:
old_token = os.environ.get("TELEGRAM_BOT_TOKEN")
old_admin = os.environ.get("ADMIN_CHAT_ID")
os.environ["TELEGRAM_BOT_TOKEN"] = "123456:test-token"
os.environ["ADMIN_CHAT_ID"] = "1001"
try:
self.assertTrue(app.bot_env_configured())
finally:
if old_token is None:
os.environ.pop("TELEGRAM_BOT_TOKEN", None)
else:
os.environ["TELEGRAM_BOT_TOKEN"] = old_token
if old_admin is None:
os.environ.pop("ADMIN_CHAT_ID", None)
else:
os.environ["ADMIN_CHAT_ID"] = old_admin
def test_write_env_values_preserves_existing_session_secret(self) -> None:
with tempfile.TemporaryDirectory() as temp_dir:
old_env_path = app.ENV_PATH
app.ENV_PATH = Path(temp_dir) / ".env"
app.ENV_PATH.write_text("WEB_PANEL_SESSION_SECRET=keep-me\n", encoding="utf-8")
try:
app.write_env_values({
"TELEGRAM_BOT_TOKEN": "123456:test-token",
"ADMIN_CHAT_ID": "1001",
"WEB_PANEL_USER": "admin",
"WEB_PANEL_PASSWORD": "change-me",
})
self.assertIn(
"WEB_PANEL_SESSION_SECRET=keep-me",
app.ENV_PATH.read_text(encoding="utf-8"),
)
finally:
app.ENV_PATH = old_env_path
class PanelHtmlContractTest(unittest.TestCase):
def test_login_form_keeps_expected_fields(self) -> None:
html = app.login_page()
self.assertIn("action=/login", html)
self.assertIn("name=username", html)
self.assertIn("name=password", html)
def test_monitor_form_keeps_backend_field_names(self) -> None:
html = app.monitor_form_html()
for expected in [
"action='/monitor/create'",
"name=name",
"name=mtype",
"name=url",
"name=interval_seconds",
"name=keywords",
"name=item_selector",
"name=title_selector",
"name=link_selector",
"name=keyword_match",
"name=new_item",
"name=price_change",
"name=stock_change",
]:
self.assertIn(expected, html)
if __name__ == "__main__":
unittest.main()