refactor(scheduling): unify the three periodic schedulers into a single NightlyJob pipeline

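Replace the three parallel periodic loops (scanLoop, crawlerLoop, Migrator.Run) with a single nightly.Runner
that runs one pipeline serially every day at cron_hour (default 01:00):

  Phase 1  Scan every drive that is neither spider91 nor localupload
           → detect new videos + detect deleted videos (clean up catalog rows + local covers/teasers)
           → enqueue covers + teasers (per-drive teaser_enabled decides whether teasers are enqueued)
           → wait until all thumb / teaser worker queues are idle
  Phase 2  Only when a spider91 drive exists: run the 91 crawler and enqueue teasers for new videos
           → wait until the teaser queue is idle
  Phase 3  spider91 → cloud drive migration (one-shot sweep to PikPak/115)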

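Key properties:
  - 6h soft timeout (nightly.max_duration): once reached, the running phase finishes and later phases are not started
  - Same-day dedup: last_run_date is persisted in the settings table, so a crash and restart does not rerun the pipeline
  - sync.Mutex.TryLock keeps manual triggers and the natural cron trigger mutually exclusive
  - ctx.Err is checked at every phase boundary; in-flight ffmpeg jobs / uploads are not force-killed
  - The per-drive '重扫' (rescan) and spider91 '立即抓取' (crawl now) buttons are kept
  - New top-bar '立即跑全流程' (run full pipeline now) button (POST /admin/api/jobs/nightly/run)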

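Related improvements:
  - preview.Worker / ThumbWorker gain WaitIdle(ctx) error, which nightly uses as a synchronization barrier
  - scanner emits a progress heartbeat log every 30s so long scans are no longer a black box
    Format: [scanner] drive=X progress: scanned=N added=K errors=E dirs=M elapsed=Ts at=<dir>
  - cleanupMissingDriveVideos is extended from PikPak only to every cloud drive kind
    (the stats.Errors==0 gate is kept so API flakiness does not cause false deletions)
  - Migrator drops its periodic ticker / Trigger channel and becomes a standalone RunOnce
    (the captcha cooldown state machine is kept and persists for 5 minutes across RunOnce calls)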

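Deprecated (fields kept for compatibility with old yaml):
  - scanner.interval_seconds   (superseded by nightly.cron_hour scheduling)
  - the crawl_hour credential field on spider91 drives (last_crawl_at is now used only for display in the admin UI)

Tests: go test ./... is all green (including ~320 lines of unit tests in the nightly package); npm run build passes.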
nianzhibai
2026-05-27 13:17:44 +08:00
parent ebd6943a10
commit 1eeebbf305
17 changed files with 1138 additions and 406 deletions
+45 -6
@@ -5,7 +5,7 @@
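- Frontend: React 18 + Vite + TypeScript
- Backend: Go 1.23 + SQLite (pure-Go driver, no CGO), ffmpeg for generating teasers and covers
- Cloud drive integrations: Quark (in-house) + 115 (driver SDK) + PikPak (in-house, modeled on OpenList) + wopan-sdk-go SDK + OneDrive (OpenList online renewal + Microsoft Graph file API)
- Crawler integration: 91 crawler (`91VideoSpider/spider_91porn.py`; pulls one page of videos plus covers to local disk every night)
- Crawler integration: 91 crawler (`91VideoSpider/spider_91porn.py`; triggered by the nightly pipeline, pulls one page of videos plus covers to local disk)
## Current features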
@@ -279,7 +279,7 @@ OneDrive 当前采用 OpenList 在线 API 的续期方式,不要求用户提
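| Field | Default | Description |
|---|---|---|
| `target_new` | `15` | Number of new videos per crawl. Pages are walked from page 1, known viewkeys are skipped, and the crawl stops once this many new videos have been collected |
| `crawl_hour` | `0` | 0-23, the hour at which the crawl fires; the default triggers between 00:00 and 00:59 |
| `crawl_hour` | `0` | **Deprecated**: older versions triggered the crawl daily at this hour; scheduling is now unified under `nightly.cron_hour` (default 01:00). The field is kept only for compatibility with old yaml |
| `proxy` | `(empty)` | Download proxy URL, e.g. `http://127.0.0.1:7890`; when empty, falls back to the backend process's `HTTPS_PROXY` environment variable |
| `python_path` | `python3` | Interpreter path; an absolute path is allowed |
| `script_path` | (auto-detected) | Absolute path of the script; when empty, `91VideoSpider/spider_91porn.py` is inferred from the repository layout |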
@@ -299,9 +299,10 @@ backend/data/spider91/<driveID>/
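**Trigger logic**
- Polls once per minute. Inside the `crawl_hour` window (default 0:00-0:59) and at least 12 hours after the last successful crawl → trigger
- Clicking "立即抓取" (crawl now) in the admin console triggers one manual crawl immediately (not subject to the time window)
- Each `spider91` drive is scheduled independently; multiple instances with different `crawl_hour` values can be mounted
- Runs serially as Phase 2 of the whole pipeline every day at `nightly.cron_hour` (default 01:00); see the "Nightly pipeline" (凌晨流水线) section below
- The "立即抓取" (crawl now) button on a spider91 drive in the admin console: runs one crawl for that drive only (Phase 1 / Phase 3 are not run)
- The top-bar "立即跑全流程" (run full pipeline now) button: runs the complete Phase 1 + 2 + 3
- The `crawl_hour` field is kept but no longer takes part in scheduling; `last_crawl_at` is used only for the admin UI's "last crawled N hours ago" display
**Dedup**: the 91porn site's `viewkey` is the unique identifier, combined with the `videos.id = "spider91-<driveID>-<viewkey>"` naming rule. Before each crawl the backend writes the list of viewkeys already in the catalog to `.crawl/seen-<timestamp>.txt` and passes it to the Python script as `--seen-viewkeys-file`; the script only requests detail pages for viewkeys it has not seen.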
@@ -331,13 +332,51 @@ backend/data/spider91/<driveID>/
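- **Target PikPak drive selection**: the global setting `spider91_upload_drive_id`; an admin can set it explicitly via `PUT /admin/api/settings`. **When unset, the only PikPak drive is picked automatically**; if there are several PikPak drives, one must be chosen explicitly in the admin console, otherwise no migration happens.
- **PikPak directory**: the PikPak drive's `rootId` is used as the upload parent directory. It is recommended to create an empty subdirectory in the PikPak web UI beforehand (for example `/91Spider/`) and put its file ID into the PikPak drive's `rootId`. Automatic migration then lands in that subdirectory, and the drive's scan root only sees that subdirectory, so it does not overlap with content from 115 or other drives.
- **PikPak file name**: uploads use the `<video title>-<last 8 chars of viewkey>.<ext>` format (option B). The title is sanitized: control characters are removed, the illegal characters `/ \ : * ? " < > |` are replaced with spaces, whitespace is collapsed, leading and trailing dots are stripped, and the result is truncated to 80 Unicode characters. The catalog's `file_name` is updated to the uploaded name, so the next PikPak scan can also match it by `(file_name, size)`.
- **Trigger cadence**: the migration worker polls every 60 seconds, and it is also triggered immediately after each spider91 crawl finishes (no need to wait for the next cycle). A trigger is not an upload: whether anything is uploaded depends on whether more than 15 videos are held locally.
- **Trigger cadence**: migration runs as Phase 3 of the nightly pipeline, entered only after the Phase 2 spider91 crawl and teaser generation for all new videos have finished; see the "Nightly pipeline" (凌晨流水线) section below. It is triggered at most once a day (unless an admin clicks "立即跑全流程", run full pipeline now). A trigger is not an upload: whether anything is uploaded depends on whether more than 15 videos are held locally.
- **Catalog rewrite**: after a successful upload, the video row's `drive_id` / `file_id` / `file_name` / `content_hash` are transactionally rewritten to the PikPak values; the video's own `id` (`spider91-<driveID>-<viewkey>`) stays unchanged, so associated data such as `video_tags`, `views`, `likes`, and the `91porn` tag is all preserved. On later scans, the scanner recognizes the file through the existing `findDuplicate` fallback on `(content_hash)` or `(file_name, size)`, so it is not inserted twice.
- **Local cleanup**: once migration succeeds, the local mp4 + thumb are deleted immediately (the cover has already been copied to `backend/data/previews/thumbs/`, so the frontend is unaffected). At the end of each worker round there is also a defensive fallback: scan all local files and clean up orphans whose catalog `drive_id` has already moved away but which still exist locally (not triggered on the normal path).
- **Dedup seen file**: before each run the crawler writes a "known viewkey" file and feeds it to the Python script so that it skips detail pages already crawled. The list is queried by `id LIKE 'spider91-<driveID>-%'` (independent of `drive_id`), so spider91 videos are still recognized after moving to PikPak and are **not crawled again**.
- **Failure handling**: when an upload fails, the local file is kept and the catalog row stays as it was; the next poll retries. Quota-exceeded or permanent errors currently have no special marker; watch the logs (`[spider91migrate]`).
- **Without PikPak**: when `spider91_upload_drive_id` is not set and no PikPak drive is mounted, spider91 videos keep being served from local disk (`/p/spider91/<videoID>`) and work as before; disk usage keeps growing and must be managed manually.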
## Nightly pipeline
All scheduled tasks are unified into one pipeline, triggered once a day at `nightly.cron_hour` (default 1, i.e. 01:00):
```
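01:00 ▶ NightlyJob starts
├─ Phase 1 Scan every drive that is neither spider91 nor localupload
│ Per drive, serially: scan → detect new videos → detect deleted videos
│ (this project's database + local covers / teasers are cleaned up together;
│ deletion cleanup is skipped when the scan has errors > 0, to avoid false
│ deletions) → enqueue covers + teasers (teasers only if the drive's
│ teaser_enabled is true)
│ Wait until the cover + teaser queues of all drives are idle
├─ Phase 2 Only when a spider91 drive exists: run the 91 crawler serially, enqueue teasers for new videos
│ Wait until the teaser queue is idle
└─ Phase 3 spider91 → cloud drive migration (one full sweep; the newest 15 stay local,
older ones go to the PikPak or 115 drive named by spider91_upload_drive_id)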
```
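**Key properties**
- **6-hour soft timeout**: `nightly.max_duration` caps the total duration of one run. When it is reached, the phase already running finishes and later phases are not started; no ffmpeg job or upload is force-killed.
- **Same-day dedup**: each run records its start date in `settings.nightly.last_run_date`; a later tick within the same day's 01:00 hour sees the same date and skips, and a crash plus restart does not rerun the pipeline either.
- **Failure handling**: a failed scan of one drive, a failed teaser, or a failed migration of one item does not block the pipeline; all are observable in the logs and are retried on the next pipeline run. Teasers in the `failed` state need an admin to recover them manually via "重生失败 teaser" (regenerate failed teasers).
- **Per-drive teaser switch**: the `drives.teaser_enabled` field is honored when enqueuing in Phase 1 / Phase 2; when it is off, no teaser job is enqueued, while covers are still generated.
- **Manual trigger**: the "立即跑全流程" (run full pipeline now) button in the `/admin/drives` top bar (POST `/admin/api/jobs/nightly/run`) ignores the time window and runs the pipeline immediately; if a run is already in progress, the request is dropped by `runMu.TryLock`.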
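
The mutual exclusion described in the last bullet can be illustrated in isolation. This is a minimal sketch of the `sync.Mutex.TryLock` pattern (available since Go 1.18); the `runner` type and `tryRun` method are hypothetical names for illustration and are not the project's `nightly.Runner`.

```go
package main

import (
	"fmt"
	"sync"
)

// runner is a hypothetical stand-in that only demonstrates the locking pattern.
type runner struct {
	runMu sync.Mutex
}

// tryRun executes job unless another run already holds the lock.
// It reports whether the job actually ran.
func (r *runner) tryRun(job func()) bool {
	if !r.runMu.TryLock() {
		return false // a run is in progress: drop this request
	}
	defer r.runMu.Unlock()
	job()
	return true
}

func main() {
	r := &runner{}
	started := make(chan struct{})
	release := make(chan struct{})
	done := make(chan bool, 1)

	// First run blocks until we release it.
	go func() {
		done <- r.tryRun(func() { close(started); <-release })
	}()
	<-started

	fmt.Println("overlapping request ran:", r.tryRun(func() {}))
	close(release)
	fmt.Println("first request ran:", <-done)
	fmt.Println("later request ran:", r.tryRun(func() {}))
}
```

Dropping the overlapping request instead of queueing it keeps a double click from scheduling a second full run.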
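**Legacy schedulers that no longer exist** (and what replaces them):
| Legacy mechanism | Replaced by |
|---|---|
| `scanLoop`: scans every 6h within the 02:00-07:00 window | Phase 1 |
| `crawlerLoop`: polls spider91 drives every minute against `crawl_hour` | Phase 2 |
| `Migrator.Run`: migration worker triggered every 60 seconds and right after each crawl | Phase 3 |
The `scanner.interval_seconds` field in `config.yaml` is kept but no longer has any effect; the `crawl_hour` credential field on spider91 drives is kept but no longer takes part in scheduling, and `last_crawl_at` is used only for the admin UI's "last crawled N hours ago" display.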
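## Teaser and cover generation strategy
- Cover: always grab one jpg frame at the 5-second mark; the video duration is no longer probed just for the cover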
+95 -137
@@ -29,6 +29,7 @@ import (
"github.com/video-site/backend/internal/drives/quark"
"github.com/video-site/backend/internal/drives/spider91"
"github.com/video-site/backend/internal/drives/wopan"
"github.com/video-site/backend/internal/nightly"
"github.com/video-site/backend/internal/preview"
"github.com/video-site/backend/internal/proxy"
"github.com/video-site/backend/internal/scanner"
@@ -167,6 +168,11 @@ func main() {
SetSpider91UploadDriveID: func(id string) error {
return app.SetSpider91UploadDriveID(ctx, id)
},
OnRunNightlyJob: func() {
if app.nightlyRunner != nil {
app.nightlyRunner.TriggerNow()
}
},
}
r := chi.NewRouter()
@@ -177,12 +183,23 @@ func main() {
apiServer.RegisterRoutes(r, authr)
adminServer.Register(r)
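// Start the periodic scan
go app.scanLoop(ctx)
// Nightly task dedicated to the spider91 crawler (fires at 00:30 by default, avoiding the drive scan window)
go app.crawlerLoop(ctx)
// spider91 → PikPak upload worker
go app.spider91Migrator.Run(ctx)
// Nightly pipeline: fires once a day at cron_hour and runs serially
// Phase 1 scan every non-spider91 / non-localupload drive + deletion detection + enqueue covers/teasers
// Phase 2 spider91 crawler + enqueue teasers
// Phase 3 spider91 → cloud drive migration
// Also serves the admin "立即跑全流程" (run full pipeline now) button (POST /admin/api/jobs/nightly/run → TriggerNow).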
app.nightlyRunner = nightly.New(nightly.Config{
Settings: cat,
CronHour: cfg.Nightly.CronHour,
MaxDuration: cfg.Nightly.MaxDuration,
ListScanTargets: app.listScanTargetIDs,
RunScan: app.runScan,
ListSpider91Drives: app.listSpider91DriveIDs,
RunSpider91Crawl: app.runSpider91Crawl,
WaitPreviewQueuesIdle: app.waitAllPreviewQueuesIdle,
RunMigration: app.spider91Migrator.RunOnce,
})
go app.nightlyRunner.Run(ctx)
srv := &http.Server{
Addr: cfg.Server.Listen,
@@ -228,6 +245,10 @@ type App struct {
// spider91Migrator periodically uploads spider91 videos to the target drive (PikPak or 115).
spider91Migrator *spider91migrate.Migrator
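// nightlyRunner is the nightly pipeline scheduler: every day at cron_hour it serially runs scan → 91 crawler → migration.
// It also serves the admin "立即跑全流程" (run full pipeline now) button (TriggerNow).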
nightlyRunner *nightly.Runner
}
// teaserEnabledForDrive returns the current per-drive teaser switch for a drive.
@@ -832,15 +853,22 @@ func (a *App) runScan(ctx context.Context, driveID string) {
log.Printf("[cleanup] removed %d excluded 115 videos for drive=%s", removed, driveID)
}
}
if drv.Kind() == "pikpak" {
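// Deletion detection: the scanned file_ids are what really exists on the cloud drive right now.
// Videos in the catalog that belong to this drive, whose parent_id lies inside a directory visited by
// this scan (or the scan started from the root), and that are not in SeenFileIDs → treated as deleted.
//
// spider91 / localupload manage their own lifecycle and must not take part in scan cleanup.
// When stats.Errors > 0 (the cloud API flaked mid-scan) this round is skipped to be safe, so that
// "temporarily unlistable" is not mistaken for "deleted by the user".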
if drv.Kind() != spider91.Kind && drv.ID() != localupload.DriveID {
if stats.Errors > 0 {
log.Printf("[cleanup] skip stale PikPak cleanup for drive=%s: scan had %d directory errors", driveID, stats.Errors)
log.Printf("[cleanup] skip stale cleanup for drive=%s kind=%s: scan had %d directory errors", driveID, drv.Kind(), stats.Errors)
} else {
removed, err := a.cleanupMissingDriveVideos(ctx, driveID, stats.SeenFileIDs, stats.VisitedDirIDs, startID == drv.RootID())
if err != nil {
log.Printf("[cleanup] stale PikPak cleanup drive=%s error: %v", driveID, err)
log.Printf("[cleanup] stale cleanup drive=%s kind=%s error: %v", driveID, drv.Kind(), err)
} else if removed > 0 {
log.Printf("[cleanup] removed %d stale PikPak videos for drive=%s", removed, driveID)
log.Printf("[cleanup] removed %d stale videos for drive=%s kind=%s", removed, driveID, drv.Kind())
}
}
}
@@ -1056,61 +1084,60 @@ func (a *App) regenFailedPreviews(ctx context.Context, driveID string) {
log.Printf("[preview] enqueued failed videos for regen drive=%s queued=%d", driveID, queued)
}
func (a *App) scanLoop(ctx context.Context) {
if a.cfg.Scanner.IntervalSeconds <= 0 {
return
}
interval := time.Duration(a.cfg.Scanner.IntervalSeconds) * time.Second
var lastScheduledScan time.Time
if a.scanAllOnceIfDue(ctx, time.Now(), lastScheduledScan, interval) {
lastScheduledScan = time.Now()
}
checkEvery := interval
if checkEvery > time.Minute {
checkEvery = time.Minute
}
ticker := time.NewTicker(checkEvery)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
if a.scanAllOnceIfDue(ctx, now, lastScheduledScan, interval) {
lastScheduledScan = now
}
}
}
}
func (a *App) scanAllOnceIfDue(ctx context.Context, now, lastScheduledScan time.Time, interval time.Duration) bool {
if !scheduledScanDue(now, lastScheduledScan, interval) {
return false
}
a.scanAllOnce(ctx)
return true
}
func scheduledScanDue(now, lastScheduledScan time.Time, interval time.Duration) bool {
if interval <= 0 || !scheduledScanAllowed(now) {
return false
}
return lastScheduledScan.IsZero() || now.Sub(lastScheduledScan) >= interval
}
func scheduledScanAllowed(now time.Time) bool {
hour := now.Hour()
return hour >= 2 && hour < 7
}
func (a *App) scanAllOnce(ctx context.Context) {
for _, d := range a.registry.All() {
// listScanTargetIDs returns the IDs of all drives that nightly Phase 1 should scan
// (neither spider91 nor localupload), in the stable order given by registry.All.
func (a *App) listScanTargetIDs(_ context.Context) []string {
all := a.registry.All()
out := make([]string, 0, len(all))
for _, d := range all {
if !shouldScanDrive(d) {
continue
}
a.runScan(ctx, d.ID())
out = append(out, d.ID())
}
return out
}
// listSpider91DriveIDs returns the spider91 drive IDs that nightly Phase 2 should crawl.
func (a *App) listSpider91DriveIDs(_ context.Context) []string {
a.mu.Lock()
defer a.mu.Unlock()
out := make([]string, 0, len(a.spider91Crawlers))
for id := range a.spider91Crawlers {
out = append(out, id)
}
return out
}
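// waitAllPreviewQueuesIdle blocks until the cover worker and teaser worker queues of every
// drive are empty with no in-flight task.
//
// Order: wait for all thumb workers first (enqueueDriveGeneration already waits for the current
// drive's covers before enqueuing teasers, but this is a global barrier across drives), then for all teaser workers.
// If ctx is cancelled while waiting (soft timeout / shutdown), ctx.Err is returned immediately.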
func (a *App) waitAllPreviewQueuesIdle(ctx context.Context) error {
a.mu.Lock()
thumbWorkers := make([]*preview.ThumbWorker, 0, len(a.thumbWorkers))
previewWorkers := make([]*preview.Worker, 0, len(a.workers))
for _, w := range a.thumbWorkers {
thumbWorkers = append(thumbWorkers, w)
}
for _, w := range a.workers {
previewWorkers = append(previewWorkers, w)
}
a.mu.Unlock()
for _, w := range thumbWorkers {
if err := w.WaitIdle(ctx); err != nil {
return err
}
}
for _, w := range previewWorkers {
if err := w.WaitIdle(ctx); err != nil {
return err
}
}
return nil
}
func shouldScanDrive(d drives.Drive) bool {
@@ -1124,78 +1151,13 @@ func shouldScanDrive(d drives.Drive) bool {
return true
}
// ---------- spider91 crawler loop ----------
const (
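// spider91DefaultCrawlHour triggers within the 00:00 hour window by default; each drive's
// credentials.crawl_hour can override it.
spider91DefaultCrawlHour = 0
// spider91MinIntervalHours is the minimum gap between two automatic crawls, so that a crawl does not fire several times a day.
spider91MinIntervalHours = 12
)
// crawlerLoop polls once per minute and starts a crawl for every spider91 drive inside its trigger window.
// Each drive tracks its own last_crawl_at (stored in credentials).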
func (a *App) crawlerLoop(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
// Check once right after startup (today's window may have been missed)
a.tickSpider91(ctx, time.Now())
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
a.tickSpider91(ctx, now)
}
}
}
// tickSpider91 checks every spider91 drive and decides which ones should trigger.
func (a *App) tickSpider91(ctx context.Context, now time.Time) {
a.mu.Lock()
crawlerIDs := make([]string, 0, len(a.spider91Crawlers))
for id := range a.spider91Crawlers {
crawlerIDs = append(crawlerIDs, id)
}
a.mu.Unlock()
for _, id := range crawlerIDs {
d, err := a.cat.GetDrive(ctx, id)
if err != nil || d == nil {
continue
}
if !spider91DueAt(now, d) {
continue
}
go a.runSpider91Crawl(ctx, id)
}
}
// spider91DueAt reports whether the drive should trigger an automatic crawl at now.
// - the current hour must equal drive.crawl_hour (default 0)
// - at least spider91MinIntervalHours hours must have passed since last_crawl_at
func spider91DueAt(now time.Time, d *catalog.Drive) bool {
if d == nil {
return false
}
hour := spider91IntCred(d, "crawl_hour", spider91DefaultCrawlHour)
if hour < 0 || hour > 23 {
hour = spider91DefaultCrawlHour
}
if now.Hour() != hour {
return false
}
last := spider91IntCred(d, "last_crawl_at", 0)
if last <= 0 {
return true
}
delta := now.Sub(time.Unix(int64(last), 0))
return delta >= time.Duration(spider91MinIntervalHours)*time.Hour
}
// ---------- spider91 crawl ----------
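// runSpider91Crawl runs one full crawl and writes last_crawl_at back to drive.credentials.
// last_crawl_at is updated even when the crawl fails, so a failing drive is not retriggered in a loop; the next window still retries.
//
// last_crawl_at is updated even when the crawl fails, so a failing drive is not retriggered in a loop; the next nightly
// pipeline run still retries. The method blocks: nightly Phase 2 calls it serially, and the admin
// "立即抓取" (crawl now) button calls it asynchronously for a single drive.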
func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
a.mu.Lock()
c := a.spider91Crawlers[driveID]
@@ -1223,7 +1185,8 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
driveID, res.TargetNew, res.TotalEntries, res.NewVideos, res.Skipped, res.Failed, res.SeenSnapshot)
}
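// Record the time of the last crawl
// Record the time of the last crawl. The field no longer drives scheduling (the nightly pipeline schedules everything);
// it is kept only so the admin UI can show "last crawled N hours ago".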
if d.Credentials == nil {
d.Credentials = make(map[string]string)
}
@@ -1238,11 +1201,6 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
if err := a.cat.UpsertDrive(ctx, d); err != nil {
log.Printf("[spider91] drive=%s update last_crawl_at: %v", driveID, err)
}
// Ping the migration worker right after the crawl instead of waiting for the next cycle.
if a.spider91Migrator != nil {
a.spider91Migrator.Trigger()
}
}
// spider91IntCred parses an integer field from credentials and returns def when it is missing.
-74
@@ -1,85 +1,11 @@
package main
import (
"strconv"
"testing"
"time"
"github.com/video-site/backend/internal/catalog"
)
func TestSpider91DueAtTriggerWindow(t *testing.T) {
d := &catalog.Drive{
ID: "x",
Credentials: map[string]string{"crawl_hour": "0"},
}
// Midnight hour → triggers
now := time.Date(2026, 5, 22, 0, 30, 0, 0, time.Local)
if !spider91DueAt(now, d) {
t.Fatalf("expected due at 0:30 with hour=0")
}
// 01:00 → does not trigger
now2 := time.Date(2026, 5, 22, 1, 0, 0, 0, time.Local)
if spider91DueAt(now2, d) {
t.Fatalf("not expected due at 1:00 with hour=0")
}
}
func TestSpider91DueAtCustomHour(t *testing.T) {
d := &catalog.Drive{
ID: "x",
Credentials: map[string]string{"crawl_hour": "3"},
}
due := time.Date(2026, 5, 22, 3, 5, 0, 0, time.Local)
if !spider91DueAt(due, d) {
t.Fatalf("expected due at 3:05 with hour=3")
}
notDue := time.Date(2026, 5, 22, 4, 5, 0, 0, time.Local)
if spider91DueAt(notDue, d) {
t.Fatalf("not expected due at 4:05 with hour=3")
}
}
func TestSpider91DueAtRespectsLastCrawl(t *testing.T) {
now := time.Date(2026, 5, 22, 0, 30, 0, 0, time.Local)
// Last run was only 1 hour ago → does not trigger
d1 := &catalog.Drive{
ID: "x",
Credentials: map[string]string{
"crawl_hour": "0",
"last_crawl_at": strconv.FormatInt(now.Add(-1*time.Hour).Unix(), 10),
},
}
if spider91DueAt(now, d1) {
t.Fatalf("not expected due 1h after last_crawl_at")
}
// Last run was 13 hours ago (>=12h) → triggers
d2 := &catalog.Drive{
ID: "x",
Credentials: map[string]string{
"crawl_hour": "0",
"last_crawl_at": strconv.FormatInt(now.Add(-13*time.Hour).Unix(), 10),
},
}
if !spider91DueAt(now, d2) {
t.Fatalf("expected due 13h after last_crawl_at")
}
}
func TestSpider91DueAtFirstRun(t *testing.T) {
d := &catalog.Drive{
ID: "x",
Credentials: map[string]string{"crawl_hour": "0"},
}
now := time.Date(2026, 5, 22, 0, 30, 0, 0, time.Local)
// No last_crawl_at → treated as the first run; triggers once inside the window
if !spider91DueAt(now, d) {
t.Fatalf("expected due on first run within window")
}
}
func TestSpider91IntCredFallbacks(t *testing.T) {
tests := []struct {
name string
-40
@@ -375,46 +375,6 @@ func TestEnqueueUploadedVideoQueuesLocalPreviewWorker(t *testing.T) {
t.Fatalf("preview status = %q, want ready", got.PreviewStatus)
}
func TestScheduledScanWindowAllowsOnlyEarlyMorning(t *testing.T) {
loc := time.FixedZone("CST", 8*60*60)
cases := []struct {
name string
now time.Time
want bool
}{
{name: "before window", now: time.Date(2026, 5, 12, 1, 59, 0, 0, loc), want: false},
{name: "at start", now: time.Date(2026, 5, 12, 2, 0, 0, 0, loc), want: true},
{name: "inside window", now: time.Date(2026, 5, 12, 6, 59, 0, 0, loc), want: true},
{name: "at end", now: time.Date(2026, 5, 12, 7, 0, 0, 0, loc), want: false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := scheduledScanAllowed(tc.now); got != tc.want {
t.Fatalf("scheduledScanAllowed(%s) = %v, want %v", tc.now.Format(time.RFC3339), got, tc.want)
}
})
}
}
func TestScheduledScanDueRespectsWindowAndInterval(t *testing.T) {
loc := time.FixedZone("CST", 8*60*60)
interval := 2 * time.Hour
inside := time.Date(2026, 5, 12, 2, 0, 0, 0, loc)
if scheduledScanDue(time.Date(2026, 5, 12, 1, 59, 0, 0, loc), time.Time{}, interval) {
t.Fatal("scheduled scan due outside window, want false")
}
if !scheduledScanDue(inside, time.Time{}, interval) {
t.Fatal("first scheduled scan inside window = false, want true")
}
if scheduledScanDue(inside.Add(time.Hour), inside, interval) {
t.Fatal("scheduled scan due before interval elapsed, want false")
}
if !scheduledScanDue(inside.Add(2*time.Hour), inside, interval) {
t.Fatal("scheduled scan due after interval elapsed, want true")
}
}
func TestShouldScanDriveSkipsLocalUpload(t *testing.T) {
if shouldScanDrive(&serverLocalUploadFakeDrive{}) {
t.Fatal("local upload drive should not be scanned")
+14 -2
@@ -25,13 +25,25 @@ storage:
local_preview_dir: "./data/previews"
scanner:
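# Minimum interval between automatic scans (seconds); scans only fire between 02:00 and 07:00 each day; 0 means only manual rescans by an admin are allowed
interval_seconds: 21600
# Deprecated: the old "rescan every IntervalSeconds between 02:00 and 07:00 each day" behaviour has been removed.
# All scheduled tasks are now controlled by the nightly block (one full pipeline run at 01:00 each day).
# The field is kept only for compatibility with old yaml and is ignored at runtime.
interval_seconds: 0
# Upper bound on directory recursion depth per drive in a single scan
max_depth: 5
# File extensions to scan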
video_extensions: [".mp4", ".mkv", ".mov", ".webm", ".avi"]
nightly:
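# Hour (0-23) at which the nightly pipeline fires; default 1, i.e. 01:00 every day. Flow:
# Phase 1 scan every non-spider91 / non-localupload drive → detect additions / deletions
# → enqueue covers and teasers → wait until all queues are idle
# Phase 2 spider91 crawler (if configured) → enqueue teasers → wait until the queue is idle
# Phase 3 spider91 → cloud drive migration (one-shot sweep)
cron_hour: 1
# Upper bound on the total duration of one pipeline run (soft timeout); once exceeded, the current phase finishes and later phases are not started.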
max_duration: 6h
preview:
# Whether to enable ffmpeg frame extraction for teaser generation
enabled: true
+17
@@ -36,6 +36,10 @@ type AdminServer struct {
// Read/write the target drive ID for Spider91 → PikPak uploads
GetSpider91UploadDriveID func() string
SetSpider91UploadDriveID func(driveID string) error
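// OnRunNightlyJob triggers one full nightly pipeline run (Phase 1 scan + Phase 2 91 crawler +
// Phase 3 migration). It returns immediately; the work runs in the background and the admin follows
// progress in the logs or the next status query. Repeated clicks are dropped by Runner.TryLock.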
OnRunNightlyJob func()
}
type GenerationStatus struct {
@@ -83,6 +87,9 @@ func (a *AdminServer) Register(r chi.Router) {
// Runtime settings
r.Get("/settings", a.handleGetSettings)
r.Put("/settings", a.handlePutSettings)
// Operations jobs
r.Post("/jobs/nightly/run", a.handleRunNightlyJob)
})
})
}
@@ -309,6 +316,16 @@ func (a *AdminServer) handleRescan(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusAccepted, map[string]any{"ok": true})
}
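// handleRunNightlyJob triggers one full nightly pipeline run (regardless of the current time or whether it already ran today).
// It returns 202 immediately; progress is observed through the backend logs and state changes in the next GET /admin/api/drives.
// When a pipeline is already running, the Runner drops this trigger and logs it.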
func (a *AdminServer) handleRunNightlyJob(w http.ResponseWriter, r *http.Request) {
if a.OnRunNightlyJob != nil {
a.OnRunNightlyJob()
}
writeJSON(w, http.StatusAccepted, map[string]any{"ok": true})
}
// teaserEnabledReq is the request body of POST /admin/api/drives/{id}/teaser-enabled.
type teaserEnabledReq struct {
Enabled bool `json:"enabled"`
+27
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"
"gopkg.in/yaml.v3"
)
@@ -13,6 +14,7 @@ type Config struct {
Storage Storage `yaml:"storage"`
Scanner Scanner `yaml:"scanner"`
Preview Preview `yaml:"preview"`
Nightly Nightly `yaml:"nightly"`
Drives []Drive `yaml:"drives"`
}
@@ -38,6 +40,8 @@ type Storage struct {
}
type Scanner struct {
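// IntervalSeconds is deprecated. Older versions rescanned at this interval within the daily 02:00–07:00 window;
// scheduling is now unified under nightly.cron_hour, so this field is ignored and kept only for compatibility with old yaml.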
IntervalSeconds int `yaml:"interval_seconds"`
MaxDepth int `yaml:"max_depth"`
VideoExtensions []string `yaml:"video_extensions"`
@@ -52,6 +56,18 @@ type Preview struct {
Segments int `yaml:"segments"`
}
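// Nightly is the scheduling config of the nightly pipeline (scan → 91 crawler → migration).
//
// A process runs exactly one nightly pipeline. It fires when the cron hour arrives and today has not run yet,
// and can also be triggered manually by the admin console's "立即跑全流程" (run full pipeline now) button.
// MaxDuration is a soft timeout: once exceeded, the current phase finishes and later phases are not started.
type Nightly struct {
// CronHour is the daily trigger hour (0-23); the default 1 means 01:00.
CronHour int `yaml:"cron_hour"`
// MaxDuration is the upper bound on the total duration of one pipeline run; default 6h.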
MaxDuration time.Duration `yaml:"max_duration"`
}
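// Sensitive fields of a Drive config entry (Cookie / RefreshToken etc.) are ultimately written to the DB by the admin console.
// The static yaml definitions are kept here to preseed drives at startup. In production, maintaining them only in the DB is recommended.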
type Drive struct {
@@ -118,4 +134,15 @@ func (c *Config) applyDefaults() {
if c.Preview.Segments == 0 {
c.Preview.Segments = 3
}
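// Nightly defaults. CronHour=0 is a legal value (midnight), so the zero value alone cannot
// tell "unset" from "explicit 0". The whole nightly block is treated as the sentinel instead: MaxDuration==0
// means the block is missing and resets it to (cron_hour=1, max_duration=6h). Cost: an explicit
// cron_hour, including CronHour=0 (midnight), only takes effect when max_duration is also set (any value >0 will do).
// Benefit: a default deployment (yaml without a nightly block) gets 01:00 + 6h, matching what users expect.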
if c.Nightly.MaxDuration <= 0 {
c.Nightly.CronHour = 1
c.Nightly.MaxDuration = 6 * time.Hour
} else if c.Nightly.CronHour < 0 || c.Nightly.CronHour > 23 {
c.Nightly.CronHour = 1
}
}
+312
@@ -0,0 +1,312 @@
// Package nightly orchestrates the single nightly maintenance pipeline that
// replaces the legacy scanLoop / crawlerLoop / spider91 migrator periodic loop.
//
// Pipeline (fired once per day at cron_hour, also via TriggerNow for admin
// "立即跑全流程"):
//
// Phase 1: for each non-spider91 cloud drive
// scan + delete-detection + enqueue thumb + enqueue teaser
// wait until all thumb / teaser queues are idle
// Phase 2: if any spider91 drive configured
// crawl + enqueue teaser for new videos
// wait until teaser queues are idle
// Phase 3: spider91 → cloud migration (single sweep, captcha cooldown still
// honored within this call)
//
// A 6h soft deadline guards each pipeline run; phases check deadline at their
// boundaries and exit cleanly if exceeded (no in-flight ffmpeg / upload is
// killed mid-task).
//
// State persistence: the date string of the most recent successfully started
// run is stored in catalog.settings under the key "nightly.last_run_date".
// This survives restarts so a quick crash inside cron_hour won't trigger a
// duplicate pipeline.
package nightly
import (
"context"
"errors"
"log"
"sync"
"time"
)
const (
// settingLastRunDate stores the YYYY-MM-DD of the last natural cron-triggered
// pipeline run. Manual TriggerNow() also updates this to keep behavior consistent.
settingLastRunDate = "nightly.last_run_date"
// dateLayout matches catalog.GetSetting string semantics; using ISO-8601 date.
dateLayout = "2006-01-02"
// pollInterval is the heartbeat for the natural cron decision loop.
pollInterval = time.Minute
// minSafeMaxDuration prevents misconfiguration: anything below this would
// almost guarantee Phase 2/3 are skipped.
minSafeMaxDuration = 5 * time.Minute
)
// SettingStore is the minimal catalog.Catalog surface we rely on.
type SettingStore interface {
GetSetting(ctx context.Context, key, defaultValue string) (string, error)
SetSetting(ctx context.Context, key, value string) error
}
// Config wires the runner to its dependencies. The function-callback shape
// avoids importing main / drives / preview from this package, keeping the
// dependency graph clean.
type Config struct {
Settings SettingStore
CronHour int // 0-23; default 1 (01:00)
MaxDuration time.Duration // soft deadline for one pipeline run; default 6h
// ListScanTargets returns the drive IDs to run Phase 1 on, in deterministic
// order. Should exclude spider91 and localupload drives.
ListScanTargets func(ctx context.Context) []string
// RunScan synchronously runs scan + cleanup + enqueueDriveGeneration for
// one drive. Errors are expected to be logged inside, not surfaced.
RunScan func(ctx context.Context, driveID string)
// ListSpider91Drives returns spider91 drive IDs to crawl in Phase 2.
// Returns empty slice when no spider91 drive is configured.
ListSpider91Drives func(ctx context.Context) []string
// RunSpider91Crawl synchronously runs one crawl cycle (downloads + thumbs +
// teaser enqueue) for a single spider91 drive.
RunSpider91Crawl func(ctx context.Context, driveID string)
// WaitPreviewQueuesIdle blocks until both the thumbnail and teaser queues
// across all drives are drained (queue empty + no in-flight task). It must
// honor ctx cancellation.
WaitPreviewQueuesIdle func(ctx context.Context) error
// RunMigration runs spider91migrate.Migrator.RunOnce for Phase 3.
RunMigration func(ctx context.Context) error
// Now is injected for tests; nil → time.Now.
Now func() time.Time
}
// Runner drives the nightly pipeline.
type Runner struct {
cfg Config
trigger chan struct{} // buffered(1); manual "run now"
runMu sync.Mutex // prevents overlapping pipeline runs
}
// New constructs a Runner. cfg is shallow-copied; defaults are applied.
func New(cfg Config) *Runner {
if cfg.CronHour < 0 || cfg.CronHour > 23 {
cfg.CronHour = 1
}
if cfg.MaxDuration <= 0 {
cfg.MaxDuration = 6 * time.Hour
}
if cfg.MaxDuration < minSafeMaxDuration {
cfg.MaxDuration = minSafeMaxDuration
}
if cfg.Now == nil {
cfg.Now = time.Now
}
return &Runner{
cfg: cfg,
trigger: make(chan struct{}, 1),
}
}
// Run is a blocking loop until ctx is done. It wakes up once per minute and
// either fires the natural cron-driven pipeline (when cron_hour matches and
// today hasn't run) or honors a manual TriggerNow() request.
func (r *Runner) Run(ctx context.Context) {
t := time.NewTicker(pollInterval)
defer t.Stop()
log.Printf("[nightly] runner started; cron_hour=%d max_duration=%s", r.cfg.CronHour, r.cfg.MaxDuration)
for {
select {
case <-ctx.Done():
log.Printf("[nightly] runner stopping: %v", ctx.Err())
return
case <-t.C:
r.tryNaturalRun(ctx)
case <-r.trigger:
log.Printf("[nightly] manual trigger received")
r.runPipelineLocked(ctx, true)
}
}
}
// TriggerNow asks the running loop to fire a pipeline ASAP. If a pipeline is
// already in progress (or another trigger is already pending), the request
// is dropped — the in-progress run will absorb the intent.
func (r *Runner) TriggerNow() {
select {
case r.trigger <- struct{}{}:
default:
}
}
// tryNaturalRun checks the cron decision and runs the pipeline if due today.
func (r *Runner) tryNaturalRun(ctx context.Context) {
now := r.cfg.Now()
if now.Hour() != r.cfg.CronHour {
return
}
last, err := r.readLastRunDate(ctx)
if err != nil {
log.Printf("[nightly] read last_run_date: %v", err)
return
}
if !shouldRun(now, last) {
return
}
log.Printf("[nightly] natural cron trigger at %s", now.Format(time.RFC3339))
r.runPipelineLocked(ctx, false)
}
// shouldRun returns true when "today" (per now) hasn't already been processed.
func shouldRun(now time.Time, lastRunDate string) bool {
return lastRunDate != now.Format(dateLayout)
}
// runPipelineLocked guards against overlapping runs. If another pipeline is
// in progress, the call returns immediately (logged once). After completion
// (regardless of success), today's date is recorded so subsequent triggers
// the same calendar day are skipped.
func (r *Runner) runPipelineLocked(ctx context.Context, manual bool) {
if !r.runMu.TryLock() {
log.Printf("[nightly] another pipeline is already running, skipping this trigger")
return
}
defer r.runMu.Unlock()
started := r.cfg.Now()
deadline := started.Add(r.cfg.MaxDuration)
runCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
mode := "scheduled"
if manual {
mode = "manual"
}
log.Printf("[nightly] pipeline (%s) start; soft deadline=%s", mode, deadline.Format(time.RFC3339))
r.runPipeline(runCtx)
finished := r.cfg.Now()
log.Printf("[nightly] pipeline (%s) finish; took=%s", mode, finished.Sub(started).Round(time.Second))
// Mark today as processed regardless of success/error. This is intentional:
// a partial / failing pipeline shouldn't trigger again the same day, the
// admin can inspect logs and click "立即跑全流程" to retry explicitly.
dateStr := started.Format(dateLayout)
if err := r.cfg.Settings.SetSetting(ctx, settingLastRunDate, dateStr); err != nil {
log.Printf("[nightly] persist last_run_date: %v", err)
}
}
// runPipeline executes the three phases. It returns when the pipeline finishes
// OR ctx is done (deadline / cancel). Errors are logged but not propagated —
// each phase is best-effort; downstream phases still attempt to run unless ctx
// is dead.
func (r *Runner) runPipeline(ctx context.Context) {
// ---------- Phase 1 ----------
if r.checkDeadline(ctx, "phase 1") {
return
}
scanIDs := []string{}
if r.cfg.ListScanTargets != nil {
scanIDs = r.cfg.ListScanTargets(ctx)
}
if len(scanIDs) == 0 {
log.Printf("[nightly] phase 1 skipped: no cloud drives to scan")
} else {
log.Printf("[nightly] phase 1: scanning %d drive(s)", len(scanIDs))
for _, id := range scanIDs {
if ctx.Err() != nil {
log.Printf("[nightly] phase 1 aborted by ctx: %v", ctx.Err())
return
}
log.Printf("[nightly] phase 1: scanning drive=%s", id)
r.cfg.RunScan(ctx, id)
}
log.Printf("[nightly] phase 1: waiting for preview queues to drain")
if err := r.waitIdle(ctx, "phase 1"); err != nil {
return
}
}
// ---------- Phase 2 ----------
if r.checkDeadline(ctx, "phase 2") {
return
}
spiderIDs := []string{}
if r.cfg.ListSpider91Drives != nil {
spiderIDs = r.cfg.ListSpider91Drives(ctx)
}
if len(spiderIDs) == 0 {
log.Printf("[nightly] phase 2/3 skipped: no spider91 drive configured")
return
}
log.Printf("[nightly] phase 2: crawling %d spider91 drive(s)", len(spiderIDs))
for _, id := range spiderIDs {
if ctx.Err() != nil {
log.Printf("[nightly] phase 2 aborted by ctx: %v", ctx.Err())
return
}
log.Printf("[nightly] phase 2: crawling drive=%s", id)
r.cfg.RunSpider91Crawl(ctx, id)
}
log.Printf("[nightly] phase 2: waiting for teaser queue to drain")
if err := r.waitIdle(ctx, "phase 2"); err != nil {
return
}
// ---------- Phase 3 ----------
if r.checkDeadline(ctx, "phase 3") {
return
}
log.Printf("[nightly] phase 3: spider91 migration")
if r.cfg.RunMigration != nil {
if err := r.cfg.RunMigration(ctx); err != nil {
log.Printf("[nightly] phase 3 migration: %v", err)
}
}
}
// checkDeadline returns true when ctx is already done (i.e., the soft deadline
// has been reached or the runner is shutting down) and the caller should bail.
func (r *Runner) checkDeadline(ctx context.Context, phase string) bool {
if err := ctx.Err(); err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
log.Printf("[nightly] %s: soft deadline reached, bailing out", phase)
default:
log.Printf("[nightly] %s: ctx done (%v), bailing out", phase, err)
}
return true
}
return false
}
// waitIdle calls the configured WaitPreviewQueuesIdle, logging the outcome.
func (r *Runner) waitIdle(ctx context.Context, phase string) error {
if r.cfg.WaitPreviewQueuesIdle == nil {
return nil
}
if err := r.cfg.WaitPreviewQueuesIdle(ctx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
log.Printf("[nightly] %s: soft deadline reached while waiting for preview queues", phase)
} else {
log.Printf("[nightly] %s: wait preview queues: %v", phase, err)
}
return err
}
return nil
}
// readLastRunDate reads the persisted last_run_date or returns "" when unset.
func (r *Runner) readLastRunDate(ctx context.Context) (string, error) {
if r.cfg.Settings == nil {
return "", nil
}
return r.cfg.Settings.GetSetting(ctx, settingLastRunDate, "")
}
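The mutual exclusion between manual and cron triggers, together with the soft timeout, could look roughly like the sketch below. It is not the actual runPipelineLocked: the `runner` type, `runLocked`, and `demoOverlap` are hypothetical names, and the only behaviour taken from the commit is "TryLock drops an overlapping trigger" and "the pipeline runs under a max-duration context".

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runner is a hypothetical stand-in for nightly.Runner; it models only the
// lock and the soft deadline.
type runner struct {
	mu          sync.Mutex
	maxDuration time.Duration
	pipeline    func(ctx context.Context)
}

// runLocked reports whether the pipeline actually ran. A trigger arriving
// while another run holds the mutex is dropped rather than queued.
func (r *runner) runLocked(parent context.Context) bool {
	if !r.mu.TryLock() {
		return false
	}
	defer r.mu.Unlock()
	// Soft deadline: phases observe ctx.Err() at their boundaries.
	ctx, cancel := context.WithTimeout(parent, r.maxDuration)
	defer cancel()
	r.pipeline(ctx)
	return true
}

// demoOverlap starts one run, fires a second trigger while the first is still
// in flight, and returns the outcome of each.
func demoOverlap() (first, second bool) {
	entered := make(chan struct{})
	release := make(chan struct{})
	r := &runner{
		maxDuration: time.Minute,
		pipeline: func(ctx context.Context) {
			close(entered) // signal that the first run holds the lock
			<-release
		},
	}
	done := make(chan bool)
	go func() { done <- r.runLocked(context.Background()) }()
	<-entered
	second = r.runLocked(context.Background())
	close(release)
	first = <-done
	return first, second
}

func main() {
	first, second := demoOverlap()
	fmt.Println("first ran:", first, "second ran:", second)
}
```

Dropping instead of queueing keeps a double click on the admin button from scheduling two multi-hour runs back to back.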
+323
@@ -0,0 +1,323 @@
package nightly
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
)
// stubSettings is an in-memory SettingStore for tests.
type stubSettings struct {
mu sync.Mutex
kv map[string]string
}
func newStubSettings() *stubSettings { return &stubSettings{kv: make(map[string]string)} }
func (s *stubSettings) GetSetting(_ context.Context, key, def string) (string, error) {
s.mu.Lock()
defer s.mu.Unlock()
if v, ok := s.kv[key]; ok {
return v, nil
}
return def, nil
}
func (s *stubSettings) SetSetting(_ context.Context, key, value string) error {
s.mu.Lock()
defer s.mu.Unlock()
s.kv[key] = value
return nil
}
func TestShouldRunChecksDate(t *testing.T) {
now := time.Date(2026, 5, 27, 1, 30, 0, 0, time.UTC)
if !shouldRun(now, "") {
t.Fatal("first ever run with empty last_run_date should be due")
}
if !shouldRun(now, "2026-05-26") {
t.Fatal("yesterday's run should not block today")
}
if shouldRun(now, "2026-05-27") {
t.Fatal("today's run already recorded should block another natural run")
}
}
func TestNewAppliesDefaults(t *testing.T) {
r := New(Config{Settings: newStubSettings()})
// CronHour=0 is a legitimate hour (midnight); New() only clamps out-of-range
// values. The "default to 01:00" responsibility lives in the config layer.
if r.cfg.CronHour != 0 {
t.Errorf("CronHour zero-value should be preserved, got %d", r.cfg.CronHour)
}
if r.cfg.MaxDuration != 6*time.Hour {
t.Errorf("MaxDuration default = %s, want 6h", r.cfg.MaxDuration)
}
}
func TestNewRejectsInvalidCronHour(t *testing.T) {
r := New(Config{CronHour: -1, Settings: newStubSettings()})
if r.cfg.CronHour != 1 {
t.Fatalf("invalid cron_hour fall back to 1, got %d", r.cfg.CronHour)
}
r2 := New(Config{CronHour: 25, Settings: newStubSettings()})
if r2.cfg.CronHour != 1 {
t.Fatalf("out-of-range cron_hour fall back to 1, got %d", r2.cfg.CronHour)
}
}
func TestNewRaisesUnsafeMaxDuration(t *testing.T) {
r := New(Config{MaxDuration: time.Second, Settings: newStubSettings()})
if r.cfg.MaxDuration != minSafeMaxDuration {
t.Fatalf("MaxDuration too small clamps to %s, got %s", minSafeMaxDuration, r.cfg.MaxDuration)
}
}
// recorder accumulates the order of phase invocations so tests can assert
// orchestration semantics.
type recorder struct {
mu sync.Mutex
calls []string
}
func (r *recorder) push(s string) {
r.mu.Lock()
r.calls = append(r.calls, s)
r.mu.Unlock()
}
func (r *recorder) snapshot() []string {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]string, len(r.calls))
copy(out, r.calls)
return out
}
func TestRunPipelineHonoursPhaseOrder(t *testing.T) {
rec := &recorder{}
settings := newStubSettings()
r := New(Config{
Settings: settings,
ListScanTargets: func(context.Context) []string {
rec.push("list-scan")
return []string{"drive-a", "drive-b"}
},
RunScan: func(_ context.Context, id string) {
rec.push("scan:" + id)
},
ListSpider91Drives: func(context.Context) []string {
rec.push("list-spider")
return []string{"sp-1"}
},
RunSpider91Crawl: func(_ context.Context, id string) {
rec.push("crawl:" + id)
},
WaitPreviewQueuesIdle: func(context.Context) error {
rec.push("wait-idle")
return nil
},
RunMigration: func(context.Context) error {
rec.push("migrate")
return nil
},
})
r.runPipeline(context.Background())
got := rec.snapshot()
want := []string{
"list-scan",
"scan:drive-a",
"scan:drive-b",
"wait-idle", // after phase 1
"list-spider",
"crawl:sp-1",
"wait-idle", // after phase 2
"migrate",
}
if len(got) != len(want) {
t.Fatalf("call sequence len = %d, want %d; got=%v", len(got), len(want), got)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("call[%d] = %q, want %q (full=%v)", i, got[i], want[i], got)
}
}
}
func TestRunPipelineSkipsMigrationWhenNoSpider91(t *testing.T) {
rec := &recorder{}
r := New(Config{
Settings: newStubSettings(),
ListScanTargets: func(context.Context) []string { return []string{"drive-a"} },
RunScan: func(_ context.Context, id string) { rec.push("scan:" + id) },
ListSpider91Drives: func(context.Context) []string { return nil },
RunSpider91Crawl: func(_ context.Context, id string) { rec.push("crawl:" + id) },
WaitPreviewQueuesIdle: func(context.Context) error {
rec.push("wait-idle")
return nil
},
RunMigration: func(context.Context) error {
rec.push("migrate")
return nil
},
})
r.runPipeline(context.Background())
for _, c := range rec.snapshot() {
if c == "migrate" || c == "crawl:sp-1" {
t.Fatalf("phase 2/3 should be skipped when no spider91 drive, got call %q", c)
}
}
}
func TestRunPipelineExitsWhenContextCancelledMidPhase(t *testing.T) {
rec := &recorder{}
ctx, cancel := context.WithCancel(context.Background())
r := New(Config{
Settings: newStubSettings(),
ListScanTargets: func(context.Context) []string {
return []string{"drive-a", "drive-b", "drive-c"}
},
RunScan: func(_ context.Context, id string) {
rec.push("scan:" + id)
if id == "drive-a" {
cancel()
}
},
ListSpider91Drives: func(context.Context) []string { return []string{"x"} },
RunSpider91Crawl: func(context.Context, string) { rec.push("crawl") },
WaitPreviewQueuesIdle: func(context.Context) error { rec.push("wait-idle"); return nil },
RunMigration: func(context.Context) error { rec.push("migrate"); return nil },
})
r.runPipeline(ctx)
got := rec.snapshot()
for _, c := range got {
if c == "scan:drive-c" || c == "scan:drive-b" {
t.Fatalf("scan should bail out after cancel, got call %q (full=%v)", c, got)
}
}
for _, c := range got {
if c == "crawl" || c == "migrate" {
t.Fatalf("subsequent phase should not run after cancel, got call %q", c)
}
}
}
func TestRunPipelineRecordsLastRunDateAfterCompletion(t *testing.T) {
settings := newStubSettings()
now := time.Date(2026, 5, 27, 1, 5, 0, 0, time.UTC)
r := New(Config{
Settings: settings,
Now: func() time.Time { return now },
ListScanTargets: func(context.Context) []string { return nil },
WaitPreviewQueuesIdle: func(context.Context) error { return nil },
})
r.runPipelineLocked(context.Background(), false)
got, _ := settings.GetSetting(context.Background(), settingLastRunDate, "")
if got != "2026-05-27" {
t.Fatalf("last_run_date = %q, want 2026-05-27", got)
}
}
func TestRunPipelineLockedDropsOverlappingTriggers(t *testing.T) {
var (
started atomic.Int32
releaseFirst = make(chan struct{})
)
r := New(Config{
Settings: newStubSettings(),
ListScanTargets: func(context.Context) []string {
started.Add(1)
<-releaseFirst
return nil
},
WaitPreviewQueuesIdle: func(context.Context) error { return nil },
})
go r.runPipelineLocked(context.Background(), false)
// Wait for first to start
for started.Load() == 0 {
time.Sleep(5 * time.Millisecond)
}
// Second trigger should bail out without invoking ListScanTargets again
r.runPipelineLocked(context.Background(), true)
if started.Load() != 1 {
t.Fatalf("overlapping run should be dropped; started=%d", started.Load())
}
close(releaseFirst)
}
func TestSoftDeadlinePreventsLaterPhases(t *testing.T) {
rec := &recorder{}
settings := newStubSettings()
r := New(Config{
Settings: settings,
MaxDuration: minSafeMaxDuration, // smallest allowed
ListScanTargets: func(context.Context) []string { return nil },
WaitPreviewQueuesIdle: func(ctx context.Context) error {
// Force the wait to see a deadline-exceeded ctx before we proceed
ctxDone := make(chan struct{})
go func() {
<-ctx.Done()
close(ctxDone)
}()
select {
case <-ctxDone:
return ctx.Err()
case <-time.After(time.Second):
return errors.New("test timeout")
}
},
ListSpider91Drives: func(context.Context) []string {
rec.push("list-spider")
return []string{"x"}
},
RunSpider91Crawl: func(context.Context, string) { rec.push("crawl") },
RunMigration: func(context.Context) error { rec.push("migrate"); return nil },
})
// Build a runCtx whose deadline is already past to simulate timeout reached
// at a phase boundary. We can't directly inject, but emulate via canceled.
ctx, cancel := context.WithCancel(context.Background())
cancel()
r.runPipeline(ctx)
for _, c := range rec.snapshot() {
if c == "crawl" || c == "migrate" || c == "list-spider" {
t.Fatalf("later phase should not run after ctx done; got %q", c)
}
}
}
func TestTriggerNowIsNonBlocking(t *testing.T) {
r := New(Config{Settings: newStubSettings()})
// fill the trigger channel
r.TriggerNow()
// Second call must not block
done := make(chan struct{})
go func() {
r.TriggerNow()
close(done)
}()
select {
case <-done:
case <-time.After(100 * time.Millisecond):
t.Fatal("TriggerNow blocked when channel is full")
}
}
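The same-day de-duplication exercised by TestShouldRunChecksDate can be illustrated with a date-string comparison. This is a sketch under assumptions: `dueToday` is a hypothetical stand-in for shouldRun, the `2006-01-02` layout is inferred from the test fixtures, and any cron-hour check is assumed to happen elsewhere.

```go
package main

import (
	"fmt"
	"time"
)

// dueToday is a hypothetical stand-in for shouldRun: a run is due unless the
// persisted last_run_date already equals today's date.
func dueToday(now time.Time, lastRunDate string) bool {
	return now.Format("2006-01-02") != lastRunDate
}

// demoDue evaluates the three cases from the test: never ran, ran yesterday,
// ran today.
func demoDue() [3]bool {
	now := time.Date(2026, 5, 27, 1, 30, 0, 0, time.UTC)
	return [3]bool{
		dueToday(now, ""),
		dueToday(now, "2026-05-26"),
		dueToday(now, "2026-05-27"),
	}
}

func main() {
	fmt.Println(demoDue())
}
```

Because the date is persisted as a plain string in the settings table, a process restart on the same day reads it back and skips the natural run.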
+42
@@ -1220,6 +1220,48 @@ func (w *ThumbWorker) Status() TaskStatus {
return taskStatus(&w.activity, &w.rateLimit, w.queue.lengthExcluding(currentID))
}
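// WaitIdle blocks until the worker queue is empty and no task is being processed.
//
// "Queue empty" is judged via videoQueue, which reserves an entry in Enqueue and
// releases it in processQueued's defer, so lengthExcluding("") == 0 covers both:
// - items in the channel that have not been consumed yet
// - the item currently inside processQueued (even while it waits out a cooldown)
//
// Callers should pass a timeout or cancellation through ctx; when ctx ends, ctx.Err() is returned.
// Polls every 200ms: the cost is negligible, and the nightly pipeline is not sensitive to a few hundred milliseconds of latency.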
func (w *Worker) WaitIdle(ctx context.Context) error {
if w == nil {
return nil
}
return waitQueueIdle(ctx, &w.queue)
}
// WaitIdle: see the comment on Worker.WaitIdle.
func (w *ThumbWorker) WaitIdle(ctx context.Context) error {
if w == nil {
return nil
}
return waitQueueIdle(ctx, &w.queue)
}
func waitQueueIdle(ctx context.Context, q *videoQueue) error {
if q.lengthExcluding("") == 0 {
return nil
}
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if q.lengthExcluding("") == 0 {
return nil
}
}
}
}
func taskStatus(activity *taskActivity, rateLimit *rateLimitState, queueLength int) TaskStatus {
if queueLength < 0 {
queueLength = 0
+78
@@ -567,3 +567,81 @@ func (d *previewFakeDrive) EnsureDir(context.Context, string) (string, error) {
return "", drives.ErrNotSupported
}
func (d *previewFakeDrive) RootID() string { return "root" }
func TestWorkerWaitIdleReturnsImmediatelyWhenQueueEmpty(t *testing.T) {
worker := NewWorker(&fakeTeaserGenerator{}, nil, &previewFakeDrive{})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
start := time.Now()
if err := worker.WaitIdle(ctx); err != nil {
t.Fatalf("WaitIdle on empty queue: %v", err)
}
if took := time.Since(start); took > 50*time.Millisecond {
t.Fatalf("WaitIdle on empty queue took %s, want immediate return", took)
}
}
func TestWorkerWaitIdleBlocksUntilQueueDrains(t *testing.T) {
worker := NewWorker(&fakeTeaserGenerator{}, nil, &previewFakeDrive{})
v := &catalog.Video{ID: "wait-idle-vid"}
if !worker.queue.reserve(v) {
t.Fatalf("reserve should succeed on fresh queue")
}
go func() {
time.Sleep(120 * time.Millisecond)
worker.queue.release(v)
}()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
start := time.Now()
if err := worker.WaitIdle(ctx); err != nil {
t.Fatalf("WaitIdle: %v", err)
}
took := time.Since(start)
if took < 100*time.Millisecond {
t.Fatalf("WaitIdle returned in %s, expected to wait until release", took)
}
if took > time.Second {
t.Fatalf("WaitIdle took %s, expected to return shortly after release", took)
}
}
func TestWorkerWaitIdleHonoursContextCancel(t *testing.T) {
worker := NewWorker(&fakeTeaserGenerator{}, nil, &previewFakeDrive{})
v := &catalog.Video{ID: "ctx-cancel"}
if !worker.queue.reserve(v) {
t.Fatalf("reserve should succeed")
}
t.Cleanup(func() { worker.queue.release(v) })
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
if err := worker.WaitIdle(ctx); err == nil {
t.Fatalf("WaitIdle expected ctx.Err, got nil")
}
}
func TestThumbWorkerWaitIdleBlocksUntilQueueDrains(t *testing.T) {
worker := NewThumbWorker(&fakeThumbGenerator{}, nil, &previewFakeDrive{})
v := &catalog.Video{ID: "thumb-wait-idle"}
if !worker.queue.reserve(v) {
t.Fatalf("reserve should succeed")
}
go func() {
time.Sleep(80 * time.Millisecond)
worker.queue.release(v)
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := worker.WaitIdle(ctx); err != nil {
t.Fatalf("ThumbWorker.WaitIdle: %v", err)
}
}
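One way the nightly runner might combine the two WaitIdle methods into its WaitPreviewQueuesIdle hook is sketched below. The `idleWaiter` interface, `waitAllIdle`, and `fakeWorker` are hypothetical; waiting sequentially assumes nothing enqueues new work once the scan or crawl loop has finished.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// idleWaiter is the subset of Worker / ThumbWorker this sketch relies on.
type idleWaiter interface {
	WaitIdle(ctx context.Context) error
}

// waitAllIdle waits for each queue in turn and stops at the first error,
// which in practice is ctx.Err() once the soft deadline passes.
func waitAllIdle(ctx context.Context, workers ...idleWaiter) error {
	for _, w := range workers {
		if err := w.WaitIdle(ctx); err != nil {
			return err
		}
	}
	return nil
}

// fakeWorker pretends to be busy for a fixed duration.
type fakeWorker struct{ busyFor time.Duration }

func (f fakeWorker) WaitIdle(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(f.busyFor):
		return nil
	}
}

// demoBarrier runs one barrier that drains in time and one that cannot.
func demoBarrier() (drainedErr error, timedOut bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	drainedErr = waitAllIdle(ctx,
		fakeWorker{busyFor: time.Millisecond},
		fakeWorker{busyFor: 2 * time.Millisecond})
	err := waitAllIdle(ctx,
		fakeWorker{busyFor: time.Millisecond},
		fakeWorker{busyFor: time.Hour})
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return drainedErr, timedOut
}

func main() {
	drainedErr, timedOut := demoBarrier()
	fmt.Println("drained err:", drainedErr, "timed out:", timedOut)
}
```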
+46 -3
@@ -19,8 +19,15 @@ type Scanner struct {
MaxDepth int
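// Callback: fired after a new video is added, to trigger teaser generation
OnNewVideo func(v *catalog.Video)
// ProgressInterval is the minimum interval between heartbeat lines emitted during a scan.
// 0 means the default of 30s; < 0 disables the heartbeat (only the outer start / done lines remain).
// Each heartbeat is a single line of the form: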
// [scanner] drive=X progress: scanned=N added=K errors=E dirs=M elapsed=Ts at=<dir>
ProgressInterval time.Duration
}
const defaultScanProgressInterval = 30 * time.Second
func New(cat *catalog.Catalog, drv drives.Drive, exts []string, maxDepth int, onNew func(v *catalog.Video)) *Scanner {
m := make(map[string]bool, len(exts))
for _, e := range exts {
@@ -57,13 +64,44 @@ func (s *Scanner) Run(ctx context.Context, startDirID string) (Stats, error) {
VisitedDirIDs: make(map[string]struct{}),
ExcludedFileIDs: make(map[string]struct{}),
}
if err := s.walk(ctx, startDirID, "", 0, &stats); err != nil {
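// Heartbeat closure: called when entering / leaving each directory and after each processed file, throttled by a single timestamp.
// All state it captures is read and written sequentially from one goroutine, so no lock is needed.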
interval := s.ProgressInterval
if interval == 0 {
interval = defaultScanProgressInterval
}
started := time.Now()
lastBeat := started
driveID := ""
if s.Drive != nil {
driveID = s.Drive.ID()
}
progress := func(currentDir string) {
if interval < 0 {
return
}
now := time.Now()
if now.Sub(lastBeat) < interval {
return
}
lastBeat = now
shown := currentDir
if shown == "" {
shown = "(root)"
}
log.Printf("[scanner] drive=%s progress: scanned=%d added=%d errors=%d dirs=%d elapsed=%s at=%s",
driveID, stats.Scanned, stats.Added, stats.Errors, len(stats.VisitedDirIDs),
now.Sub(started).Round(time.Second), shown)
}
if err := s.walk(ctx, startDirID, "", 0, &stats, progress); err != nil {
return stats, err
}
return stats, nil
}
func (s *Scanner) walk(ctx context.Context, dirID, dirName string, depth int, stats *Stats) error {
func (s *Scanner) walk(ctx context.Context, dirID, dirName string, depth int, stats *Stats, progress func(string)) error {
if depth >= s.MaxDepth {
return nil
}
@@ -71,6 +109,7 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, depth int, st
return err
}
stats.VisitedDirIDs[dirID] = struct{}{}
progress(dirName) // heartbeat: directory boundaries are a natural throttle point
entries, err := s.Drive.List(ctx, dirID)
if err != nil {
@@ -90,7 +129,7 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, depth int, st
}
continue
}
if err := s.walk(ctx, e.ID, e.Name, depth+1, stats); err != nil {
if err := s.walk(ctx, e.ID, e.Name, depth+1, stats, progress); err != nil {
stats.Errors++
log.Printf("[scanner] walk %s error: %v", e.Name, err)
}
@@ -185,6 +224,10 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, depth int, st
if s.OnNewVideo != nil {
s.OnNewVideo(v)
}
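// Fallback: when a single directory holds thousands of files, the per-directory heartbeat alone could stay silent for a long time;
// ping once more after each file. The 30s throttle inside progress turns almost all of these
// calls into a cheap time comparison.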
progress(dirName)
}
return nil
}
+79
@@ -3,7 +3,10 @@ package scanner
import (
"context"
"database/sql"
"fmt"
"io"
"log"
"strings"
"testing"
"time"
@@ -601,3 +604,79 @@ func (d *scannerTreeFakeDrive) EnsureDir(context.Context, string) (string, error
return "", drives.ErrNotSupported
}
func (d *scannerTreeFakeDrive) RootID() string { return "root" }
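// captureLog redirects the log package's default output into a strings.Builder so tests can assert that progress lines were printed;
// the original output and flags are restored automatically when the test ends.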
func captureLog(t *testing.T) *strings.Builder {
t.Helper()
buf := &strings.Builder{}
originalOutput := log.Writer()
originalFlags := log.Flags()
log.SetOutput(buf)
log.SetFlags(0)
t.Cleanup(func() {
log.SetOutput(originalOutput)
log.SetFlags(originalFlags)
})
return buf
}
func TestScannerProgressHeartbeatEmits(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() { cat.Close() })
// Prepare 5 files, enough to hit the progress() call at the end of the loop at least once.
entries := make([]drives.Entry, 5)
for i := range entries {
entries[i] = drives.Entry{
ID: fmt.Sprintf("file-%d", i),
Name: fmt.Sprintf("clip-%d.mp4", i),
Size: 100,
ModTime: time.Now(),
}
}
drv := &scannerFakeDrive{entries: entries}
sc := New(cat, drv, []string{".mp4"}, 5, nil)
// Tiny interval so that at least one heartbeat fires inside walk
sc.ProgressInterval = 1 * time.Microsecond
buf := captureLog(t)
if _, err := sc.Run(ctx, ""); err != nil {
t.Fatalf("scan: %v", err)
}
out := buf.String()
if !strings.Contains(out, "[scanner] drive=drive progress:") {
t.Fatalf("expected progress heartbeat in log, got:\n%s", out)
}
}
func TestScannerProgressHeartbeatDisabled(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() { cat.Close() })
drv := &scannerFakeDrive{entries: []drives.Entry{
{ID: "f-1", Name: "x.mp4", Size: 1, ModTime: time.Now()},
}}
sc := New(cat, drv, []string{".mp4"}, 5, nil)
sc.ProgressInterval = -1 // explicitly disabled
buf := captureLog(t)
if _, err := sc.Run(ctx, ""); err != nil {
t.Fatalf("scan: %v", err)
}
if strings.Contains(buf.String(), "progress:") {
t.Fatalf("progress heartbeat should be silenced when interval < 0, got:\n%s", buf.String())
}
}
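The timestamp throttle behind the scanner heartbeat can be isolated into a small closure. The sketch below is illustrative only: `newThrottle` and `demoThrottle` are hypothetical names, and the clock is injected so the behaviour can be checked without sleeping, whereas the scanner itself calls time.Now directly.

```go
package main

import (
	"fmt"
	"time"
)

// newThrottle returns a function that reports true at most once per interval.
// A negative interval disables it, mirroring ProgressInterval < 0.
func newThrottle(interval time.Duration, now func() time.Time) func() bool {
	last := now()
	return func() bool {
		if interval < 0 {
			return false
		}
		t := now()
		if t.Sub(last) < interval {
			return false // cheap path: just a time comparison
		}
		last = t
		return true
	}
}

// demoThrottle simulates 100 files processed one second apart and counts how
// many heartbeats would be emitted for the given interval in seconds.
func demoThrottle(intervalSeconds int) int {
	clock := time.Date(2026, 5, 27, 1, 0, 0, 0, time.UTC)
	beat := newThrottle(time.Duration(intervalSeconds)*time.Second,
		func() time.Time { return clock })
	emitted := 0
	for i := 0; i < 100; i++ {
		clock = clock.Add(time.Second)
		if beat() {
			emitted++
		}
	}
	return emitted
}

func main() {
	fmt.Println(demoThrottle(30), demoThrottle(-1))
}
```

With a 30s interval the simulated 100 seconds yield heartbeats at 30s, 60s, and 90s; every other call returns after a single comparison.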
+18 -68
@@ -129,8 +129,11 @@ type Config struct {
Catalog *catalog.Catalog
Registry Registry
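GetTargetDriveID func() string // usually App.Spider91UploadDriveID()
Interval time.Duration // defaults to 60s when 0
BatchLimit int // max videos migrated per round; defaults to 50 when 0
// Interval is deprecated: the old migration worker was a periodic ticker, while the new one is only
// invoked through RunOnce by the nightly pipeline and has no built-in timer. The field is kept
// for compatibility with external yaml / test code that still sets it.
Interval time.Duration
BatchLimit int // max videos migrated per round; defaults to 50 when 0
// KeepLatestN is the number of newest videos each spider91 drive keeps locally.
// Beyond that, videos already migrated are cleaned up; unmigrated ones are left alone. Defaults to 15 when 0; < 0 disables cleanup.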
KeepLatestN int
@@ -143,7 +146,6 @@ type Config struct {
type Migrator struct {
cfg Config
trigger chan struct{}
mu sync.Mutex
running bool
@@ -158,9 +160,6 @@ type Migrator struct {
}
func New(cfg Config) *Migrator {
if cfg.Interval == 0 {
cfg.Interval = 60 * time.Second
}
if cfg.BatchLimit == 0 {
cfg.BatchLimit = 50
}
@@ -171,8 +170,7 @@ func New(cfg Config) *Migrator {
cfg.CaptchaCooldown = 5 * time.Minute
}
return &Migrator{
cfg: cfg,
trigger: make(chan struct{}, 1),
cfg: cfg,
}
}
@@ -228,67 +226,19 @@ func (m *Migrator) markCooldownLogged() bool {
}
// Trigger schedules one "run now". Repeated calls coalesce into a single run (channel buffer=1).
func (m *Migrator) Trigger() {
select {
case m.trigger <- struct{}{}:
default:
}
}
// Run is a blocking loop; it exits when ctx is cancelled.
func (m *Migrator) Run(ctx context.Context) {
t := time.NewTicker(m.cfg.Interval)
defer t.Stop()
var cooldownTimer *time.Timer
var cooldownC <-chan time.Time
stopCooldownTimer := func() {
if cooldownTimer == nil {
return
}
if !cooldownTimer.Stop() {
select {
case <-cooldownTimer.C:
default:
}
}
cooldownTimer = nil
cooldownC = nil
}
resetCooldownTimer := func() {
stopCooldownTimer()
active, until := m.inCooldown()
if !active {
return
}
delay := time.Until(until)
if delay < 0 {
delay = 0
}
cooldownTimer = time.NewTimer(delay)
cooldownC = cooldownTimer.C
}
defer stopCooldownTimer()
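// Run once right after start (without waiting for the first tick)
// RunOnce performs one full migration: it lists all spider91 drives and, for each old video beyond
// KeepLatestN, uploads it to the target drive, rewrites the catalog row transactionally, and deletes the local file.
//
// This is the entry point for Phase 3 of the nightly pipeline; there is no periodic ticker or Trigger channel any more.
// The captcha cooldown still applies within a single RunOnce (with multiple drives, hitting 4002 stops the whole round at once)
// and persists for 5 minutes across calls, so the next RunOnce that lands inside the cooldown is a no-op.
//
// The current implementation never returns an error to the caller: a failed single migration is logged and skipped internally,
// and a round stopped by cooldown or context cancellation is likewise observable through the logs. The error return is kept
// so that nightly failure status can later be surfaced to the admin.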
func (m *Migrator) RunOnce(ctx context.Context) error {
m.runOnce(ctx)
resetCooldownTimer()
for {
select {
case <-ctx.Done():
return
case <-t.C:
m.runOnce(ctx)
resetCooldownTimer()
case <-m.trigger:
m.runOnce(ctx)
resetCooldownTimer()
case <-cooldownC:
cooldownTimer = nil
cooldownC = nil
m.runOnce(ctx)
resetCooldownTimer()
}
}
return nil
}
// runOnce, a single round: scans all spider91 drives and migrates every video that still has a local file.
@@ -324,8 +324,7 @@ func TestRunOnceMigratesSpider91VideosAndCleansLocalFiles(t *testing.T) {
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
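Interval: time.Hour, // the test does not rely on the ticker
KeepLatestN: -1, // disable "keep latest N" so a single video is uploaded right away
KeepLatestN: -1, // disable "keep latest N" so a single video is uploaded right away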
})
m.runOnce(context.Background())
@@ -805,75 +804,6 @@ func TestRunOnceResumesAfterCooldownExpires(t *testing.T) {
}
}
// TestRunWakesWhenCooldownExpires verifies that the Run loop wakes up on its own once the cooldown expires
// and runs a migration, instead of waiting for the next regular interval tick.
func TestRunWakesWhenCooldownExpires(t *testing.T) {
cat := setupCatalog(t)
src, _ := setupSpider91(t)
pp := newFakePikPak("pikpak-target", "pikpak-root-id")
migrated := make(chan struct{}, 1)
var failOnce sync.Once
pp.uploadFunc = func(ctx context.Context, parentID, name string, r io.Reader, size int64) (UploadResult, error) {
body, _ := io.ReadAll(r)
var failed bool
failOnce.Do(func() { failed = true })
if failed {
captcha := &pikpak.APIError{ErrorCode: 4002, ErrorMsg: "captcha_invalid"}
return UploadResult{}, fmt.Errorf("pikpak upload: request session: %w", captcha)
}
pp.mu.Lock()
pp.gotBodies[name] = body
pp.mu.Unlock()
return UploadResult{
FileID: "remote-" + name,
Hash: "FAKEHASH40CHARSXXXXXXXXXXXXXXXXXXXXXXXXX",
Size: int64(len(body)),
}, nil
}
reg := newFakeRegistry()
reg.Add(src)
reg.Add(pp)
now := time.Now()
id := writeSpider91Video(t, cat, src, "vk-auto-resume", ".mp4", []byte("payload"), now)
m := New(Config{
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
Interval: time.Hour,
KeepLatestN: -1,
CaptchaCooldown: 30 * time.Millisecond,
OnMigrated: func(videoID string) {
if videoID == id {
select {
case migrated <- struct{}{}:
default:
}
}
},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go m.Run(ctx)
select {
case <-migrated:
case <-time.After(500 * time.Millisecond):
t.Fatalf("Run did not resume migration after cooldown expired")
}
got, err := cat.GetVideo(context.Background(), id)
if err != nil {
t.Fatalf("get video: %v", err)
}
if got.DriveID != pp.ID() {
t.Fatalf("after auto resume, drive_id = %q, want PikPak", got.DriveID)
}
}
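// TestNonCaptchaErrorDoesNotTriggerCooldown verifies that non-captcha upload errors (such as
// network jitter) do not put the whole worker into cooldown: only that item is skipped, and the rest of the batch is still attempted.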
func TestNonCaptchaErrorDoesNotTriggerCooldown(t *testing.T) {
@@ -936,7 +866,6 @@ func TestRunOnceMigratesToP115Target(t *testing.T) {
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return target.ID() },
Interval: time.Hour,
KeepLatestN: -1,
})
m.runOnce(context.Background())
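With the ticker gone, the captcha cooldown has to live purely in state that outlasts a single RunOnce call. A minimal sketch of that gate follows; the `migrator` type here, `errCaptcha`, and the injected clock are all hypothetical, and the real Migrator guards its state with a mutex and detects the PikPak 4002 error rather than a sentinel.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errCaptcha is a hypothetical sentinel standing in for the PikPak 4002 error.
var errCaptcha = errors.New("captcha required")

// migrator models only the cooldown gate; single-goroutine use is assumed.
type migrator struct {
	cooldown      time.Duration
	cooldownUntil time.Time
	now           func() time.Time
	upload        func() error
}

// RunOnce reports whether an upload was attempted. Inside the cooldown window
// it is a no-op.
func (m *migrator) RunOnce() bool {
	if m.now().Before(m.cooldownUntil) {
		return false
	}
	if err := m.upload(); errors.Is(err, errCaptcha) {
		m.cooldownUntil = m.now().Add(m.cooldown)
	}
	return true
}

// demoCooldown calls RunOnce at t0, t0+1m, and t0+6m with a 5m cooldown; the
// first upload hits the captcha error.
func demoCooldown() []bool {
	clock := time.Date(2026, 5, 27, 1, 0, 0, 0, time.UTC)
	calls := 0
	m := &migrator{
		cooldown: 5 * time.Minute,
		now:      func() time.Time { return clock },
		upload: func() error {
			calls++
			if calls == 1 {
				return errCaptcha
			}
			return nil
		},
	}
	out := []bool{m.RunOnce()} // attempted, trips the cooldown
	clock = clock.Add(time.Minute)
	out = append(out, m.RunOnce()) // still cooling down
	clock = clock.Add(5 * time.Minute)
	out = append(out, m.RunOnce()) // cooldown expired
	return out
}

func main() {
	fmt.Println(demoCooldown())
}
```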
+28 -4
@@ -1,5 +1,5 @@
import { useEffect, useMemo, useState } from "react";
import { Download, Plus, Power, PowerOff, RefreshCw, RotateCcw, Trash2 } from "lucide-react";
import { Download, PlayCircle, Plus, Power, PowerOff, RefreshCw, RotateCcw, Trash2 } from "lucide-react";
import * as api from "./api";
import { useToast } from "./ToastContext";
import { Modal } from "./Modal";
@@ -199,6 +199,20 @@ export function DrivesPage() {
}
}
/**
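 * Manually triggers the full nightly pipeline: Phase 1 scans the cloud drives, Phase 2 runs the spider91 crawler,
 * Phase 3 migrates spider91 videos to the cloud drive. The request returns 202 right away; progress can be
 * followed in the backend logs.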
*/
async function handleRunNightly() {
try {
await api.runNightlyJob();
show("已触发完整流水线(扫盘 → 91 爬虫 → 迁移),耗时较长,可在 backend 日志观察进度", "success");
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
}
}
async function handleRegenFailed(d: api.AdminDrive) {
setRegenFailedId(d.id);
try {
@@ -253,9 +267,19 @@ export function DrivesPage() {
<section>
<header className="admin-page__header">
<h1 className="admin-page__title"></h1>
<button className="admin-btn is-primary" onClick={openCreate}>
<Plus size={14} />
</button>
<div style={{ display: "flex", gap: "8px" }}>
<button
type="button"
className="admin-btn"
onClick={handleRunNightly}
title="立即跑一次完整流水线:扫所有云盘 → 91 爬虫 → spider91 视频迁移到云盘。耗时较长,期间不要重复触发。"
>
<PlayCircle size={14} />
</button>
<button className="admin-btn is-primary" onClick={openCreate}>
<Plus size={14} />
</button>
</div>
</header>
{storage && <StorageSummary storage={storage} />}
+13
@@ -273,3 +273,16 @@ export function updateSettings(body: Partial<Settings>) {
body: JSON.stringify(body),
});
}
// ---------- Jobs ----------
/**
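 * Triggers the full nightly pipeline: Phase 1 scan + Phase 2 91 crawler + Phase 3 migration.
 * The request returns 202 right away; progress can be followed in the backend logs.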
*/
export function runNightlyJob() {
return request<{ ok: boolean }>("/jobs/nightly/run", { method: "POST" });
}
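On the backend side, the endpoint this client call hits presumably does little more than a non-blocking trigger followed by a 202. The sketch below shows one possible shape; the handler is not part of this diff, so `trigger`, `nightlyRunHandler`, and `demoHandler` are hypothetical names, and only the route, the 202 status, the `{ ok }` response shape, and the non-blocking TriggerNow behaviour are taken from the commit.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// trigger is a hypothetical buffered channel (capacity 1) consumed by the runner loop.
type trigger chan struct{}

// TriggerNow never blocks: if a trigger is already pending, the new one is
// coalesced into it.
func (t trigger) TriggerNow() {
	select {
	case t <- struct{}{}:
	default:
	}
}

// nightlyRunHandler accepts POST, schedules a run, and replies 202 at once.
func nightlyRunHandler(t trigger) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		t.TriggerNow()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true})
	}
}

// demoHandler posts three times in a row and returns the last status code and
// the number of pending triggers.
func demoHandler() (status, pending int) {
	t := make(trigger, 1)
	h := nightlyRunHandler(t)
	for i := 0; i < 3; i++ {
		rec := httptest.NewRecorder()
		req := httptest.NewRequest(http.MethodPost, "/admin/api/jobs/nightly/run", nil)
		h(rec, req)
		status = rec.Code
	}
	return status, len(t)
}

func main() {
	status, pending := demoHandler()
	fmt.Println("status:", status, "pending triggers:", pending)
}
```

Replying before the pipeline finishes keeps the admin request short even though a full run can take hours.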