Improve drive scan task coordination

This commit is contained in:
nianzhibai
2026-06-08 17:37:58 +08:00
parent dc7d2a5de3
commit 5fc8e9ebb7
15 changed files with 815 additions and 109 deletions
+6
View File
@@ -35,3 +35,9 @@ tmp/
91VideoSpider/__pycache__/
__pycache__/
*.pyc
# Local scratch images
/image.jpg
/image003.jpg
/image004.jpg
/image02.png
+247 -68
View File
@@ -170,16 +170,15 @@ func main() {
OnDriveRemoved: func(driveID string) {
app.detachDrive(driveID)
},
OnScanRequested: func(driveID string) {
OnScanRequested: func(driveID string) bool {
// spider91 的"重扫"等同于手动触发一次爬取;其它 drive 走标准 scan
app.mu.Lock()
_, isSpider91 := app.spider91Crawlers[driveID]
app.mu.Unlock()
if isSpider91 {
app.scheduleSpider91Crawl(ctx, driveID)
return
return app.scheduleSpider91Crawl(ctx, driveID)
}
app.scheduleScan(ctx, driveID)
return app.scheduleScan(ctx, driveID)
},
OnStopDriveTasks: func(driveID string) bool {
return app.stopDriveTasks(ctx, driveID)
@@ -318,29 +317,20 @@ type App struct {
// 空字符串表示本地保存不上传,不再自动挑选 pikpak/p115/p123/onedrive drive。
spider91UploadDriveID string
// spider91Migrator 周期把 spider91 视频上传到目标 drivePikPak、115、123 或 OneDrive)。
spider91Migrator *spider91migrate.Migrator
// spider91Migrator 把 spider91 视频上传到目标 drivePikPak、115、123 或 OneDrive)。
spider91Migrator spider91MigrationRunner
// nightlyRunner 是凌晨流水线调度器:每天 cron_hour 串行跑扫盘 → 91 爬虫 → 迁移。
// 也响应 admin 「扫描所有网盘」按钮(TriggerNow)。
nightlyRunner *nightly.Runner
// scanGlobalMu 串行化所有云盘扫盘任务,确保同一时刻全系统只有一个扫盘
// 在跑(包括 admin 手动重扫和 nightly Phase 1)。即便用户同时点多个 drive
// 的"重扫"按钮,goroutine 也会排队等这把锁,逐个执行。
//
// 设计取舍:
// - 不同 drive 的扫盘技术上可以并行(互不干涉),但用户希望"线性来"以
// 避免带宽 / CPU 抢占,所以做全局串行。
// - nightly Phase 1 已经是 for 循环顺序调用 runScan,加了这把锁后行为
// 不变,只是顺手把 admin 异步触发的请求也接入同一条队列。
scanGlobalMu sync.Mutex
// scanQueueMu 保护 scanQueued。
// scanQueueMu 保护 scanQueued 和 scanProgress。
scanQueueMu sync.Mutex
// scanQueued 跟踪哪些 driveID 已经排队或正在跑扫盘/91 爬取,去重后续重复点击。
// 一个 drive 在 scheduleScan/scheduleSpider91Crawl 入队时被加入,后台 goroutine
// 结束时被移除。
// 不同 drive 互不等待,可以并行扫;同一个 drive 只能有一个扫盘/抓取任务。
scanQueued map[string]bool
// scanProgress 跟踪每个正在扫盘/抓取的 drive 当前进度。
scanProgress map[string]driveScanProgress
// taskCancelMu 保护 driveTaskCancels。这里登记的是可被"停止任务"按钮中断
// 的 drive 级任务上下文:扫盘、91 爬取、指纹补队列、失败生成重试等。
@@ -354,6 +344,15 @@ type App struct {
fingerprintQueueing map[string]bool
}
type driveScanProgress struct {
Scanned int
Added int
}
type spider91MigrationRunner interface {
RunOnce(ctx context.Context) error
}
// teaserEnabledForDrive 查询某个 drive 当前的 per-drive 预览视频开关。
//
// 预览视频生成不再由全局 setting 控制,而是由 catalog.drives.teaser_enabled
@@ -491,6 +490,17 @@ func (a *App) loadSpider91UploadDriveID(ctx context.Context) {
}
func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
a.scanQueueMu.Lock()
scanningDrives := make(map[string]bool, len(a.scanQueued))
for id, running := range a.scanQueued {
scanningDrives[id] = running
}
scanProgresses := make(map[string]driveScanProgress, len(a.scanProgress))
for id, progress := range a.scanProgress {
scanProgresses[id] = progress
}
a.scanQueueMu.Unlock()
a.mu.Lock()
previewWorkers := make(map[string]*preview.Worker, len(a.workers))
for id, worker := range a.workers {
@@ -506,7 +516,20 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
}
a.mu.Unlock()
out := make(map[string]api.DriveGenerationStatuses, len(previewWorkers)+len(thumbWorkers)+len(fingerprintWorkers))
out := make(map[string]api.DriveGenerationStatuses, len(scanningDrives)+len(previewWorkers)+len(thumbWorkers)+len(fingerprintWorkers))
for id, running := range scanningDrives {
if !running {
continue
}
progress := scanProgresses[id]
status := out[id]
status.Scan = api.GenerationStatus{
State: "scanning",
ScannedCount: progress.Scanned,
AddedCount: progress.Added,
}
out[id] = status
}
for id, worker := range previewWorkers {
status := out[id]
status.Preview = generationStatusFromPreview(worker.Status())
@@ -856,6 +879,12 @@ func (a *App) attachSpider91Crawler(d *catalog.Drive, drv *spider91.Driver) {
proxyURL := strings.TrimSpace(d.Credentials["proxy"])
driveID := d.ID
var progressMu sync.Mutex
checkedVideos := 0
expectedNewVideos := 0
updateProgress := func(scanned, added int) {
a.updateDriveScanProgress(driveID, scanned, added)
}
c := spider91.NewCrawler(spider91.CrawlerConfig{
Driver: drv,
Catalog: a.cat,
@@ -864,6 +893,35 @@ func (a *App) attachSpider91Crawler(d *catalog.Drive, drv *spider91.Driver) {
WorkDir: filepath.Dir(scriptPath),
CommonThumbDir: a.commonThumbsDir(),
ProxyURL: proxyURL,
OnProgress: func(progress spider91.CrawlProgress) {
progressMu.Lock()
if progress.TotalEntries == 0 && progress.NewVideos == 0 && progress.Skipped == 0 && progress.Failed == 0 {
checkedVideos = 0
expectedNewVideos = 0
} else if progress.TotalEntries > expectedNewVideos {
expectedNewVideos = progress.TotalEntries
}
scanned := checkedVideos
added := expectedNewVideos
progressMu.Unlock()
updateProgress(scanned, added)
},
OnCheckedVideo: func() {
progressMu.Lock()
checkedVideos++
scanned := checkedVideos
added := expectedNewVideos
progressMu.Unlock()
updateProgress(scanned, added)
},
OnExtractedVideo: func() {
progressMu.Lock()
expectedNewVideos++
scanned := checkedVideos
added := expectedNewVideos
progressMu.Unlock()
updateProgress(scanned, added)
},
// 新流程:预览视频不在每条视频入库时立即入队,而是 RunOnce 全部下完后由
// runSpider91Crawl 统一调 enqueueDriveGeneration 一次性入队。这样:
// - 下载阶段不和 ffmpeg 抢 CPU/IO
@@ -1008,6 +1066,7 @@ func (a *App) clearQueuedDriveTask(driveID string) bool {
a.scanQueueMu.Lock()
queued := a.scanQueued[driveID]
delete(a.scanQueued, driveID)
delete(a.scanProgress, driveID)
a.scanQueueMu.Unlock()
return queued
}
@@ -1019,6 +1078,7 @@ func (a *App) clearAllQueuedDriveTasks() []string {
ids = append(ids, id)
}
a.scanQueued = nil
a.scanProgress = nil
a.scanQueueMu.Unlock()
return ids
}
@@ -1042,6 +1102,102 @@ func (a *App) clearAllFingerprintQueueing() []string {
return ids
}
func (a *App) beginDriveScanOrCrawl(driveID string) bool {
driveID = strings.TrimSpace(driveID)
if driveID == "" {
return false
}
a.scanQueueMu.Lock()
defer a.scanQueueMu.Unlock()
if a.scanQueued == nil {
a.scanQueued = make(map[string]bool)
}
if a.scanQueued[driveID] {
return false
}
a.scanQueued[driveID] = true
if a.scanProgress == nil {
a.scanProgress = make(map[string]driveScanProgress)
}
a.scanProgress[driveID] = driveScanProgress{}
return true
}
func (a *App) endDriveScanOrCrawl(driveID string) {
a.scanQueueMu.Lock()
delete(a.scanQueued, driveID)
delete(a.scanProgress, driveID)
a.scanQueueMu.Unlock()
}
func (a *App) updateDriveScanProgress(driveID string, scanned, added int) {
driveID = strings.TrimSpace(driveID)
if driveID == "" {
return
}
a.scanQueueMu.Lock()
if a.scanQueued[driveID] {
if a.scanProgress == nil {
a.scanProgress = make(map[string]driveScanProgress)
}
a.scanProgress[driveID] = driveScanProgress{Scanned: scanned, Added: added}
}
a.scanQueueMu.Unlock()
}
func (a *App) driveHasActiveWork(driveID string) bool {
driveID = strings.TrimSpace(driveID)
if driveID == "" {
return true
}
a.scanQueueMu.Lock()
scanning := a.scanQueued[driveID]
a.scanQueueMu.Unlock()
if scanning {
return true
}
a.taskCancelMu.Lock()
taskContexts := len(a.driveTaskCancels[driveID])
a.taskCancelMu.Unlock()
if taskContexts > 0 {
return true
}
a.fingerprintQueueMu.Lock()
fingerprintQueueing := a.fingerprintQueueing[driveID]
a.fingerprintQueueMu.Unlock()
if fingerprintQueueing {
return true
}
a.mu.Lock()
previewWorker := a.workers[driveID]
thumbWorker := a.thumbWorkers[driveID]
fingerprintWorker := a.fingerprintWorkers[driveID]
a.mu.Unlock()
if previewTaskBusy(thumbWorker.Status()) {
return true
}
if previewTaskBusy(previewWorker.Status()) {
return true
}
if fingerprintTaskBusy(fingerprintWorker.Status()) {
return true
}
return false
}
func previewTaskBusy(status preview.TaskStatus) bool {
return status.State != "" && status.State != "idle"
}
func fingerprintTaskBusy(status fingerprint.TaskStatus) bool {
return status.State != "" && status.State != "idle"
}
func (a *App) resetDriveGenerationWorkers(ctx context.Context, driveID string) bool {
var drv drives.Drive
var attached bool
@@ -1355,52 +1511,41 @@ func (a *App) listDriveDirChildren(ctx context.Context, driveID, parentID string
// scheduleScan 异步触发某个 drive 的扫盘。
//
// 调用立即返回;扫盘任务在后台 goroutine 里排队执行 —— 系统中所有扫盘共享
// 一把 scanGlobalMu,按提交顺序串行跑
//
// 去重:如果该 drive 已经在排队或正在跑,重复请求会被丢弃并记日志。这样用户
// 反复点同一个 drive 的"重扫"按钮,也只会有一次实际工作。
//
// 用于 admin UI「重扫」、「立即抓取」这类异步触发;nightly Phase 1 应继续直接
// 调 runScan(同步、按 for 循环顺序),不需要走 scheduleScan。
func (a *App) scheduleScan(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
a.scanQueueMu.Lock()
if a.scanQueued == nil {
a.scanQueued = make(map[string]bool)
// 调用立即返回。不同 drive 的扫盘可以并行;同一个 drive 如果已有扫盘、封面、
// 预览视频或指纹任务在跑,本次请求会被拒绝
func (a *App) scheduleScan(ctx context.Context, driveID string) bool {
if a.driveHasActiveWork(driveID) {
log.Printf("[scan] drive=%s has active work, skip duplicate request", driveID)
return false
}
if a.scanQueued[driveID] {
a.scanQueueMu.Unlock()
done()
if !a.beginDriveScanOrCrawl(driveID) {
log.Printf("[scan] drive=%s already queued or running, skip duplicate request", driveID)
return
return false
}
a.scanQueued[driveID] = true
a.scanQueueMu.Unlock()
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
go func() {
defer func() {
a.scanQueueMu.Lock()
delete(a.scanQueued, driveID)
a.scanQueueMu.Unlock()
a.endDriveScanOrCrawl(driveID)
done()
}()
a.runScanWithTaskContext(taskCtx, driveID)
}()
return true
}
func (a *App) runScan(ctx context.Context, driveID string) {
if !a.beginDriveScanOrCrawl(driveID) {
log.Printf("[scan] drive=%s already queued or running, skip direct scan", driveID)
return
}
defer a.endDriveScanOrCrawl(driveID)
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
a.runScanWithTaskContext(taskCtx, driveID)
}
func (a *App) runScanWithTaskContext(ctx context.Context, driveID string) {
// 全局串行:同一时刻只有一个扫盘任务在跑(admin 重扫 + nightly Phase 1 共用)。
// 等待这把锁的 goroutine 在排队,按到达顺序逐个执行。
a.scanGlobalMu.Lock()
defer a.scanGlobalMu.Unlock()
if err := ctx.Err(); err != nil {
log.Printf("[scan] drive=%s canceled before start: %v", driveID, err)
return
@@ -1438,6 +1583,9 @@ func (a *App) runScanWithTaskContext(ctx context.Context, driveID string) {
return
}
sc := scanner.New(a.cat, drv, a.cfg.Scanner.VideoExtensions, d.SkipDirIDs, onNew)
sc.OnProgress = func(stats scanner.Stats) {
a.updateDriveScanProgress(driveID, stats.Scanned, stats.Added)
}
startID := d.RootID
@@ -2310,30 +2458,27 @@ func shouldScanDrive(d drives.Drive) bool {
// ---------- spider91 crawl ----------
func (a *App) scheduleSpider91Crawl(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
a.scanQueueMu.Lock()
if a.scanQueued == nil {
a.scanQueued = make(map[string]bool)
func (a *App) scheduleSpider91Crawl(ctx context.Context, driveID string) bool {
if a.driveHasActiveWork(driveID) {
log.Printf("[spider91] drive=%s has active work, skip duplicate crawl request", driveID)
return false
}
if a.scanQueued[driveID] {
a.scanQueueMu.Unlock()
done()
if !a.beginDriveScanOrCrawl(driveID) {
log.Printf("[spider91] drive=%s already queued or running, skip duplicate crawl request", driveID)
return
return false
}
a.scanQueued[driveID] = true
a.scanQueueMu.Unlock()
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
go func() {
defer func() {
a.scanQueueMu.Lock()
delete(a.scanQueued, driveID)
a.scanQueueMu.Unlock()
a.endDriveScanOrCrawl(driveID)
done()
}()
a.runSpider91CrawlWithTaskContext(taskCtx, driveID)
if a.runSpider91CrawlWithTaskContext(taskCtx, driveID) {
a.runSpider91MigrationAfterManualCrawl(taskCtx, driveID)
}
}()
return true
}
// runSpider91Crawl 运行一次完整爬取流程并把 last_crawl_at 写回 drive.credentials。
@@ -2342,15 +2487,20 @@ func (a *App) scheduleSpider91Crawl(ctx context.Context, driveID string) {
// 流水线重跑时仍会重试。该方法是阻塞的,被 nightly Phase 2 串行调用,以及被
// admin "立即抓取" 单 drive 异步调用。
func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
if !a.beginDriveScanOrCrawl(driveID) {
log.Printf("[spider91] drive=%s already queued or running, skip direct crawl", driveID)
return
}
defer a.endDriveScanOrCrawl(driveID)
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
a.runSpider91CrawlWithTaskContext(taskCtx, driveID)
}
func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID string) {
func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID string) bool {
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s crawl canceled before start: %v", driveID, err)
return
return false
}
a.mu.Lock()
c := a.spider91Crawlers[driveID]
@@ -2358,21 +2508,21 @@ func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID strin
if c == nil {
if err := a.ensureDriveAttached(ctx, driveID); err != nil {
log.Printf("[spider91] drive=%s attach failed: %v", driveID, err)
return
return false
}
a.mu.Lock()
c = a.spider91Crawlers[driveID]
a.mu.Unlock()
if c == nil {
log.Printf("[spider91] drive=%s crawler not attached", driveID)
return
return false
}
}
d, err := a.cat.GetDrive(ctx, driveID)
if err != nil || d == nil {
log.Printf("[spider91] drive=%s lookup failed: %v", driveID, err)
return
return false
}
targetNew := spider91IntCred(d, "target_new", spider91.DefaultTargetNew)
if targetNew <= 0 {
@@ -2406,7 +2556,7 @@ func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID strin
}
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s crawl canceled after run: %v", driveID, err)
return
return false
}
// 爬取全部完成后,统一把所有还 pending 的预览视频入队。
@@ -2421,6 +2571,35 @@ func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID strin
a.mu.Unlock()
a.scheduleFingerprintBackfill(ctx, driveID, fingerprintWorker)
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
return runErr == nil
}
func (a *App) runSpider91MigrationAfterManualCrawl(ctx context.Context, driveID string) {
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s skip post-crawl migration: %v", driveID, err)
return
}
targetDriveID := a.Spider91UploadDriveID()
if targetDriveID == "" {
return
}
if a.spider91Migrator == nil {
log.Printf("[spider91] drive=%s skip post-crawl migration: migrator not configured", driveID)
return
}
log.Printf("[spider91] drive=%s waiting for generation queues before post-crawl migration target=%s", driveID, targetDriveID)
if err := a.waitAllPreviewQueuesIdle(ctx); err != nil {
log.Printf("[spider91] drive=%s post-crawl migration wait canceled: %v", driveID, err)
return
}
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s skip post-crawl migration after wait: %v", driveID, err)
return
}
log.Printf("[spider91] drive=%s running post-crawl migration target=%s", driveID, targetDriveID)
if err := a.spider91Migrator.RunOnce(ctx); err != nil {
log.Printf("[spider91] drive=%s post-crawl migration: %v", driveID, err)
}
}
// spider91IntCred 解析 credentials 中的整数字段,缺省时返回 def。
+172
View File
@@ -260,6 +260,7 @@ func TestStopDriveTasksCancelsQueuedTasksAndReplacesWorkers(t *testing.T) {
"drive-id": func() { close(oldCanceled) },
},
scanQueued: map[string]bool{"drive-id": true},
scanProgress: map[string]driveScanProgress{"drive-id": {Scanned: 8, Added: 2}},
fingerprintQueueing: map[string]bool{"drive-id": true},
}
taskCtx, done := app.registerDriveTaskContext(ctx, "drive-id")
@@ -279,6 +280,9 @@ func TestStopDriveTasksCancelsQueuedTasksAndReplacesWorkers(t *testing.T) {
if app.scanQueued["drive-id"] {
t.Fatal("scan queue marker was not cleared")
}
if _, ok := app.scanProgress["drive-id"]; ok {
t.Fatal("scan progress marker was not cleared")
}
if app.fingerprintQueueing["drive-id"] {
t.Fatal("fingerprint queue marker was not cleared")
}
@@ -304,6 +308,117 @@ func TestStopDriveTasksCancelsQueuedTasksAndReplacesWorkers(t *testing.T) {
newCancel()
}
func TestScheduleScanRejectsDriveWithActiveGenerationWork(t *testing.T) {
ctx := context.Background()
thumbWorker := preview.NewThumbWorker(&serverFakeTeaserGenerator{}, nil, &serverFakeDrive{})
if !thumbWorker.Enqueue(&catalog.Video{ID: "busy-video", DriveID: "drive-id", Title: "Busy Video"}) {
t.Fatal("failed to enqueue busy thumbnail task")
}
app := &App{
thumbWorkers: map[string]*preview.ThumbWorker{"drive-id": thumbWorker},
}
if app.scheduleScan(ctx, "drive-id") {
t.Fatal("scheduleScan accepted a drive with active generation work")
}
}
func TestScheduleScanRunsDifferentDrivesConcurrently(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
seedDriveWithTeaser(t, cat, "drive-a", true)
seedDriveWithTeaser(t, cat, "drive-b", true)
started := make(chan string, 2)
release := make(chan struct{})
registry := proxy.NewRegistry()
registry.Set("drive-a", &serverBlockingListDrive{id: "drive-a", started: started, release: release})
registry.Set("drive-b", &serverBlockingListDrive{id: "drive-b", started: started, release: release})
app := &App{
cfg: &config.Config{
Scanner: config.Scanner{VideoExtensions: []string{".mp4"}},
},
cat: cat,
registry: registry,
}
if !app.scheduleScan(ctx, "drive-a") {
t.Fatal("scheduleScan drive-a was rejected")
}
if !app.scheduleScan(ctx, "drive-b") {
t.Fatal("scheduleScan drive-b was rejected")
}
seen := map[string]struct{}{}
deadline := time.After(time.Second)
for len(seen) < 2 {
select {
case id := <-started:
seen[id] = struct{}{}
case <-deadline:
close(release)
t.Fatalf("started drives = %#v, want both drives before releasing List", seen)
}
}
close(release)
}
func TestDriveGenerationStatusIncludesScanState(t *testing.T) {
app := &App{
scanQueued: map[string]bool{"drive-id": true},
scanProgress: map[string]driveScanProgress{"drive-id": {Scanned: 12, Added: 3}},
}
status := app.driveGenerationStatuses()["drive-id"].Scan
if status.State != "scanning" {
t.Fatalf("scan status = %#v, want scanning", status)
}
if status.ScannedCount != 12 || status.AddedCount != 3 {
t.Fatalf("scan counts = scanned %d added %d, want 12 and 3", status.ScannedCount, status.AddedCount)
}
}
func TestRunSpider91MigrationAfterManualCrawlRequiresConfiguredUploadTarget(t *testing.T) {
ctx := context.Background()
registry := proxy.NewRegistry()
migrator := &serverFakeSpider91MigrationRunner{}
app := &App{
registry: registry,
spider91Migrator: migrator,
workers: map[string]*preview.Worker{},
thumbWorkers: map[string]*preview.ThumbWorker{},
fingerprintWorkers: map[string]*fingerprint.Worker{},
}
app.runSpider91MigrationAfterManualCrawl(ctx, "91spider")
if migrator.called != 0 {
t.Fatalf("migration called without upload target")
}
app.spider91UploadDriveID = "pikpak"
app.runSpider91MigrationAfterManualCrawl(ctx, "91spider")
if migrator.called != 0 {
t.Fatalf("migration called when upload target is not attached")
}
registry.Set("pikpak", &serverFakeKindDrive{id: "pikpak", kind: "pikpak"})
app.runSpider91MigrationAfterManualCrawl(ctx, "91spider")
if migrator.called != 1 {
t.Fatalf("migration calls = %d, want 1", migrator.called)
}
}
func TestDriveGenerationStatusUsesWorkerQueueNotPendingCatalogRows(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
@@ -1496,6 +1611,63 @@ func (d *serverFakeDrive) EnsureDir(context.Context, string) (string, error) {
}
func (d *serverFakeDrive) RootID() string { return "root" }
type serverFakeKindDrive struct {
serverFakeDrive
id string
kind string
}
func (d *serverFakeKindDrive) Kind() string { return d.kind }
func (d *serverFakeKindDrive) ID() string { return d.id }
type serverFakeSpider91MigrationRunner struct {
called int
}
func (r *serverFakeSpider91MigrationRunner) RunOnce(context.Context) error {
r.called++
return nil
}
type serverBlockingListDrive struct {
id string
started chan string
release chan struct{}
}
func (d *serverBlockingListDrive) Kind() string { return "fake" }
func (d *serverBlockingListDrive) ID() string { return d.id }
func (d *serverBlockingListDrive) Init(context.Context) error {
return nil
}
func (d *serverBlockingListDrive) List(ctx context.Context, _ string) ([]drives.Entry, error) {
if d.started != nil {
select {
case d.started <- d.id:
default:
}
}
select {
case <-d.release:
return nil, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (d *serverBlockingListDrive) Stat(context.Context, string) (*drives.Entry, error) {
return nil, drives.ErrNotSupported
}
func (d *serverBlockingListDrive) StreamURL(context.Context, string) (*drives.StreamLink, error) {
return &drives.StreamLink{URL: "https://video.example/clip.mp4"}, nil
}
func (d *serverBlockingListDrive) Upload(context.Context, string, string, io.Reader, int64) (string, error) {
return "", drives.ErrNotSupported
}
func (d *serverBlockingListDrive) EnsureDir(context.Context, string) (string, error) {
return "", drives.ErrNotSupported
}
func (d *serverBlockingListDrive) RootID() string { return "root" }
type serverFingerprintFakeDrive struct {
serverFakeDrive
path string
+39 -6
View File
@@ -45,7 +45,7 @@ type AdminServer struct {
OnDriveSaved func(driveID string) error
OnDriveDeleteCleanup func(ctx context.Context, driveID string) (int, error)
OnDriveRemoved func(driveID string)
OnScanRequested func(driveID string)
OnScanRequested func(driveID string) bool
OnStopDriveTasks func(driveID string) bool
OnStopAllTasks func() int
OnRegenPreview func(videoID string)
@@ -81,6 +81,11 @@ type AdminServer struct {
P123HTTPClient *http.Client
}
const (
driveTaskBusyMessage = "当前存储有正在进行的任务,请稍后重试"
fullScanBusyMessage = "当前有全量扫描任务正在进行,请稍后重试"
)
// DriveDirEntry 是 dirtree 接口的一条返回项:网盘上的一个目录节点。
type DriveDirEntry struct {
ID string `json:"id"`
@@ -92,9 +97,12 @@ type GenerationStatus struct {
CurrentTitle string `json:"currentTitle,omitempty"`
QueueLength int `json:"queueLength"`
CooldownUntil string `json:"cooldownUntil,omitempty"`
ScannedCount int `json:"scannedCount"`
AddedCount int `json:"addedCount"`
}
type DriveGenerationStatuses struct {
Scan GenerationStatus `json:"scan"`
Thumbnail GenerationStatus `json:"thumbnail"`
Preview GenerationStatus `json:"preview"`
Fingerprint GenerationStatus `json:"fingerprint"`
@@ -417,6 +425,7 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
// 其它 kind 留 0;前端用它显示"上次抓取: N 小时前"。
Spider91Proxy string `json:"spider91Proxy,omitempty"`
LastCrawlAt int64 `json:"lastCrawlAt,omitempty"`
ScanGenerationStatus GenerationStatus `json:"scanGenerationStatus"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
FingerprintGenerationStatus GenerationStatus `json:"fingerprintGenerationStatus"`
@@ -437,6 +446,9 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
thumbCounts := thumbnailCounts[d.ID]
fingerprintCount := fingerprintCounts[d.ID]
generation := generationStatuses[d.ID]
if generation.Scan.State == "" {
generation.Scan.State = "idle"
}
if generation.Thumbnail.State == "" {
generation.Thumbnail.State = "idle"
}
@@ -476,6 +488,7 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
SkipDirIDs: append([]string{}, d.SkipDirIDs...),
Spider91Proxy: spider91ProxyForDrive(d),
LastCrawlAt: lastCrawlAt,
ScanGenerationStatus: generation.Scan,
ThumbnailGenerationStatus: generation.Thumbnail,
PreviewGenerationStatus: generation.Preview,
FingerprintGenerationStatus: generation.Fingerprint,
@@ -675,10 +688,26 @@ type deleteDriveReq struct {
func (a *AdminServer) handleRescan(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if a.OnScanRequested != nil {
a.OnScanRequested(id)
status := a.nightlyJobStatus()
if status.Running || status.Queued {
writeJSON(w, http.StatusAccepted, map[string]any{
"ok": true,
"accepted": false,
"message": fullScanBusyMessage,
"status": status,
})
return
}
writeJSON(w, http.StatusAccepted, map[string]any{"ok": true})
accepted := true
if a.OnScanRequested != nil {
accepted = a.OnScanRequested(id)
}
resp := map[string]any{"ok": true, "accepted": accepted}
if !accepted {
resp["message"] = driveTaskBusyMessage
}
writeJSON(w, http.StatusAccepted, resp)
}
func (a *AdminServer) handleStopDriveTasks(w http.ResponseWriter, r *http.Request) {
@@ -734,11 +763,15 @@ func (a *AdminServer) handleRunNightlyJob(w http.ResponseWriter, r *http.Request
if a.OnRunNightlyJob != nil {
accepted = a.OnRunNightlyJob()
}
writeJSON(w, http.StatusAccepted, map[string]any{
resp := map[string]any{
"ok": true,
"accepted": accepted,
"status": a.nightlyJobStatus(),
})
}
if !accepted {
resp["message"] = fullScanBusyMessage
}
writeJSON(w, http.StatusAccepted, resp)
}
func (a *AdminServer) handleNightlyJobStatus(w http.ResponseWriter, r *http.Request) {
+114 -1
View File
@@ -278,6 +278,108 @@ func TestHandleRunNightlyJobReturnsAcceptedStatus(t *testing.T) {
}
}
func TestHandleRunNightlyJobReturnsBusyMessageWhenRejected(t *testing.T) {
req := httptest.NewRequest(http.MethodPost, "/admin/api/jobs/nightly/run", nil)
rr := httptest.NewRecorder()
(&AdminServer{
OnRunNightlyJob: func() bool {
return false
},
GetNightlyJobStatus: func() NightlyJobStatus {
return NightlyJobStatus{State: "running", Running: true}
},
}).handleRunNightlyJob(rr, req)
if rr.Code != http.StatusAccepted {
t.Fatalf("status = %d, want 202; body = %s", rr.Code, rr.Body.String())
}
var got struct {
OK bool `json:"ok"`
Accepted bool `json:"accepted"`
Message string `json:"message"`
Status NightlyJobStatus `json:"status"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
if !got.OK || got.Accepted || got.Message != fullScanBusyMessage || !got.Status.Running {
t.Fatalf("response = %#v, want rejected busy message", got)
}
}
func TestHandleRescanRejectsWhenNightlyBusy(t *testing.T) {
called := false
req := httptest.NewRequest(http.MethodPost, "/admin/api/drives/PikPak/rescan", nil)
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", "PikPak")
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
rr := httptest.NewRecorder()
(&AdminServer{
OnScanRequested: func(driveID string) bool {
called = true
return true
},
GetNightlyJobStatus: func() NightlyJobStatus {
return NightlyJobStatus{State: "running", Running: true}
},
}).handleRescan(rr, req)
if rr.Code != http.StatusAccepted {
t.Fatalf("status = %d, want 202; body = %s", rr.Code, rr.Body.String())
}
if called {
t.Fatal("OnScanRequested was called while nightly job was busy")
}
var got struct {
OK bool `json:"ok"`
Accepted bool `json:"accepted"`
Message string `json:"message"`
Status NightlyJobStatus `json:"status"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
if !got.OK || got.Accepted || got.Message != fullScanBusyMessage || !got.Status.Running {
t.Fatalf("response = %#v, want rejected full scan busy message", got)
}
}
func TestHandleRescanReturnsAcceptedFlagAndBusyMessage(t *testing.T) {
calledWith := ""
req := httptest.NewRequest(http.MethodPost, "/admin/api/drives/PikPak/rescan", nil)
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", "PikPak")
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
rr := httptest.NewRecorder()
(&AdminServer{
OnScanRequested: func(driveID string) bool {
calledWith = driveID
return false
},
}).handleRescan(rr, req)
if rr.Code != http.StatusAccepted {
t.Fatalf("status = %d, want 202; body = %s", rr.Code, rr.Body.String())
}
var got struct {
OK bool `json:"ok"`
Accepted bool `json:"accepted"`
Message string `json:"message"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
if calledWith != "PikPak" {
t.Fatalf("hook called with %q, want PikPak", calledWith)
}
if !got.OK || got.Accepted || got.Message != driveTaskBusyMessage {
t.Fatalf("response = %#v, want rejected busy message", got)
}
}
func TestHandleNightlyJobStatusDefaultsToIdle(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/admin/api/jobs/nightly/status", nil)
rr := httptest.NewRecorder()
@@ -854,6 +956,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
GetDriveGenerationStatuses: func() map[string]DriveGenerationStatuses {
return map[string]DriveGenerationStatuses{
"OneDrive": {
Scan: GenerationStatus{State: "scanning", ScannedCount: 12, AddedCount: 3},
Thumbnail: GenerationStatus{State: "cooling", QueueLength: 3, CooldownUntil: "2026-05-16T21:00:00+08:00"},
Preview: GenerationStatus{State: "generating", CurrentTitle: "OD Pending"},
Fingerprint: GenerationStatus{State: "generating", CurrentTitle: "OD Pending"},
@@ -867,6 +970,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
}
var got []struct {
ID string `json:"id"`
ScanGenerationStatus GenerationStatus `json:"scanGenerationStatus"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
FingerprintGenerationStatus GenerationStatus `json:"fingerprintGenerationStatus"`
@@ -895,6 +999,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
FingerprintReady int
FingerprintPending int
FingerprintFailed int
Scan GenerationStatus
Thumbnail GenerationStatus
Preview GenerationStatus
Fingerprint GenerationStatus
@@ -911,6 +1016,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
FingerprintReady int
FingerprintPending int
FingerprintFailed int
Scan GenerationStatus
Thumbnail GenerationStatus
Preview GenerationStatus
Fingerprint GenerationStatus
@@ -925,6 +1031,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
FingerprintReady: d.FingerprintReadyCount,
FingerprintPending: d.FingerprintPendingCount,
FingerprintFailed: d.FingerprintFailedCount,
Scan: d.ScanGenerationStatus,
Thumbnail: d.ThumbnailGenerationStatus,
Preview: d.PreviewGenerationStatus,
Fingerprint: d.FingerprintGenerationStatus,
@@ -942,6 +1049,12 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
if byID["OneDrive"].Thumbnail.State != "cooling" || byID["OneDrive"].Preview.State != "generating" {
t.Fatalf("OneDrive generation statuses = %#v, want thumbnail cooling and preview generating", byID["OneDrive"])
}
if byID["OneDrive"].Scan.State != "scanning" {
t.Fatalf("OneDrive scan status = %#v, want scanning", byID["OneDrive"].Scan)
}
if byID["OneDrive"].Scan.ScannedCount != 12 || byID["OneDrive"].Scan.AddedCount != 3 {
t.Fatalf("OneDrive scan counts = %#v, want scanned=12 added=3", byID["OneDrive"].Scan)
}
if byID["OneDrive"].FingerprintReady != 1 || byID["OneDrive"].FingerprintPending != 1 || byID["OneDrive"].FingerprintFailed != 1 {
t.Fatalf("OneDrive fingerprint counts = %#v, want ready=1 pending=1 failed=1", byID["OneDrive"])
}
@@ -957,7 +1070,7 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
if byID["PikPak"].FingerprintPending != 2 {
t.Fatalf("PikPak fingerprint counts = %#v, want pending=2", byID["PikPak"])
}
if byID["PikPak"].Thumbnail.State != "idle" || byID["PikPak"].Preview.State != "idle" || byID["PikPak"].Fingerprint.State != "idle" {
if byID["PikPak"].Scan.State != "idle" || byID["PikPak"].Thumbnail.State != "idle" || byID["PikPak"].Preview.State != "idle" || byID["PikPak"].Fingerprint.State != "idle" {
t.Fatalf("PikPak generation statuses = %#v, want idle defaults", byID["PikPak"])
}
}
+58 -5
View File
@@ -64,6 +64,12 @@ type CrawlerConfig struct {
// OnNewVideo 是新视频成功入库后的回调,用于触发预览视频 worker。
OnNewVideo func(v *catalog.Video)
// OnProgress 在抓取统计变化时触发,用于后台管理页展示实时进度。
OnProgress func(progress CrawlProgress)
// OnCheckedVideo 在 Python 爬虫开始检查一个列表页视频时触发。
OnCheckedVideo func()
// OnExtractedVideo 在 Python 爬虫提取到一个新视频直链时触发。
OnExtractedVideo func()
}
// Crawler 把 Python 爬虫产出包装成 catalog 入库流程。
@@ -219,6 +225,16 @@ type CrawlResult struct {
SeenFile string
}
// CrawlProgress 是 RunOnce 过程中可安全对外发布的实时计数。
type CrawlProgress struct {
TargetNew int
TotalEntries int
NewVideos int
Skipped int
Failed int
SeenSnapshot int
}
// spiderVideoEntry 对应 spider_91porn.py 输出 JSON 中的单条视频。
type spiderVideoEntry struct {
Title string `json:"title"`
@@ -266,6 +282,20 @@ func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, err
result := &CrawlResult{TargetNew: targetNew, StartedAt: time.Now()}
defer func() { result.FinishedAt = time.Now() }()
emitProgress := func() {
if c.cfg.OnProgress == nil {
return
}
c.cfg.OnProgress(CrawlProgress{
TargetNew: result.TargetNew,
TotalEntries: result.TotalEntries,
NewVideos: result.NewVideos,
Skipped: result.Skipped,
Failed: result.Failed,
SeenSnapshot: result.SeenSnapshot,
})
}
emitProgress()
// 1. 准备 .crawl/ 目录 + 已知源视频 ID 列表
//
@@ -291,6 +321,7 @@ func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, err
return result, fmt.Errorf("spider91 crawler: build seen list: %w", err)
}
result.SeenSnapshot = seenCount
emitProgress()
// 2-3. 启动 Python 爬虫(流式 stdout 协议),并边读边处理。
//
@@ -321,9 +352,11 @@ func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, err
continue
}
result.TotalEntries++
emitProgress()
sourceID := sourceIDForItem(item)
if sourceID == "" || strings.TrimSpace(item.VideoURL) == "" {
result.Failed++
emitProgress()
continue
}
if result.NewVideos >= targetNew {
@@ -335,22 +368,27 @@ func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, err
if err != nil {
log.Printf("[spider91] drive=%s viewkey=%s source_id=%s check deleted: %v", c.cfg.Driver.ID(), item.Viewkey, sourceID, err)
result.Failed++
emitProgress()
continue
}
if deleted {
result.Skipped++
emitProgress()
continue
}
if existing, _ := c.cfg.Catalog.GetVideo(ctx, videoID); existing != nil {
result.Skipped++
emitProgress()
continue
}
if perr := c.processOne(ctx, videoID, item); perr != nil {
log.Printf("[spider91] drive=%s viewkey=%s source_id=%s failed: %v", c.cfg.Driver.ID(), item.Viewkey, sourceID, perr)
result.Failed++
emitProgress()
continue
}
result.NewVideos++
emitProgress()
}
if scerr := scanner.Err(); scerr != nil {
log.Printf("[spider91] drive=%s stdout scan: %v", c.cfg.Driver.ID(), scerr)
@@ -458,12 +496,12 @@ func (c *Crawler) startSpiderTargetNew(ctx context.Context, targetNew int, seenP
return nil, nil, fmt.Errorf("start: %w", err)
}
// stderr 转发到 backend log。子进程退出时 reader 自动 EOFgoroutine 自然结束。
go forwardSpiderLog(c.cfg.Driver.ID(), stderr)
go forwardSpiderLog(c.cfg.Driver.ID(), stderr, c.cfg.OnCheckedVideo, c.cfg.OnExtractedVideo)
return cmd, stdout, nil
}
// forwardSpiderLog 把 Python stderr 逐行转发到 backend log,便于调试。
func forwardSpiderLog(driveID string, r io.Reader) {
func forwardSpiderLog(driveID string, r io.Reader, onCheckedVideo func(), onExtractedVideo func()) {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
for scanner.Scan() {
@@ -472,9 +510,23 @@ func forwardSpiderLog(driveID string, r io.Reader) {
continue
}
log.Printf("[spider91:py] drive=%s %s", driveID, line)
if onCheckedVideo != nil && isSpider91CheckedVideoLogLine(line) {
onCheckedVideo()
}
if onExtractedVideo != nil && isSpider91ExtractedVideoLogLine(line) {
onExtractedVideo()
}
}
}
func isSpider91CheckedVideoLogLine(line string) bool {
return checkedVideoLogRE.MatchString(line)
}
func isSpider91ExtractedVideoLogLine(line string) bool {
return strings.Contains(line, "[OK] 成功提取视频直链")
}
// processOne 处理单个 91 源视频:下载视频 + 封面 + 复制封面 + 入库。
// 任一步失败会清理已写入的临时文件,不留半成品。
func (c *Crawler) processOne(ctx context.Context, videoID string, item spiderVideoEntry) error {
@@ -847,9 +899,10 @@ func spider91CookieHeader(cookies []*http.Cookie) string {
}
var (
strencode2RE = regexp.MustCompile(`strencode2\(["']([^"']+)["']\)`)
srcAttrRE = regexp.MustCompile(`src=['"]([^'"]+)['"]`)
mp4URLRE = regexp.MustCompile(`https?://[^\s"'<>]+\.mp4[^\s"'<>]*`)
checkedVideoLogRE = regexp.MustCompile(`处理视频\s+\d+/\d+:`)
strencode2RE = regexp.MustCompile(`strencode2\(["']([^"']+)["']\)`)
srcAttrRE = regexp.MustCompile(`src=['"]([^'"]+)['"]`)
mp4URLRE = regexp.MustCompile(`https?://[^\s"'<>]+\.mp4[^\s"'<>]*`)
)
func parseSpider91VideoURL(html string) string {
@@ -707,6 +707,18 @@ func TestSpider91CookieHeader(t *testing.T) {
}
}
func TestSpider91ProgressLogLineClassifiers(t *testing.T) {
if !isSpider91CheckedVideoLogLine("[2026-06-08 16:49:17] 处理视频 3/24: 标题") {
t.Fatal("checked video log line was not recognized")
}
if isSpider91CheckedVideoLogLine("[2026-06-08 16:49:17] [页 2] 发现 24 个视频") {
t.Fatal("page summary log line should not count as checked video")
}
if !isSpider91ExtractedVideoLogLine("[2026-06-08 16:49:39] [OK] 成功提取视频直链") {
t.Fatal("extracted video log line was not recognized")
}
}
func spider91DetailHTML(videoURL string) string {
fragment := `<video><source src="` + videoURL + `" type="video/mp4"></video>`
return `document.write(strencode2("` + url.PathEscape(fragment) + `"));`
+8 -1
View File
@@ -25,6 +25,8 @@ type Scanner struct {
SkipDirIDs map[string]struct{}
// 回调:新视频被加入后触发预览视频生成
OnNewVideo func(v *catalog.Video)
// OnProgress 在扫描进度变化时触发。回调只应读取 Stats 里的计数,不应修改 map 字段。
OnProgress func(stats Stats)
// ProgressInterval 控制扫描内部 heartbeat 的最小输出间隔。
// 0 → 默认 30s< 0 → 关闭 heartbeat(仅留外层 start / done 两行)。
// heartbeat 单行格式:
@@ -91,6 +93,9 @@ func (s *Scanner) Run(ctx context.Context, startDirID string) (Stats, error) {
driveID = s.Drive.ID()
}
progress := func(currentDir string) {
if s.OnProgress != nil {
s.OnProgress(stats)
}
if interval < 0 {
return
}
@@ -149,7 +154,6 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, stats *Stats,
continue
}
stats.Scanned++
ext := strings.ToLower(path.Ext(e.Name))
if !s.Exts[ext] {
continue
@@ -157,6 +161,8 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, stats *Stats,
if e.Size <= 0 {
continue
}
stats.Scanned++
progress(dirName)
stats.SeenFileIDs[e.ID] = struct{}{}
id := s.Drive.Kind() + "-" + s.Drive.ID() + "-" + e.ID
@@ -266,6 +272,7 @@ func (s *Scanner) walk(ctx context.Context, dirID, dirName string, stats *Stats,
return err
}
stats.Added++
progress(dirName)
if s.OnNewVideo != nil {
s.OnNewVideo(v)
}
+33
View File
@@ -91,6 +91,39 @@ func TestRunIgnoresZeroSizeVideoFiles(t *testing.T) {
}
}
func TestRunScannedCountsOnlyVideoCandidates(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
drv := &scannerFakeDrive{
entries: []drives.Entry{
{ID: "file-1", Name: "clip.mp4", Size: 123},
{ID: "file-2", Name: "notes.txt", Size: 123},
{ID: "file-3", Name: "empty.mp4", Size: 0},
},
}
sc := New(cat, drv, []string{".mp4"}, nil, nil)
stats, err := sc.Run(ctx, "")
if err != nil {
t.Fatalf("scan: %v", err)
}
if stats.Scanned != 1 {
t.Fatalf("scanned = %d, want one non-empty video candidate", stats.Scanned)
}
if stats.Added != 1 {
t.Fatalf("added = %d, want one added video", stats.Added)
}
}
func TestRunStopsWhenContextCanceledDuringFileLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
+57 -12
View File
@@ -38,6 +38,21 @@ import { DriveForm } from "./drive/DriveForm";
import { DeleteDriveModal } from "./drive/DeleteDriveModal";
import { SkipDirsPanel } from "./drive/SkipDirsPanel";
const DRIVE_BUSY_MESSAGE = "当前存储有正在进行的任务,请稍后重试";
const NIGHTLY_BUSY_MESSAGE = "当前有全量扫描任务正在进行,请稍后重试";
function isDriveBusy(d: api.AdminDrive) {
return [
d.scanGenerationStatus,
d.thumbnailGenerationStatus,
d.previewGenerationStatus,
d.fingerprintGenerationStatus,
].some((status) => {
const state = status?.state || "idle";
return state !== "idle";
});
}
export function DrivesPage() {
const [list, setList] = useState<api.AdminDrive[]>([]);
const [storage, setStorage] = useState<api.AdminDriveStorage | null>(null);
@@ -61,7 +76,8 @@ export function DrivesPage() {
const [scanningAll, setScanningAll] = useState(false);
const [stoppingAll, setStoppingAll] = useState(false);
const [trackingNightly, setTrackingNightly] = useState(false);
const [scanningDriveId, setScanningDriveId] = useState("");
const [scanningDriveIds, setScanningDriveIds] = useState<Record<string, boolean>>({});
const scanningDriveIdsRef = useRef(new Set<string>());
const [stoppingDriveId, setStoppingDriveId] = useState("");
const [searchParams, setSearchParams] = useSearchParams();
const selectedDriveId = searchParams.get("drive") || null;
@@ -295,25 +311,47 @@ export function DrivesPage() {
}
async function handleRescan(d: api.AdminDrive) {
if (scanningDriveId) return;
setScanningDriveId(d.id);
if (nightlyBusy) {
show(nightlyBusyText(nightlyStatus) || NIGHTLY_BUSY_MESSAGE, "info");
return;
}
if (isDriveBusy(d) || scanningDriveIdsRef.current.has(d.id)) {
show(DRIVE_BUSY_MESSAGE, "info");
return;
}
scanningDriveIdsRef.current.add(d.id);
setScanningDriveIds((prev) => ({ ...prev, [d.id]: true }));
try {
await api.rescan(d.id);
const resp = await api.rescan(d.id);
if (!resp.accepted) {
if (resp.status) {
setNightlyStatus(resp.status);
}
show(resp.message || DRIVE_BUSY_MESSAGE, "info");
refreshDriveList();
return;
}
if (d.kind === "spider91") {
show("已触发抓取任务,需要 2-4 分钟,可稍后刷新视频列表查看", "success");
} else {
show("已触发扫描,可稍后刷新视频列表查看", "success");
}
refreshDriveList();
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
} finally {
setScanningDriveId("");
scanningDriveIdsRef.current.delete(d.id);
setScanningDriveIds((prev) => {
const next = { ...prev };
delete next[d.id];
return next;
});
}
}
async function handleRunNightly() {
if (nightlyBusy) {
show(nightlyBusyText(nightlyStatus) || "当前已有扫描所有网盘任务", "info");
show(nightlyBusyText(nightlyStatus) || NIGHTLY_BUSY_MESSAGE, "info");
return;
}
setScanningAll(true);
@@ -324,7 +362,7 @@ export function DrivesPage() {
setTrackingNightly(!resp.status.running);
show("已触发扫描所有网盘,耗时较长,可在任务状态和 backend 日志观察进度", "success");
} else {
show("当前已有扫描所有网盘任务", "info");
show(resp.message || NIGHTLY_BUSY_MESSAGE, "info");
}
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
@@ -515,17 +553,24 @@ export function DrivesPage() {
type="button"
className="admin-btn is-primary"
onClick={() => handleRescan(d)}
disabled={!!scanningDriveId}
aria-disabled={nightlyBusy || isDriveBusy(d) || !!scanningDriveIds[d.id]}
title={
nightlyBusy
? nightlyBusyText(nightlyStatus) || NIGHTLY_BUSY_MESSAGE
: isDriveBusy(d) || scanningDriveIds[d.id]
? DRIVE_BUSY_MESSAGE
: undefined
}
>
{d.kind === "spider91" ? (
<>
<Download size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即抓取"}
<Download size={13} className={scanningDriveIds[d.id] ? "admin-spin" : undefined} />
{scanningDriveIds[d.id] ? "触发中..." : "立即抓取"}
</>
) : (
<>
<RefreshCw size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即重扫"}
<RefreshCw size={13} className={scanningDriveIds[d.id] ? "admin-spin" : undefined} />
{scanningDriveIds[d.id] ? "触发中..." : "立即重扫"}
</>
)}
</button>
+5 -2
View File
@@ -95,6 +95,7 @@ export type AdminDrive = {
lastCrawlAt?: number;
// spider91 专用代理地址;仅后台管理接口返回,用于编辑表单回显。
spider91Proxy?: string;
scanGenerationStatus?: DriveGenerationStatus;
thumbnailGenerationStatus?: DriveGenerationStatus;
previewGenerationStatus?: DriveGenerationStatus;
fingerprintGenerationStatus?: DriveGenerationStatus;
@@ -115,6 +116,8 @@ export type DriveGenerationStatus = {
currentTitle?: string;
queueLength: number;
cooldownUntil?: string;
scannedCount: number;
addedCount: number;
};
export function listDrives() {
@@ -170,7 +173,7 @@ export function deleteDrive(id: string, body: DeleteDriveInput) {
}
export function rescan(id: string) {
return request<{ ok: boolean }>(
return request<{ ok: boolean; accepted: boolean; message?: string; status?: NightlyJobStatus }>(
`/drives/${encodeURIComponent(id)}/rescan`,
{ method: "POST" }
);
@@ -448,7 +451,7 @@ export function getNightlyJobStatus() {
}
export function runNightlyJob() {
return request<{ ok: boolean; accepted: boolean; status: NightlyJobStatus }>(
return request<{ ok: boolean; accepted: boolean; status: NightlyJobStatus; message?: string }>(
"/jobs/nightly/run",
{ method: "POST" }
);
+27 -9
View File
@@ -204,6 +204,11 @@ export function DriveGenerationPanel({
</header>
<div className="admin-gen-columns">
<DriveGenCol
label={d.kind === "spider91" ? "抓取" : "扫盘"}
status={d.scanGenerationStatus}
showCounts={false}
/>
<DriveGenCol
label="封面"
status={d.thumbnailGenerationStatus}
@@ -265,6 +270,7 @@ function DriveGenCol({
pending,
failed,
extra,
showCounts = true,
}: {
label: string;
status?: api.DriveGenerationStatus;
@@ -272,10 +278,14 @@ function DriveGenCol({
pending?: number;
failed?: number;
extra?: number;
showCounts?: boolean;
}) {
const state = status?.state || "idle";
const detail = generationDetail(status);
const title = generationTitle(status, detail);
const stateLabel = label === "抓取" && state === "scanning" ? "抓取中" : generationStateLabel(state);
const showScanProgress = !showCounts && (state === "scanning" || (status?.scannedCount ?? 0) > 0 || (status?.addedCount ?? 0) > 0);
const scannedLabel = label === "抓取" ? "已抓取" : "已扫描";
return (
<div className="admin-gen-col">
<div className="admin-gen-col__head">
@@ -284,18 +294,26 @@ function DriveGenCol({
className={`admin-status admin-generation-state is-${generationStateClass(state)}`}
title={title || undefined}
>
{generationStateLabel(state)}
{stateLabel}
</span>
</div>
{detail && <div className="admin-gen-col__detail">{detail}</div>}
<div className="admin-gen-col__counts">
<div className="admin-gen-col__count"><span></span><strong>{ready ?? 0}</strong></div>
<div className="admin-gen-col__count"><span></span><strong>{pending ?? 0}</strong></div>
<div className="admin-gen-col__count"><span></span><strong>{failed ?? 0}</strong></div>
{(extra ?? 0) > 0 && (
<div className="admin-gen-col__count"><span></span><strong>{extra}</strong></div>
)}
</div>
{showScanProgress && (
<div className="admin-gen-col__counts admin-gen-col__counts--scan">
<div className="admin-gen-col__count"><span>{scannedLabel}</span><strong>{status?.scannedCount ?? 0}</strong></div>
<div className="admin-gen-col__count"><span></span><strong>{status?.addedCount ?? 0}</strong></div>
</div>
)}
{showCounts && (
<div className="admin-gen-col__counts">
<div className="admin-gen-col__count"><span></span><strong>{ready ?? 0}</strong></div>
<div className="admin-gen-col__count"><span></span><strong>{pending ?? 0}</strong></div>
<div className="admin-gen-col__count"><span></span><strong>{failed ?? 0}</strong></div>
{(extra ?? 0) > 0 && (
<div className="admin-gen-col__count"><span></span><strong>{extra}</strong></div>
)}
</div>
)}
</div>
);
}
+4 -3
View File
@@ -56,12 +56,12 @@ export function nightlyButtonText(status: { running: boolean; queued: boolean },
}
export function nightlyBusyText(status: { running: boolean; queued: boolean }) {
if (status.running) return "扫描任务正在运行";
if (status.queued) return "扫描任务已排队";
if (status.running || status.queued) return "当前有全量扫描任务正在进行,请稍后重试";
return "";
}
export function generationStateLabel(state: string): string {
if (state === "scanning") return "扫盘中";
if (state === "generating") return "生成中";
if (state === "cooling") return "冷却中";
if (state === "queued") return "排队中";
@@ -69,7 +69,8 @@ export function generationStateLabel(state: string): string {
}
export function generationStateClass(state: string): string {
if (state === "generating" || state === "cooling" || state === "queued") {
if (state === "scanning" || state === "generating" || state === "cooling" || state === "queued") {
if (state === "scanning") return "generating";
return state;
}
return "idle";
+2 -2
View File
@@ -3107,11 +3107,11 @@
}
/* =========================================================
* Drive Generation 3-Column Layout
* Drive Generation
* ========================================================= */
.admin-gen-columns {
display: grid;
grid-template-columns: repeat(3, 1fr);
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: var(--space-3);
margin-bottom: var(--space-4);
}
+31
View File
@@ -186,6 +186,37 @@ test("drive management exposes stop task controls", () => {
assert.match(drivesPageSource, /停止所有网盘任务/);
});
test("drive rescan reports busy storage tasks instead of queueing duplicates", () => {
assert.match(apiSource, /accepted:\s*boolean;\s*message\?:\s*string/);
assert.match(apiSource, /scanGenerationStatus\?: DriveGenerationStatus/);
assert.match(drivesPageSource, /当前存储有正在进行的任务,请稍后重试/);
assert.match(drivesPageSource, /function isDriveBusy\(d: api\.AdminDrive\)/);
assert.match(drivesPageSource, /d\.scanGenerationStatus/);
assert.match(drivesPageSource, /status\?\.state \|\| "idle"/);
assert.match(drivesPageSource, /scanningDriveIdsRef\.current\.has\(d\.id\)/);
assert.match(drivesPageSource, /if \(!resp\.accepted\)/);
assert.doesNotMatch(drivesPageSource, /disabled=\{!!scanningDriveId\}/);
});
test("nightly scan duplicate trigger uses full-scan busy message", () => {
assert.match(apiSource, /status:\s*NightlyJobStatus;\s*message\?:\s*string/);
assert.match(drivesPageSource, /当前有全量扫描任务正在进行,请稍后重试/);
assert.match(drivesPageSource, /resp\.message \|\| NIGHTLY_BUSY_MESSAGE/);
assert.match(constantsSource, /当前有全量扫描任务正在进行,请稍后重试/);
});
test("drive generation panel shows scan or crawler status first", () => {
assert.match(driveComponentsSource, /label=\{d\.kind === "spider91" \? "抓取" : "扫盘"\}/);
assert.match(driveComponentsSource, /status=\{d\.scanGenerationStatus\}/);
assert.match(driveComponentsSource, /showCounts=\{false\}/);
assert.match(driveComponentsSource, /label === "抓取" && state === "scanning" \? "抓取中"/);
assert.match(driveComponentsSource, /status\?\.scannedCount/);
assert.match(driveComponentsSource, /预计新增/);
assert.match(apiSource, /scannedCount:\s*number/);
assert.match(apiSource, /addedCount:\s*number/);
assert.match(constantsSource, /if \(state === "scanning"\) return "扫盘中"/);
});
test("drive detail selection is stored in the URL history", () => {
assert.match(drivesPageSource, /useSearchParams/);
assert.match(drivesPageSource, /searchParams\.get\("drive"\)/);