feat: add drive task stop controls

Add per-drive and global admin controls to stop scan, preview, thumbnail, and fingerprint work.

Keep stopped pending generation resumable, wire cancellation through workers and nightly runs, and refine mobile drive-management UI/history behavior.
This commit is contained in:
nianzhibai
2026-06-03 23:42:54 +08:00
parent df6f0ebbbf
commit 5080203b7c
11 changed files with 1069 additions and 148 deletions
+410 -93
View File
@@ -176,11 +176,17 @@ func main() {
_, isSpider91 := app.spider91Crawlers[driveID]
app.mu.Unlock()
if isSpider91 {
go app.runSpider91Crawl(ctx, driveID)
app.scheduleSpider91Crawl(ctx, driveID)
return
}
app.scheduleScan(ctx, driveID)
},
OnStopDriveTasks: func(driveID string) bool {
return app.stopDriveTasks(ctx, driveID)
},
OnStopAllTasks: func() int {
return app.stopAllDriveTasks(ctx)
},
OnRegenPreview: func(videoID string) {
go app.regenPreview(ctx, videoID)
},
@@ -328,10 +334,17 @@ type App struct {
scanGlobalMu sync.Mutex
// scanQueueMu 保护 scanQueued。
scanQueueMu sync.Mutex
// scanQueued 跟踪哪些 driveID 已经排队或正在跑,去重后续重复点击。
// 一个 drive 在 scheduleScan 入队时被加入,在 runScan goroutine 结束时被移除。
// scanQueued 跟踪哪些 driveID 已经排队或正在跑扫盘/91 爬取,去重后续重复点击。
// 一个 drive 在 scheduleScan/scheduleSpider91Crawl 入队时被加入,后台 goroutine
// 结束时被移除。
scanQueued map[string]bool
// taskCancelMu 保护 driveTaskCancels。这里登记的是可被"停止任务"按钮中断
// 的 drive 级任务上下文:扫盘、91 爬取、指纹补队列、失败生成重试等。
taskCancelMu sync.Mutex
driveTaskCancelSeq uint64
driveTaskCancels map[string]map[uint64]context.CancelFunc
// fingerprintQueueing 去重每个 drive 的 pending 指纹补队列任务,避免定时
// reconcile 和扫盘结束同时为同一批 pending 视频启动多个长时间入队 goroutine。
fingerprintQueueMu sync.Mutex
@@ -499,29 +512,11 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
for id, worker := range thumbWorkers {
status := out[id]
status.Thumbnail = generationStatusFromPreview(worker.Status())
missing, err := a.cat.CountVideosNeedingThumbnail(context.Background(), id)
if err != nil {
log.Printf("[thumb] count thumbnail work %s: %v", id, err)
} else {
status.Thumbnail.QueueLength = missing
if missing > 0 && status.Thumbnail.State == "idle" {
status.Thumbnail.State = "queued"
}
}
out[id] = status
}
for id, worker := range fingerprintWorkers {
status := out[id]
status.Fingerprint = generationStatusFromFingerprint(worker.Status())
pending, err := a.cat.CountVideosNeedingFingerprint(context.Background(), id)
if err != nil {
log.Printf("[fingerprint] count pending fingerprints %s: %v", id, err)
} else {
status.Fingerprint.QueueLength = pending
if pending > 0 && status.Fingerprint.State == "idle" {
status.Fingerprint.State = "queued"
}
}
out[id] = status
}
return out
@@ -738,25 +733,7 @@ func (a *App) attachDriveUnlocked(ctx context.Context, d *catalog.Drive) error {
a.registry.Set(d.ID, drv)
// preview worker
gen := preview.New(preview.Config{
FFmpegPath: a.cfg.Preview.FFmpegPath,
FFprobePath: a.cfg.Preview.FFprobePath,
DurationSeconds: a.cfg.Preview.DurationSeconds,
Width: a.cfg.Preview.Width,
Segments: a.cfg.Preview.Segments,
LocalDir: a.cfg.Storage.LocalPreviewDir,
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprintConfigForDrive(drv))
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
go thumbWorker.Run(workerCtx)
go fingerprintWorker.Run(workerCtx)
a.registerPreviewWorkers(ctx, d.ID, worker, thumbWorker, fingerprintWorker, cancel)
a.startDriveGenerationWorkers(ctx, d.ID, drv, true)
// spider91 driver 还需要一个 crawler,挂在专用 map 里供 crawlerLoop 调用
if sd, ok := drv.(*spider91.Driver); ok {
@@ -773,25 +750,36 @@ func (a *App) attachLocalUpload(ctx context.Context) error {
}
a.registry.Set(drv.ID(), drv)
gen := preview.New(preview.Config{
FFmpegPath: a.cfg.Preview.FFmpegPath,
FFprobePath: a.cfg.Preview.FFprobePath,
DurationSeconds: a.cfg.Preview.DurationSeconds,
Width: a.cfg.Preview.Width,
Segments: a.cfg.Preview.Segments,
LocalDir: a.cfg.Storage.LocalPreviewDir,
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprintConfigForDrive(drv))
a.startDriveGenerationWorkers(ctx, drv.ID(), drv, true)
return nil
}
func (a *App) newDriveGenerationWorkers(drv drives.Drive) (*preview.Worker, *preview.ThumbWorker, *fingerprint.Worker) {
previewCfg := preview.Config{}
if a.cfg != nil {
previewCfg = preview.Config{
FFmpegPath: a.cfg.Preview.FFmpegPath,
FFprobePath: a.cfg.Preview.FFprobePath,
DurationSeconds: a.cfg.Preview.DurationSeconds,
Width: a.cfg.Preview.Width,
Segments: a.cfg.Preview.Segments,
LocalDir: a.cfg.Storage.LocalPreviewDir,
}
}
gen := preview.New(previewCfg)
return preview.NewWorker(gen, a.cat, drv),
preview.NewThumbWorker(gen, a.cat, drv),
fingerprint.NewWorker(a.cat, drv, fingerprintConfigForDrive(drv))
}
func (a *App) startDriveGenerationWorkers(ctx context.Context, driveID string, drv drives.Drive, enqueue bool) {
worker, thumbWorker, fingerprintWorker := a.newDriveGenerationWorkers(drv)
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
go thumbWorker.Run(workerCtx)
go fingerprintWorker.Run(workerCtx)
a.registerPreviewWorkers(ctx, drv.ID(), worker, thumbWorker, fingerprintWorker, cancel)
return nil
a.registerPreviewWorkersWithOptions(workerCtx, driveID, worker, thumbWorker, fingerprintWorker, cancel, enqueue)
}
func (a *App) localUploadDir() string {
@@ -898,6 +886,10 @@ func (a *App) attachSpider91Crawler(d *catalog.Drive, drv *spider91.Driver) {
}
func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker, fingerprintWorker *fingerprint.Worker, cancel context.CancelFunc) {
a.registerPreviewWorkersWithOptions(ctx, driveID, worker, thumbWorker, fingerprintWorker, cancel, true)
}
func (a *App) registerPreviewWorkersWithOptions(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker, fingerprintWorker *fingerprint.Worker, cancel context.CancelFunc, enqueue bool) {
a.mu.Lock()
if a.cancels == nil {
a.cancels = make(map[string]context.CancelFunc)
@@ -936,12 +928,238 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
}
a.mu.Unlock()
if !enqueue {
return
}
go a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
if fingerprintWorker != nil {
a.scheduleFingerprintBackfill(ctx, driveID, fingerprintWorker)
}
}
func (a *App) registerDriveTaskContext(ctx context.Context, driveID string) (context.Context, func()) {
if ctx == nil {
ctx = context.Background()
}
taskCtx, cancel := context.WithCancel(ctx)
a.taskCancelMu.Lock()
if a.driveTaskCancels == nil {
a.driveTaskCancels = make(map[string]map[uint64]context.CancelFunc)
}
a.driveTaskCancelSeq++
token := a.driveTaskCancelSeq
if a.driveTaskCancels[driveID] == nil {
a.driveTaskCancels[driveID] = make(map[uint64]context.CancelFunc)
}
a.driveTaskCancels[driveID][token] = cancel
a.taskCancelMu.Unlock()
done := func() {
cancel()
a.taskCancelMu.Lock()
if cancels := a.driveTaskCancels[driveID]; cancels != nil {
delete(cancels, token)
if len(cancels) == 0 {
delete(a.driveTaskCancels, driveID)
}
}
a.taskCancelMu.Unlock()
}
return taskCtx, done
}
func (a *App) cancelDriveTaskContexts(driveID string) int {
a.taskCancelMu.Lock()
cancelsByToken := a.driveTaskCancels[driveID]
delete(a.driveTaskCancels, driveID)
a.taskCancelMu.Unlock()
for _, cancel := range cancelsByToken {
if cancel != nil {
cancel()
}
}
return len(cancelsByToken)
}
func (a *App) cancelAllDriveTaskContexts() map[string]int {
a.taskCancelMu.Lock()
all := a.driveTaskCancels
a.driveTaskCancels = nil
a.taskCancelMu.Unlock()
out := make(map[string]int, len(all))
for driveID, cancelsByToken := range all {
out[driveID] = len(cancelsByToken)
for _, cancel := range cancelsByToken {
if cancel != nil {
cancel()
}
}
}
return out
}
func (a *App) clearQueuedDriveTask(driveID string) bool {
a.scanQueueMu.Lock()
queued := a.scanQueued[driveID]
delete(a.scanQueued, driveID)
a.scanQueueMu.Unlock()
return queued
}
func (a *App) clearAllQueuedDriveTasks() []string {
a.scanQueueMu.Lock()
ids := make([]string, 0, len(a.scanQueued))
for id := range a.scanQueued {
ids = append(ids, id)
}
a.scanQueued = nil
a.scanQueueMu.Unlock()
return ids
}
func (a *App) clearFingerprintQueueing(driveID string) bool {
a.fingerprintQueueMu.Lock()
queued := a.fingerprintQueueing[driveID]
delete(a.fingerprintQueueing, driveID)
a.fingerprintQueueMu.Unlock()
return queued
}
func (a *App) clearAllFingerprintQueueing() []string {
a.fingerprintQueueMu.Lock()
ids := make([]string, 0, len(a.fingerprintQueueing))
for id := range a.fingerprintQueueing {
ids = append(ids, id)
}
a.fingerprintQueueing = nil
a.fingerprintQueueMu.Unlock()
return ids
}
func (a *App) resetDriveGenerationWorkers(ctx context.Context, driveID string) bool {
var drv drives.Drive
var attached bool
if a.registry != nil {
drv, attached = a.registry.Get(driveID)
}
a.mu.Lock()
hadWorkers := a.workers[driveID] != nil ||
a.thumbWorkers[driveID] != nil ||
a.fingerprintWorkers[driveID] != nil ||
a.cancels[driveID] != nil
oldCancel := a.cancels[driveID]
a.mu.Unlock()
if attached && drv != nil {
a.startDriveGenerationWorkers(ctx, driveID, drv, false)
return hadWorkers
}
if oldCancel != nil {
oldCancel()
}
a.mu.Lock()
delete(a.workers, driveID)
delete(a.thumbWorkers, driveID)
delete(a.fingerprintWorkers, driveID)
delete(a.cancels, driveID)
a.mu.Unlock()
return hadWorkers
}
func (a *App) resetAllDriveGenerationWorkers(ctx context.Context) []string {
seen := make(map[string]struct{})
if a.registry != nil {
for _, drv := range a.registry.All() {
if drv == nil {
continue
}
driveID := drv.ID()
seen[driveID] = struct{}{}
a.startDriveGenerationWorkers(ctx, driveID, drv, false)
}
}
a.mu.Lock()
stale := make([]string, 0)
for id := range a.cancels {
if _, ok := seen[id]; !ok {
stale = append(stale, id)
}
}
for id := range a.workers {
if _, ok := seen[id]; !ok {
stale = append(stale, id)
}
}
for id := range a.thumbWorkers {
if _, ok := seen[id]; !ok {
stale = append(stale, id)
}
}
for id := range a.fingerprintWorkers {
if _, ok := seen[id]; !ok {
stale = append(stale, id)
}
}
a.mu.Unlock()
for _, id := range stale {
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
a.resetDriveGenerationWorkers(ctx, id)
}
ids := make([]string, 0, len(seen))
for id := range seen {
ids = append(ids, id)
}
return ids
}
func (a *App) stopDriveTasks(ctx context.Context, driveID string) bool {
driveID = strings.TrimSpace(driveID)
if driveID == "" {
return false
}
canceled := a.cancelDriveTaskContexts(driveID)
queued := a.clearQueuedDriveTask(driveID)
fingerprintQueued := a.clearFingerprintQueueing(driveID)
hadWorkers := a.resetDriveGenerationWorkers(ctx, driveID)
stopped := canceled > 0 || queued || fingerprintQueued || hadWorkers
log.Printf("[tasks] stop drive=%s stopped=%v canceled_tasks=%d queued=%v fingerprint_queue=%v workers=%v",
driveID, stopped, canceled, queued, fingerprintQueued, hadWorkers)
return stopped
}
func (a *App) stopAllDriveTasks(ctx context.Context) int {
stoppedIDs := make(map[string]struct{})
if a.nightlyRunner != nil && a.nightlyRunner.StopCurrent() {
log.Printf("[tasks] requested nightly pipeline stop")
}
for id := range a.cancelAllDriveTaskContexts() {
stoppedIDs[id] = struct{}{}
}
for _, id := range a.clearAllQueuedDriveTasks() {
stoppedIDs[id] = struct{}{}
}
for _, id := range a.clearAllFingerprintQueueing() {
stoppedIDs[id] = struct{}{}
}
for _, id := range a.resetAllDriveGenerationWorkers(ctx) {
stoppedIDs[id] = struct{}{}
}
log.Printf("[tasks] stop all drive tasks drives=%d", len(stoppedIDs))
return len(stoppedIDs)
}
func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Worker) {
pending, err := a.cat.ListVideosByPreviewStatus(ctx, driveID, "pending", 0)
if err != nil {
@@ -1020,12 +1238,14 @@ func (a *App) scheduleFingerprintBackfill(ctx context.Context, driveID string, w
if w == nil {
return
}
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
a.fingerprintQueueMu.Lock()
if a.fingerprintQueueing == nil {
a.fingerprintQueueing = make(map[string]bool)
}
if a.fingerprintQueueing[driveID] {
a.fingerprintQueueMu.Unlock()
done()
return
}
a.fingerprintQueueing[driveID] = true
@@ -1033,11 +1253,12 @@ func (a *App) scheduleFingerprintBackfill(ctx context.Context, driveID string, w
go func() {
defer func() {
done()
a.fingerprintQueueMu.Lock()
delete(a.fingerprintQueueing, driveID)
a.fingerprintQueueMu.Unlock()
}()
a.enqueueFingerprints(ctx, driveID, w)
a.enqueueFingerprints(taskCtx, driveID, w)
}()
}
@@ -1063,6 +1284,9 @@ func (a *App) enqueueFingerprints(ctx context.Context, driveID string, w *finger
}
func (a *App) detachDrive(id string) {
a.cancelDriveTaskContexts(id)
a.clearQueuedDriveTask(id)
a.clearFingerprintQueueing(id)
a.registry.Remove(id)
a.mu.Lock()
if cancel, ok := a.cancels[id]; ok {
@@ -1137,12 +1361,14 @@ func (a *App) listDriveDirChildren(ctx context.Context, driveID, parentID string
// 用于 admin UI「重扫」、「立即抓取」这类异步触发;nightly Phase 1 应继续直接
// 调 runScan(同步、按 for 循环顺序),不需要走 scheduleScan。
func (a *App) scheduleScan(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
a.scanQueueMu.Lock()
if a.scanQueued == nil {
a.scanQueued = make(map[string]bool)
}
if a.scanQueued[driveID] {
a.scanQueueMu.Unlock()
done()
log.Printf("[scan] drive=%s already queued or running, skip duplicate request", driveID)
return
}
@@ -1154,17 +1380,28 @@ func (a *App) scheduleScan(ctx context.Context, driveID string) {
a.scanQueueMu.Lock()
delete(a.scanQueued, driveID)
a.scanQueueMu.Unlock()
done()
}()
a.runScan(ctx, driveID)
a.runScanWithTaskContext(taskCtx, driveID)
}()
}
func (a *App) runScan(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
a.runScanWithTaskContext(taskCtx, driveID)
}
func (a *App) runScanWithTaskContext(ctx context.Context, driveID string) {
// 全局串行:同一时刻只有一个扫盘任务在跑(admin 重扫 + nightly Phase 1 共用)。
// 等待这把锁的 goroutine 在排队,按到达顺序逐个执行。
a.scanGlobalMu.Lock()
defer a.scanGlobalMu.Unlock()
if err := ctx.Err(); err != nil {
log.Printf("[scan] drive=%s canceled before start: %v", driveID, err)
return
}
if err := a.ensureDriveAttached(ctx, driveID); err != nil {
log.Printf("[scan] drive %s attach failed: %v", driveID, err)
return
@@ -1638,11 +1875,13 @@ func (a *App) regenPreview(ctx context.Context, videoID string) {
if err != nil {
return
}
taskCtx, done := a.registerDriveTaskContext(ctx, v.DriveID)
defer done()
a.mu.Lock()
worker := a.workers[v.DriveID]
a.mu.Unlock()
if worker != nil {
worker.EnqueueBlocking(ctx, v)
worker.EnqueueBlocking(taskCtx, v)
}
}
@@ -1675,7 +1914,9 @@ func (a *App) regenAllPreviews(ctx context.Context) {
}
func (a *App) regenFailedPreviews(ctx context.Context, driveID string) {
items, err := a.cat.ListVideosByPreviewStatus(ctx, driveID, "failed", 0)
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
failed, err := a.cat.ListVideosByPreviewStatus(taskCtx, driveID, "failed", 0)
if err != nil {
log.Printf("[preview] list failed videos for regen drive=%s: %v", driveID, err)
return
@@ -1687,27 +1928,37 @@ func (a *App) regenFailedPreviews(ctx context.Context, driveID string) {
log.Printf("[preview] regen failed drive=%s skipped: worker not found", driveID)
return
}
log.Printf("[preview] enqueue failed videos for regen drive=%s count=%d", driveID, len(items))
queued := 0
for _, v := range items {
if err := ctx.Err(); err != nil {
log.Printf("[preview] enqueue failed canceled drive=%s queued=%d: %v", driveID, queued, err)
reset := 0
for _, v := range failed {
if err := taskCtx.Err(); err != nil {
log.Printf("[preview] reset failed canceled drive=%s reset=%d: %v", driveID, reset, err)
return
}
if err := a.cat.UpdatePreview(ctx, v.ID, "", "pending"); err != nil {
if err := a.cat.UpdatePreview(taskCtx, v.ID, "", "pending"); err != nil {
log.Printf("[preview] reset failed video %s drive=%s: %v", v.ID, driveID, err)
continue
}
v.PreviewFileID = ""
v.PreviewLocal = ""
v.PreviewStatus = "pending"
if !worker.EnqueueBlocking(ctx, v) {
log.Printf("[preview] enqueue failed canceled drive=%s queued=%d", driveID, queued)
reset++
}
items, err := a.cat.ListVideosByPreviewStatus(taskCtx, driveID, "pending", 0)
if err != nil {
log.Printf("[preview] list pending videos for regen drive=%s: %v", driveID, err)
return
}
log.Printf("[preview] enqueue pending videos for regen drive=%s count=%d reset_failed=%d", driveID, len(items), reset)
queued := 0
for _, v := range items {
if err := taskCtx.Err(); err != nil {
log.Printf("[preview] enqueue pending canceled drive=%s queued=%d: %v", driveID, queued, err)
return
}
if !worker.EnqueueBlocking(taskCtx, v) {
log.Printf("[preview] enqueue pending canceled drive=%s queued=%d", driveID, queued)
return
}
queued++
}
log.Printf("[preview] enqueued failed videos for regen drive=%s queued=%d", driveID, queued)
log.Printf("[preview] enqueued pending videos for regen drive=%s queued=%d reset_failed=%d", driveID, queued, reset)
}
// regenFailedThumbnails 把某 drive 下 thumbnail_status=failed 的视频全部重置为
@@ -1717,7 +1968,9 @@ func (a *App) regenFailedPreviews(ctx context.Context, driveID string) {
// 操作不会触发已生成失败的视频重新去网盘取流 —— 只是把 catalog 的状态翻到 pending
// 并入队;真正的取链 / ffmpeg 在 thumb worker 里执行。
func (a *App) regenFailedThumbnails(ctx context.Context, driveID string) {
items, err := a.cat.ListVideosByThumbnailStatus(ctx, driveID, "failed", 0)
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
failed, err := a.cat.ListVideosByThumbnailStatus(taskCtx, driveID, "failed", 0)
if err != nil {
log.Printf("[thumb] list failed videos for regen drive=%s: %v", driveID, err)
return
@@ -1729,17 +1982,16 @@ func (a *App) regenFailedThumbnails(ctx context.Context, driveID string) {
log.Printf("[thumb] regen failed drive=%s skipped: thumb worker not found", driveID)
return
}
log.Printf("[thumb] enqueue failed thumbnails for regen drive=%s count=%d", driveID, len(items))
queued := 0
for _, v := range items {
if err := ctx.Err(); err != nil {
log.Printf("[thumb] enqueue failed canceled drive=%s queued=%d: %v", driveID, queued, err)
reset := 0
for _, v := range failed {
if err := taskCtx.Err(); err != nil {
log.Printf("[thumb] reset failed canceled drive=%s reset=%d: %v", driveID, reset, err)
return
}
// 状态翻 pending;保留 thumbnail_url 字段(thumb worker 先看 url 是否已写
// 来判断是否真的要再生)。但既然之前是 failed 说明 url 没写过,所以这里
// 把 url 一并清空更稳。
if err := a.cat.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{
if err := a.cat.UpdateVideoMeta(taskCtx, v.ID, catalog.VideoMetaPatch{
ThumbnailURL: "",
ThumbnailStatus: "pending",
ResetThumbnailFailures: true,
@@ -1747,18 +1999,33 @@ func (a *App) regenFailedThumbnails(ctx context.Context, driveID string) {
log.Printf("[thumb] reset failed video %s drive=%s: %v", v.ID, driveID, err)
continue
}
v.ThumbnailURL = ""
if !thumbWorker.EnqueueBlocking(ctx, v) {
log.Printf("[thumb] enqueue failed canceled drive=%s queued=%d", driveID, queued)
reset++
}
items, err := a.cat.ListVideosNeedingThumbnail(taskCtx, driveID, 0)
if err != nil {
log.Printf("[thumb] list pending thumbnails for regen drive=%s: %v", driveID, err)
return
}
log.Printf("[thumb] enqueue pending thumbnails for regen drive=%s count=%d reset_failed=%d", driveID, len(items), reset)
queued := 0
for _, v := range items {
if err := taskCtx.Err(); err != nil {
log.Printf("[thumb] enqueue pending canceled drive=%s queued=%d: %v", driveID, queued, err)
return
}
if !thumbWorker.EnqueueBlocking(taskCtx, v) {
log.Printf("[thumb] enqueue pending canceled drive=%s queued=%d", driveID, queued)
return
}
queued++
}
log.Printf("[thumb] enqueued failed thumbnails for regen drive=%s queued=%d", driveID, queued)
log.Printf("[thumb] enqueued pending thumbnails for regen drive=%s queued=%d reset_failed=%d", driveID, queued, reset)
}
func (a *App) regenFailedFingerprints(ctx context.Context, driveID string) {
items, err := a.cat.ListVideosByFingerprintStatus(ctx, driveID, "failed", 0)
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
failed, err := a.cat.ListVideosByFingerprintStatus(taskCtx, driveID, "failed", 0)
if err != nil {
log.Printf("[fingerprint] list failed videos for regen drive=%s: %v", driveID, err)
return
@@ -1770,27 +2037,37 @@ func (a *App) regenFailedFingerprints(ctx context.Context, driveID string) {
log.Printf("[fingerprint] regen failed drive=%s skipped: fingerprint worker not found", driveID)
return
}
log.Printf("[fingerprint] enqueue failed videos for regen drive=%s count=%d", driveID, len(items))
queued := 0
for _, v := range items {
if err := ctx.Err(); err != nil {
log.Printf("[fingerprint] enqueue failed canceled drive=%s queued=%d: %v", driveID, queued, err)
reset := 0
for _, v := range failed {
if err := taskCtx.Err(); err != nil {
log.Printf("[fingerprint] reset failed canceled drive=%s reset=%d: %v", driveID, reset, err)
return
}
if err := a.cat.UpdateVideoFingerprint(ctx, v.ID, "", "pending", ""); err != nil {
if err := a.cat.UpdateVideoFingerprint(taskCtx, v.ID, "", "pending", ""); err != nil {
log.Printf("[fingerprint] reset failed video %s drive=%s: %v", v.ID, driveID, err)
continue
}
v.SampledSHA256 = ""
v.FingerprintStatus = "pending"
v.FingerprintError = ""
if !fingerprintWorker.EnqueueBlocking(ctx, v) {
log.Printf("[fingerprint] enqueue failed canceled drive=%s queued=%d", driveID, queued)
reset++
}
items, err := a.cat.ListVideosNeedingFingerprint(taskCtx, driveID, 0)
if err != nil {
log.Printf("[fingerprint] list pending videos for regen drive=%s: %v", driveID, err)
return
}
log.Printf("[fingerprint] enqueue pending videos for regen drive=%s count=%d reset_failed=%d", driveID, len(items), reset)
queued := 0
for _, v := range items {
if err := taskCtx.Err(); err != nil {
log.Printf("[fingerprint] enqueue pending canceled drive=%s queued=%d: %v", driveID, queued, err)
return
}
if !fingerprintWorker.EnqueueBlocking(taskCtx, v) {
log.Printf("[fingerprint] enqueue pending canceled drive=%s queued=%d", driveID, queued)
return
}
queued++
}
log.Printf("[fingerprint] enqueued failed videos for regen drive=%s queued=%d", driveID, queued)
log.Printf("[fingerprint] enqueued pending videos for regen drive=%s queued=%d reset_failed=%d", driveID, queued, reset)
}
// listScanTargetIDs 返回 nightly Phase 1 应扫描的所有 drive ID
@@ -1872,12 +2149,48 @@ func shouldScanDrive(d drives.Drive) bool {
// ---------- spider91 crawl ----------
func (a *App) scheduleSpider91Crawl(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
a.scanQueueMu.Lock()
if a.scanQueued == nil {
a.scanQueued = make(map[string]bool)
}
if a.scanQueued[driveID] {
a.scanQueueMu.Unlock()
done()
log.Printf("[spider91] drive=%s already queued or running, skip duplicate crawl request", driveID)
return
}
a.scanQueued[driveID] = true
a.scanQueueMu.Unlock()
go func() {
defer func() {
a.scanQueueMu.Lock()
delete(a.scanQueued, driveID)
a.scanQueueMu.Unlock()
done()
}()
a.runSpider91CrawlWithTaskContext(taskCtx, driveID)
}()
}
// runSpider91Crawl 运行一次完整爬取流程并把 last_crawl_at 写回 drive.credentials。
//
// 即使爬取失败也会更新 last_crawl_at,避免一直在错误循环里反复触发;下一次 nightly
// 流水线重跑时仍会重试。该方法是阻塞的,被 nightly Phase 2 串行调用,以及被
// admin "立即抓取" 单 drive 异步调用。
func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
defer done()
a.runSpider91CrawlWithTaskContext(taskCtx, driveID)
}
func (a *App) runSpider91CrawlWithTaskContext(ctx context.Context, driveID string) {
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s crawl canceled before start: %v", driveID, err)
return
}
a.mu.Lock()
c := a.spider91Crawlers[driveID]
a.mu.Unlock()
@@ -1930,6 +2243,10 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
if err := a.cat.UpsertDrive(ctx, d); err != nil {
log.Printf("[spider91] drive=%s update last_crawl_at: %v", driveID, err)
}
if err := ctx.Err(); err != nil {
log.Printf("[spider91] drive=%s crawl canceled after run: %v", driveID, err)
return
}
// 爬取全部完成后,统一把所有还 pending 的预览视频入队。
// 这是新流水线设计:crawler 自身不再每条入库就立即触发预览视频生成,
+164
View File
@@ -225,6 +225,170 @@ func TestRegisterPreviewWorkersBackfillsHistoricalFingerprints(t *testing.T) {
t.Fatalf("fingerprint status=%q sampled=%q, want ready with hash", got.FingerprintStatus, got.SampledSHA256)
}
func TestStopDriveTasksCancelsQueuedTasksAndReplacesWorkers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
drv := &serverFakeDrive{}
registry := proxy.NewRegistry()
registry.Set("drive-id", drv)
gen := &serverFakeTeaserGenerator{}
oldWorker := preview.NewWorker(gen, cat, drv)
oldThumbWorker := preview.NewThumbWorker(gen, cat, drv)
oldFingerprintWorker := fingerprint.NewWorker(cat, drv, fingerprint.Config{})
oldCanceled := make(chan struct{})
app := &App{
cfg: &config.Config{},
cat: cat,
registry: registry,
workers: map[string]*preview.Worker{"drive-id": oldWorker},
thumbWorkers: map[string]*preview.ThumbWorker{"drive-id": oldThumbWorker},
fingerprintWorkers: map[string]*fingerprint.Worker{"drive-id": oldFingerprintWorker},
cancels: map[string]context.CancelFunc{
"drive-id": func() { close(oldCanceled) },
},
scanQueued: map[string]bool{"drive-id": true},
fingerprintQueueing: map[string]bool{"drive-id": true},
}
taskCtx, done := app.registerDriveTaskContext(ctx, "drive-id")
defer done()
if !app.stopDriveTasks(ctx, "drive-id") {
t.Fatal("stopDriveTasks returned false, want true")
}
select {
case <-oldCanceled:
case <-time.After(time.Second):
t.Fatal("old worker cancel was not called")
}
if err := taskCtx.Err(); err == nil {
t.Fatal("registered drive task context was not canceled")
}
if app.scanQueued["drive-id"] {
t.Fatal("scan queue marker was not cleared")
}
if app.fingerprintQueueing["drive-id"] {
t.Fatal("fingerprint queue marker was not cleared")
}
app.mu.Lock()
newWorker := app.workers["drive-id"]
newThumbWorker := app.thumbWorkers["drive-id"]
newFingerprintWorker := app.fingerprintWorkers["drive-id"]
newCancel := app.cancels["drive-id"]
app.mu.Unlock()
if newWorker == nil || newWorker == oldWorker {
t.Fatalf("preview worker was not replaced")
}
if newThumbWorker == nil || newThumbWorker == oldThumbWorker {
t.Fatalf("thumb worker was not replaced")
}
if newFingerprintWorker == nil || newFingerprintWorker == oldFingerprintWorker {
t.Fatalf("fingerprint worker was not replaced")
}
if newCancel == nil {
t.Fatalf("replacement worker cancel was not registered")
}
newCancel()
}
func TestDriveGenerationStatusUsesWorkerQueueNotPendingCatalogRows(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now()
if err := cat.UpsertVideo(ctx, &catalog.Video{
ID: "pending-thumb",
DriveID: "drive-id",
FileID: "file-id",
Title: "Pending Thumb",
PreviewStatus: "ready",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
}); err != nil {
t.Fatalf("seed video: %v", err)
}
if err := cat.UpdateVideoMeta(ctx, "pending-thumb", catalog.VideoMetaPatch{ThumbnailStatus: "pending"}); err != nil {
t.Fatalf("mark thumbnail pending: %v", err)
}
thumbWorker := preview.NewThumbWorker(&serverFakeTeaserGenerator{}, cat, &serverFakeDrive{})
app := &App{
cat: cat,
workers: map[string]*preview.Worker{},
thumbWorkers: map[string]*preview.ThumbWorker{"drive-id": thumbWorker},
fingerprintWorkers: map[string]*fingerprint.Worker{},
}
status := app.driveGenerationStatuses()["drive-id"].Thumbnail
if status.State != "idle" || status.QueueLength != 0 {
t.Fatalf("thumbnail status = %#v, want idle with empty worker queue", status)
}
}
func TestRegenFailedThumbnailsQueuesPendingRowsAfterStop(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now()
if err := cat.UpsertVideo(ctx, &catalog.Video{
ID: "pending-thumb",
DriveID: "drive-id",
FileID: "file-id",
Title: "Pending Thumb",
PreviewStatus: "ready",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
}); err != nil {
t.Fatalf("seed video: %v", err)
}
if err := cat.UpdateVideoMeta(ctx, "pending-thumb", catalog.VideoMetaPatch{ThumbnailStatus: "pending"}); err != nil {
t.Fatalf("mark thumbnail pending: %v", err)
}
thumbWorker := preview.NewThumbWorker(&serverFakeTeaserGenerator{}, cat, &serverFakeDrive{})
app := &App{
cat: cat,
thumbWorkers: map[string]*preview.ThumbWorker{"drive-id": thumbWorker},
}
app.regenFailedThumbnails(ctx, "drive-id")
if got := thumbWorker.Status().QueueLength; got != 1 {
t.Fatalf("thumb queue length = %d, want pending row re-enqueued", got)
}
}
func TestRunScanStartsFingerprintBeforeThumbnailAndPreviewDrain(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+28
View File
@@ -46,6 +46,8 @@ type AdminServer struct {
OnDriveDeleteCleanup func(ctx context.Context, driveID string) (int, error)
OnDriveRemoved func(driveID string)
OnScanRequested func(driveID string)
OnStopDriveTasks func(driveID string) bool
OnStopAllTasks func() int
OnRegenPreview func(videoID string)
OnRegenAllPreviews func()
OnRegenFailedPreviews func(driveID string)
@@ -126,6 +128,7 @@ func (a *AdminServer) Register(r chi.Router) {
r.Get("/drives/p123/qr/{uniID}", a.handleP123QRStatus)
r.Delete("/drives/{id}", a.handleDeleteDrive)
r.Post("/drives/{id}/rescan", a.handleRescan)
r.Post("/drives/{id}/tasks/stop", a.handleStopDriveTasks)
r.Post("/drives/{id}/teaser-enabled", a.handleSetDriveTeaserEnabled)
r.Post("/drives/{id}/skip-dirs", a.handleSetDriveSkipDirs)
r.Get("/drives/{id}/dirtree", a.handleListDriveDirTree)
@@ -152,6 +155,7 @@ func (a *AdminServer) Register(r chi.Router) {
r.Get("/update/check", a.handleCheckUpdate)
r.Get("/jobs/nightly/status", a.handleNightlyJobStatus)
r.Post("/jobs/nightly/run", a.handleRunNightlyJob)
r.Post("/tasks/stop", a.handleStopAllTasks)
})
})
}
@@ -670,6 +674,18 @@ func (a *AdminServer) handleRescan(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusAccepted, map[string]any{"ok": true})
}
func (a *AdminServer) handleStopDriveTasks(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
stopped := false
if a.OnStopDriveTasks != nil {
stopped = a.OnStopDriveTasks(id)
}
writeJSON(w, http.StatusAccepted, map[string]any{
"ok": true,
"stopped": stopped,
})
}
func (a *AdminServer) p123QRClient() *p123.QRClient {
return p123.NewQRClient(p123.QRConfig{
UserAPIBaseURL: a.P123UserAPIBaseURL,
@@ -722,6 +738,18 @@ func (a *AdminServer) handleNightlyJobStatus(w http.ResponseWriter, r *http.Requ
writeJSON(w, http.StatusOK, a.nightlyJobStatus())
}
func (a *AdminServer) handleStopAllTasks(w http.ResponseWriter, r *http.Request) {
stoppedDrives := 0
if a.OnStopAllTasks != nil {
stoppedDrives = a.OnStopAllTasks()
}
writeJSON(w, http.StatusAccepted, map[string]any{
"ok": true,
"stoppedDrives": stoppedDrives,
"status": a.nightlyJobStatus(),
})
}
func (a *AdminServer) nightlyJobStatus() NightlyJobStatus {
if a.GetNightlyJobStatus == nil {
return NightlyJobStatus{State: "idle"}
+69
View File
@@ -296,6 +296,75 @@ func TestHandleNightlyJobStatusDefaultsToIdle(t *testing.T) {
}
}
func TestHandleStopDriveTasksInvokesHookWithDriveID(t *testing.T) {
calledWith := ""
server := &AdminServer{
OnStopDriveTasks: func(driveID string) bool {
calledWith = driveID
return true
},
}
req := httptest.NewRequest(http.MethodPost, "/admin/api/drives/PikPak/tasks/stop", nil)
rctx := chi.NewRouteContext()
rctx.URLParams.Add("id", "PikPak")
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
rr := httptest.NewRecorder()
server.handleStopDriveTasks(rr, req)
if rr.Code != http.StatusAccepted {
t.Fatalf("status = %d, body = %s", rr.Code, rr.Body.String())
}
if calledWith != "PikPak" {
t.Fatalf("hook called with %q, want PikPak", calledWith)
}
var got struct {
OK bool `json:"ok"`
Stopped bool `json:"stopped"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
if !got.OK || !got.Stopped {
t.Fatalf("response = %#v, want stopped", got)
}
}
func TestHandleStopAllTasksInvokesHookAndReturnsStatus(t *testing.T) {
called := false
server := &AdminServer{
OnStopAllTasks: func() int {
called = true
return 2
},
GetNightlyJobStatus: func() NightlyJobStatus {
return NightlyJobStatus{State: "running", Running: true}
},
}
req := httptest.NewRequest(http.MethodPost, "/admin/api/tasks/stop", nil)
rr := httptest.NewRecorder()
server.handleStopAllTasks(rr, req)
if rr.Code != http.StatusAccepted {
t.Fatalf("status = %d, body = %s", rr.Code, rr.Body.String())
}
if !called {
t.Fatal("OnStopAllTasks was not called")
}
var got struct {
OK bool `json:"ok"`
StoppedDrives int `json:"stoppedDrives"`
Status NightlyJobStatus `json:"status"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
if !got.OK || got.StoppedDrives != 2 || got.Status.State != "running" || !got.Status.Running {
t.Fatalf("response = %#v, want stopped drives and status", got)
}
}
func TestHandleUpsertDrivePreservesExistingCredentialsWhenRequestCredentialsEmpty(t *testing.T) {
ctx := context.Background()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
+39 -3
View File
@@ -115,6 +115,7 @@ type Runner struct {
queued bool
startedAt time.Time
lastFinishedAt time.Time
currentCancel context.CancelFunc
}
// New constructs a Runner. cfg is shallow-copied; defaults are applied.
@@ -175,6 +176,28 @@ func (r *Runner) TriggerNow() bool {
}
}
// StopCurrent cancels the currently running pipeline and drops one queued
// manual trigger, if present. It returns true when there was something to stop.
func (r *Runner) StopCurrent() bool {
r.stateMu.Lock()
wasRunning := r.running
wasQueued := r.queued
cancel := r.currentCancel
r.queued = false
r.stateMu.Unlock()
if wasQueued {
select {
case <-r.trigger:
default:
}
}
if cancel != nil {
cancel()
}
return wasRunning || wasQueued || cancel != nil
}
func (r *Runner) Status() Status {
r.stateMu.Lock()
running := r.running
@@ -232,14 +255,25 @@ func shouldRun(now time.Time, lastRunDate string) bool {
//
// 流水线没有总耗时上限:一直跑到 ctx 取消(进程退出)或所有 phase 完成。
func (r *Runner) runPipelineLocked(ctx context.Context, manual bool) {
if manual {
r.stateMu.Lock()
queued := r.queued
r.stateMu.Unlock()
if !queued {
log.Printf("[nightly] manual trigger was canceled before start")
return
}
}
if !r.runMu.TryLock() {
log.Printf("[nightly] another pipeline is already running, skipping this trigger")
return
}
started := r.cfg.Now()
r.markStarted(started)
runCtx, cancel := context.WithCancel(ctx)
r.markStarted(started, cancel)
defer func() {
cancel()
r.markFinished(r.cfg.Now())
r.runMu.Unlock()
}()
@@ -250,7 +284,7 @@ func (r *Runner) runPipelineLocked(ctx context.Context, manual bool) {
}
log.Printf("[nightly] pipeline (%s) start", mode)
r.runPipeline(ctx)
r.runPipeline(runCtx)
finished := r.cfg.Now()
log.Printf("[nightly] pipeline (%s) finish; took=%s", mode, finished.Sub(started).Round(time.Second))
@@ -264,12 +298,13 @@ func (r *Runner) runPipelineLocked(ctx context.Context, manual bool) {
}
}
func (r *Runner) markStarted(started time.Time) {
func (r *Runner) markStarted(started time.Time, cancel context.CancelFunc) {
r.stateMu.Lock()
defer r.stateMu.Unlock()
r.running = true
r.queued = false
r.startedAt = started
r.currentCancel = cancel
}
func (r *Runner) markFinished(finished time.Time) {
@@ -278,6 +313,7 @@ func (r *Runner) markFinished(finished time.Time) {
r.running = false
r.startedAt = time.Time{}
r.lastFinishedAt = finished
r.currentCancel = nil
}
// runPipeline executes the three phases. It returns when the pipeline finishes
+55
View File
@@ -395,6 +395,61 @@ func TestStatusTracksQueuedRunningAndFinished(t *testing.T) {
}
}
func TestStopCurrentCancelsRunningPipeline(t *testing.T) {
scanStarted := make(chan struct{})
scanCanceled := make(chan struct{})
var startedOnce sync.Once
r := New(Config{
Settings: newStubSettings(),
ListScanTargets: func(context.Context) []string {
return []string{"drive"}
},
RunScan: func(ctx context.Context, _ string) {
startedOnce.Do(func() { close(scanStarted) })
<-ctx.Done()
close(scanCanceled)
},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
if !r.TriggerNow() {
t.Fatal("TriggerNow should queue a manual run")
}
select {
case <-scanStarted:
case <-time.After(time.Second):
t.Fatal("pipeline did not start")
}
if !r.StopCurrent() {
t.Fatal("StopCurrent should report a running pipeline")
}
select {
case <-scanCanceled:
case <-time.After(time.Second):
t.Fatal("StopCurrent did not cancel pipeline context")
}
}
func TestStopCurrentDropsQueuedTrigger(t *testing.T) {
r := New(Config{Settings: newStubSettings()})
if !r.TriggerNow() {
t.Fatal("TriggerNow should queue a manual run")
}
if !r.StopCurrent() {
t.Fatal("StopCurrent should report a queued pipeline")
}
if got := r.Status(); got.State != "idle" || got.Running || got.Queued {
t.Fatalf("status = %#v, want idle after dropping queued trigger", got)
}
if !r.TriggerNow() {
t.Fatal("TriggerNow should accept a new request after queued stop")
}
}
func TestTriggerNowAcceptsOnlyOneConcurrentRequest(t *testing.T) {
r := New(Config{Settings: newStubSettings()})
+120 -36
View File
@@ -1,7 +1,9 @@
import { useEffect, useMemo, useRef, useState } from "react";
import { useSearchParams } from "react-router-dom";
import {
ArrowLeft,
ChevronRight,
CircleStop,
Download,
FolderTree,
HardDrive,
@@ -57,9 +59,12 @@ export function DrivesPage() {
const [regenFailedFingerprintId, setRegenFailedFingerprintId] = useState("");
const [togglingTeaserId, setTogglingTeaserId] = useState("");
const [scanningAll, setScanningAll] = useState(false);
const [stoppingAll, setStoppingAll] = useState(false);
const [trackingNightly, setTrackingNightly] = useState(false);
const [scanningDriveId, setScanningDriveId] = useState("");
const [selectedDriveId, setSelectedDriveId] = useState<string | null>(null);
const [stoppingDriveId, setStoppingDriveId] = useState("");
const [searchParams, setSearchParams] = useSearchParams();
const selectedDriveId = searchParams.get("drive") || null;
const { show } = useToast();
const pollConnectionLost = useRef(false);
const nightlyBusy = scanningAll || nightlyStatus.running || nightlyStatus.queued;
@@ -72,6 +77,22 @@ export function DrivesPage() {
[list]
);
function openDriveDetail(id: string) {
setSearchParams((prev) => {
const next = new URLSearchParams(prev);
next.set("drive", id);
return next;
});
}
function closeDriveDetail(options?: { replace?: boolean }) {
setSearchParams((prev) => {
const next = new URLSearchParams(prev);
next.delete("drive");
return next;
}, options);
}
async function refresh() {
setLoading(true);
setLoadError("");
@@ -254,7 +275,7 @@ export function DrivesPage() {
show(`已删除,并清理 ${resp.deletedVideos ?? 0} 个视频`, "success");
setDeleteTarget(null);
if (selectedDriveId === d.id) {
setSelectedDriveId(null);
closeDriveDetail({ replace: true });
}
refresh();
} catch (e) {
@@ -303,11 +324,51 @@ export function DrivesPage() {
}
}
async function handleStopAllTasks() {
if (stoppingAll) return;
setStoppingAll(true);
try {
const resp = await api.stopAllTasks();
setNightlyStatus(resp.status);
setTrackingNightly(false);
show(
resp.stoppedDrives > 0
? `已停止 ${resp.stoppedDrives} 个网盘的当前任务`
: "没有正在运行的网盘任务",
"success"
);
refreshDriveList();
} catch (e) {
show(e instanceof Error ? e.message : "停止失败", "error");
} finally {
setStoppingAll(false);
}
}
async function handleStopDriveTasks(d: api.AdminDrive) {
if (stoppingDriveId) return;
setStoppingDriveId(d.id);
try {
const resp = await api.stopDriveTasks(d.id);
show(
resp.stopped
? `已停止「${d.name || d.id}」的当前任务`
: `${d.name || d.id}」没有正在运行的任务`,
"success"
);
refreshDriveList();
} catch (e) {
show(e instanceof Error ? e.message : "停止失败", "error");
} finally {
setStoppingDriveId("");
}
}
async function handleRegenFailed(d: api.AdminDrive) {
setRegenFailedId(d.id);
try {
await api.regenFailedPreviews(d.id);
show("已触发失败预览视频重新生成", "success");
show("已触发预览视频生成", "success");
refresh();
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
@@ -320,7 +381,7 @@ export function DrivesPage() {
setRegenFailedThumbId(d.id);
try {
await api.regenFailedThumbnails(d.id);
show("已触发失败封面重新生成", "success");
show("已触发封面生成", "success");
refresh();
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
@@ -333,7 +394,7 @@ export function DrivesPage() {
setRegenFailedFingerprintId(d.id);
try {
await api.regenFailedFingerprints(d.id);
show("已触发失败指纹重新生成", "success");
show("已触发指纹生成", "success");
refresh();
} catch (e) {
show(e instanceof Error ? e.message : "触发失败", "error");
@@ -391,7 +452,7 @@ export function DrivesPage() {
<button
type="button"
className="admin-drive-detail__back-btn"
onClick={() => setSelectedDriveId(null)}
onClick={() => closeDriveDetail({ replace: true })}
title="返回网盘列表"
>
<ArrowLeft size={16} />
@@ -449,28 +510,40 @@ export function DrivesPage() {
</div>
<div className="admin-detail-actions">
<button
type="button"
className="admin-btn is-primary"
onClick={() => handleRescan(d)}
disabled={!!scanningDriveId}
>
{d.kind === "spider91" ? (
<>
<Download size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即抓取"}
</>
) : (
<>
<RefreshCw size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即重扫"}
</>
)}
</button>
<div className="admin-task-controls" aria-label="当前网盘任务控制">
<button
type="button"
className="admin-btn is-primary"
onClick={() => handleRescan(d)}
disabled={!!scanningDriveId}
>
{d.kind === "spider91" ? (
<>
<Download size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即抓取"}
</>
) : (
<>
<RefreshCw size={13} className={scanningDriveId === d.id ? "admin-spin" : undefined} />
{scanningDriveId === d.id ? "触发中..." : "立即重扫"}
</>
)}
</button>
<button
type="button"
className="admin-btn is-stop"
onClick={() => handleStopDriveTasks(d)}
disabled={!!stoppingDriveId}
title="停止此网盘当前的扫描、封面、预览视频和视频指纹生成任务。"
>
<CircleStop size={13} />
{stoppingDriveId === d.id ? "停止中..." : "停止所有任务"}
</button>
</div>
<button type="button" className="admin-btn" onClick={() => openEdit(d)}>
{d.kind === "spider91" ? "编辑配置" : "编辑配置凭证"}
</button>
<button type="button" className="admin-btn is-danger" onClick={() => setDeleteTarget(d)} style={{ marginLeft: "auto" }}>
<button type="button" className="admin-btn is-danger admin-detail-actions__danger" onClick={() => setDeleteTarget(d)}>
<Trash2 size={13} />
</button>
</div>
@@ -589,16 +662,27 @@ export function DrivesPage() {
<section>
<header className="admin-page__header">
<h1 className="admin-page__title"></h1>
<div style={{ display: "flex", gap: "8px" }}>
<button
type="button"
className="admin-btn"
onClick={handleRunNightly}
disabled={scanningAll}
title={nightlyBusyText(nightlyStatus) || "立即扫描所有网盘。耗时较长,期间不要重复触发。"}
>
<PlayCircle size={14} /> {nightlyButtonText(nightlyStatus, scanningAll)}
</button>
<div className="admin-page__actions admin-drive-list-actions">
<div className="admin-task-controls" aria-label="所有网盘任务控制">
<button
type="button"
className="admin-btn"
onClick={handleRunNightly}
disabled={scanningAll}
title={nightlyBusyText(nightlyStatus) || "立即扫描所有网盘。耗时较长,期间不要重复触发。"}
>
<PlayCircle size={14} /> {nightlyButtonText(nightlyStatus, scanningAll)}
</button>
<button
type="button"
className="admin-btn is-stop"
onClick={handleStopAllTasks}
disabled={stoppingAll}
title="停止所有网盘当前的扫描、封面、预览视频和视频指纹生成任务。"
>
<CircleStop size={14} /> {stoppingAll ? "停止中..." : "停止所有网盘任务"}
</button>
</div>
<button type="button" className="admin-btn is-primary" onClick={openCreate}>
<Plus size={14} />
</button>
@@ -631,7 +715,7 @@ export function DrivesPage() {
type="button"
key={d.id}
className="admin-drive-card"
onClick={() => setSelectedDriveId(d.id)}
onClick={() => openDriveDetail(d.id)}
aria-label={`管理网盘 ${d.name || d.id}`}
>
<div className="admin-drive-card__header">
+14
View File
@@ -176,6 +176,13 @@ export function rescan(id: string) {
);
}
export function stopDriveTasks(id: string) {
return request<{ ok: boolean; stopped: boolean }>(
`/drives/${encodeURIComponent(id)}/tasks/stop`,
{ method: "POST" }
);
}
export type P123QRSession = {
loginUuid: string;
uniID: string;
@@ -439,3 +446,10 @@ export function runNightlyJob() {
{ method: "POST" }
);
}
export function stopAllTasks() {
return request<{ ok: boolean; stoppedDrives: number; status: NightlyJobStatus }>(
"/tasks/stop",
{ method: "POST" }
);
}
+16 -10
View File
@@ -141,7 +141,7 @@ export function DriveCardMetrics({ d }: { d: api.AdminDrive }) {
</strong>
</div>
<div className="admin-drive-card__metric">
<span> (/)</span>
<span> (/)</span>
<strong>
{d.fingerprintReadyCount ?? 0}
<span style={{ fontSize: "11px", fontWeight: "normal", color: "var(--text-faint)" }}>
@@ -174,6 +174,15 @@ export function DriveGenerationPanel({
onRegenFailedThumbnails: () => void;
onRegenFailedFingerprints: () => void;
}) {
const canQueueThumbnails =
(d.thumbnailFailedCount ?? 0) > 0 ||
(d.thumbnailPendingCount ?? 0) > 0 ||
(d.thumbnailDurationPendingCount ?? 0) > 0;
const canQueuePreviews =
(d.teaserFailedCount ?? 0) > 0 || (d.teaserPendingCount ?? 0) > 0;
const canQueueFingerprints =
(d.fingerprintFailedCount ?? 0) > 0 || (d.fingerprintPendingCount ?? 0) > 0;
return (
<div className="admin-detail-card">
<header className="admin-detail-card__title">
@@ -249,30 +258,27 @@ export function DriveGenerationPanel({
<div className="admin-detail-actions">
<button
className="admin-btn"
disabled={(d.thumbnailFailedCount ?? 0) <= 0 || regenFailedThumbId === d.id}
disabled={!canQueueThumbnails || regenFailedThumbId === d.id}
onClick={onRegenFailedThumbnails}
>
<RotateCcw size={13} />
<span></span>
<span>{(d.thumbnailFailedCount ?? 0) > 0 ? "重试失败封面" : "继续生成封面"}</span>
</button>
<button
className="admin-btn"
disabled={(d.teaserFailedCount ?? 0) <= 0 || regenFailedId === d.id}
disabled={!canQueuePreviews || regenFailedId === d.id}
onClick={onRegenFailed}
>
<RotateCcw size={13} />
<span></span>
<span>{(d.teaserFailedCount ?? 0) > 0 ? "重试失败预览视频" : "继续生成预览视频"}</span>
</button>
<button
className="admin-btn"
disabled={
(d.fingerprintFailedCount ?? 0) <= 0 ||
regenFailedFingerprintId === d.id
}
disabled={!canQueueFingerprints || regenFailedFingerprintId === d.id}
onClick={onRegenFailedFingerprints}
>
<RotateCcw size={13} />
<span></span>
<span>{(d.fingerprintFailedCount ?? 0) > 0 ? "重试失败指纹" : "继续生成指纹"}</span>
</button>
</div>
</div>
+112 -6
View File
@@ -219,6 +219,68 @@
display: flex;
flex-wrap: wrap;
gap: var(--space-2);
align-items: center;
}
.admin-task-controls {
display: inline-flex;
align-items: center;
flex-wrap: wrap;
max-width: 100%;
gap: var(--space-2);
padding: 0;
border: 0;
border-radius: var(--radius-sm);
background: transparent;
}
.admin-task-controls .admin-btn {
min-height: 34px;
}
.admin-task-controls .admin-btn.is-primary {
background: var(--accent);
border-color: transparent;
box-shadow: none;
}
.admin-task-controls .admin-btn.is-primary:hover:not(:disabled) {
filter: none;
background: var(--accent-hover);
border-color: transparent;
box-shadow: none;
}
.admin-detail-actions__danger {
margin-left: auto;
}
@media (max-width: 640px) {
.admin-drive-list-actions {
width: 100%;
}
.admin-task-controls {
width: 100%;
}
.admin-task-controls .admin-btn {
flex: 1 1 130px;
}
.admin-drive-list-actions > .admin-btn {
width: 100%;
min-height: 34px;
}
.admin-detail-actions > .admin-btn {
flex: 1 1 130px;
min-height: 34px;
}
.admin-detail-actions__danger {
margin-left: 0;
}
}
/* =========================================================
@@ -431,6 +493,7 @@
.admin-sidebar__mobile-menu:focus-visible {
outline: 2px solid var(--accent);
outline-offset: 2px;
box-shadow: none;
}
.admin-btn:active:not(:disabled) {
@@ -438,17 +501,18 @@
}
.admin-btn.is-primary {
background: var(--accent-gradient);
border-color: transparent;
background: var(--accent);
border-color: var(--accent);
color: var(--text-on-accent);
box-shadow: 0 4px 12px var(--accent-glow), var(--shadow-inset);
box-shadow: none;
}
.admin-btn.is-primary:hover:not(:disabled) {
filter: brightness(1.05);
border-color: transparent;
filter: none;
background: var(--accent-hover);
border-color: var(--accent-hover);
color: var(--text-on-accent);
box-shadow: 0 6px 16px var(--accent-glow), var(--shadow-inset);
box-shadow: none;
}
.admin-btn.is-danger {
@@ -463,6 +527,48 @@
color: var(--danger);
}
.admin-btn.is-stop {
color: var(--warning);
border-color: rgba(245, 181, 74, 0.36);
background: var(--warning-soft);
}
.admin-btn.is-stop:hover:not(:disabled) {
color: var(--warning);
border-color: var(--warning);
background: rgba(245, 181, 74, 0.2);
box-shadow: 0 2px 10px rgba(245, 181, 74, 0.12);
}
.admin-detail-actions > .admin-btn:not(.is-danger) {
background: var(--accent);
border-color: var(--accent);
color: var(--text-on-accent);
box-shadow: none;
}
.admin-detail-actions > .admin-btn:not(.is-danger):hover:not(:disabled) {
background: var(--accent-hover);
border-color: var(--accent-hover);
color: var(--text-on-accent);
box-shadow: none;
}
.admin-detail-actions .admin-btn.is-stop {
background: var(--warning);
border-color: var(--warning);
color: var(--text-on-accent);
box-shadow: none;
}
.admin-detail-actions .admin-btn.is-stop:hover:not(:disabled) {
background: var(--warning);
border-color: var(--warning);
color: var(--text-on-accent);
filter: brightness(1.04);
box-shadow: none;
}
.admin-btn:disabled {
opacity: 0.45;
cursor: not-allowed;
+42
View File
@@ -6,10 +6,18 @@ const drivesPageSource = readFileSync(
new URL("../src/admin/DrivesPage.tsx", import.meta.url),
"utf8"
);
const driveComponentsSource = readFileSync(
new URL("../src/admin/drive/DriveComponents.tsx", import.meta.url),
"utf8"
);
const driveFormSource = readFileSync(
new URL("../src/admin/drive/DriveForm.tsx", import.meta.url),
"utf8"
);
const apiSource = readFileSync(
new URL("../src/admin/api.ts", import.meta.url),
"utf8"
);
const constantsSource = readFileSync(
new URL("../src/admin/drive/constants.ts", import.meta.url),
"utf8"
@@ -150,3 +158,37 @@ test("drive type selector keeps primary source order", () => {
{ value: "wopan", label: "联通沃盘" },
]);
});
test("drive management exposes stop task controls", () => {
assert.match(apiSource, /stopDriveTasks/);
assert.match(apiSource, /\/drives\/\$\{encodeURIComponent\(id\)\}\/tasks\/stop/);
assert.match(apiSource, /stopAllTasks/);
assert.match(apiSource, /"\/tasks\/stop"/);
assert.match(drivesPageSource, /is-stop/);
assert.match(drivesPageSource, /停止所有任务/);
assert.match(drivesPageSource, /停止所有网盘任务/);
});
test("drive detail selection is stored in the URL history", () => {
assert.match(drivesPageSource, /useSearchParams/);
assert.match(drivesPageSource, /searchParams\.get\("drive"\)/);
assert.match(drivesPageSource, /function openDriveDetail\(id: string\)/);
assert.match(drivesPageSource, /next\.set\("drive", id\)/);
assert.match(drivesPageSource, /function closeDriveDetail/);
assert.match(drivesPageSource, /next\.delete\("drive"\)/);
assert.doesNotMatch(drivesPageSource, /setSelectedDriveId/);
});
test("drive generation actions can resume pending work after stop", () => {
assert.match(driveComponentsSource, /thumbnailPendingCount/);
assert.match(driveComponentsSource, /teaserPendingCount/);
assert.match(driveComponentsSource, /fingerprintPendingCount/);
assert.match(driveComponentsSource, /继续生成封面/);
assert.match(driveComponentsSource, /继续生成预览视频/);
assert.match(driveComponentsSource, /继续生成指纹/);
});
test("drive cards label fingerprint count as video fingerprint count", () => {
assert.match(driveComponentsSource, /视频指纹数 \(就绪\/失败\)/);
assert.doesNotMatch(driveComponentsSource, />指纹数 \(就绪\/失败\)</);
});