feat: improve media generation pipeline status

This commit is contained in:
nianzhibai
2026-05-30 17:37:31 +08:00
parent afbff9eb55
commit e78fa9d978
11 changed files with 729 additions and 200 deletions
+208 -77
View File
@@ -40,6 +40,8 @@ import (
"github.com/video-site/backend/internal/spider91migrate"
)
// fingerprintReconcileInterval is the period of the ticker that
// runFingerprintReconciler uses between reconcile passes.
const fingerprintReconcileInterval = time.Minute
func main() {
cfgPath := "./config.yaml"
if v := os.Getenv("VIDEO_CONFIG"); v != "" {
@@ -79,7 +81,8 @@ func main() {
GetTargetDriveID: func() string { return app.Spider91UploadDriveID() },
})
// 初始化现有 drives
// 初始化本地内置盘;外部云盘放到 HTTP 服务启动后异步挂载,避免上游
// 登录态校验拖慢端口监听。
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -88,16 +91,7 @@ func main() {
if err := app.attachLocalUpload(ctx); err != nil {
log.Printf("[local-upload] attach failed: %v", err)
}
existing, err := cat.ListDrives(ctx)
if err != nil {
log.Fatalf("list drives: %v", err)
}
for _, d := range existing {
if err := app.attachDrive(ctx, d); err != nil {
log.Printf("[drive %s] attach failed: %v", d.ID, err)
}
}
go app.runFingerprintReconciler(ctx)
authr := &auth.Authenticator{
Username: cfg.Server.Admin.Username,
@@ -256,6 +250,7 @@ func main() {
log.Fatalf("server error: %v", err)
}
}()
go app.attachExistingDrives(ctx)
// 等待退出信号
sigs := make(chan os.Signal, 1)
@@ -283,6 +278,10 @@ type App struct {
// spider91Crawlers 按 driveID 索引,每个 spider91 drive 独立一个 Crawler
spider91Crawlers map[string]*spider91.Crawler
// driveAttachMu 串行化云盘挂载/重挂载。挂载会访问上游服务,可能较慢;
// 串行化可以避免启动后台挂载和手动扫盘按需挂载同一个 drive 时重复创建 worker。
driveAttachMu sync.Mutex
// 全站主题("dark" | "pink"),从 DB 读
theme string
// 显式指定的 spider91 上传目标 drive ID。
@@ -311,6 +310,11 @@ type App struct {
// scanQueued 跟踪哪些 driveID 已经排队或正在跑,去重后续重复点击。
// 一个 drive 在 scheduleScan 入队时被加入,在 runScan goroutine 结束时被移除。
scanQueued map[string]bool
// fingerprintQueueing 去重每个 drive 的 pending 指纹补队列任务,避免定时
// reconcile 和扫盘结束同时为同一批 pending 视频启动多个长时间入队 goroutine。
fingerprintQueueMu sync.Mutex
fingerprintQueueing map[string]bool
}
// teaserEnabledForDrive 查询某个 drive 当前的 per-drive teaser 开关。
@@ -438,9 +442,13 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
for id, worker := range a.thumbWorkers {
thumbWorkers[id] = worker
}
fingerprintWorkers := make(map[string]*fingerprint.Worker, len(a.fingerprintWorkers))
for id, worker := range a.fingerprintWorkers {
fingerprintWorkers[id] = worker
}
a.mu.Unlock()
out := make(map[string]api.DriveGenerationStatuses, len(previewWorkers)+len(thumbWorkers))
out := make(map[string]api.DriveGenerationStatuses, len(previewWorkers)+len(thumbWorkers)+len(fingerprintWorkers))
for id, worker := range previewWorkers {
status := out[id]
status.Preview = generationStatusFromPreview(worker.Status())
@@ -460,6 +468,20 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
}
out[id] = status
}
for id, worker := range fingerprintWorkers {
status := out[id]
status.Fingerprint = generationStatusFromFingerprint(worker.Status())
pending, err := a.cat.CountVideosNeedingFingerprint(context.Background(), id)
if err != nil {
log.Printf("[fingerprint] count pending fingerprints %s: %v", id, err)
} else {
status.Fingerprint.QueueLength = pending
if pending > 0 && status.Fingerprint.State == "idle" {
status.Fingerprint.State = "queued"
}
}
out[id] = status
}
return out
}
@@ -479,7 +501,67 @@ func generationStatusFromPreview(status preview.TaskStatus) api.GenerationStatus
return out
}
// generationStatusFromFingerprint converts a fingerprint worker's task status
// into the API generation status shape.
//
// An empty worker state is reported as "idle". CooldownUntil is only filled
// in (formatted as RFC 3339) when the worker reports a non-zero cooldown
// deadline; otherwise it is left at its zero value.
func generationStatusFromFingerprint(status fingerprint.TaskStatus) api.GenerationStatus {
	result := api.GenerationStatus{
		State:        status.State,
		CurrentTitle: status.CurrentTitle,
		QueueLength:  status.QueueLength,
	}
	if result.State == "" {
		result.State = "idle"
	}
	if until := status.CooldownUntil; !until.IsZero() {
		result.CooldownUntil = until.Format(time.RFC3339)
	}
	return result
}
// attachDrive attaches d while holding driveAttachMu, so attach attempts that
// go through this method run one at a time. The work itself is delegated to
// attachDriveUnlocked and its error is returned unchanged.
func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
	a.driveAttachMu.Lock()
	// Deferred so the mutex is released on every return path.
	defer a.driveAttachMu.Unlock()
	return a.attachDriveUnlocked(ctx, d)
}
// ensureDriveAttached makes sure driveID is present in the registry, attaching
// it on demand when it is not.
//
// The registry is consulted once without the lock (cheap path for drives that
// are already attached) and once more after acquiring driveAttachMu, because
// another goroutine may have attached the drive while this one was waiting
// for the mutex. Only when both lookups miss is the drive row loaded from the
// catalog and handed to attachDriveUnlocked. Errors from the catalog lookup
// or from the attach are returned as-is.
func (a *App) ensureDriveAttached(ctx context.Context, driveID string) error {
	attached := func() bool {
		_, found := a.registry.Get(driveID)
		return found
	}

	if attached() {
		return nil
	}

	a.driveAttachMu.Lock()
	defer a.driveAttachMu.Unlock()

	// Re-check under the lock: the drive may have been attached meanwhile.
	if attached() {
		return nil
	}

	drive, err := a.cat.GetDrive(ctx, driveID)
	if err != nil {
		return err
	}
	return a.attachDriveUnlocked(ctx, drive)
}
// attachExistingDrives lists every drive stored in the catalog and attaches
// them one after another via attachDrive. It is started as a goroutine from
// main once the HTTP server goroutine has been launched.
//
// Failures are logged and never returned: a drive that fails to attach does
// not prevent the remaining drives from being attempted. The loop stops early
// when ctx is cancelled.
func (a *App) attachExistingDrives(ctx context.Context) {
	existing, err := a.cat.ListDrives(ctx)
	if err != nil {
		log.Printf("[drive] list existing drives: %v", err)
		return
	}
	log.Printf("[drive] attaching %d configured drive(s) in background", len(existing))
	for _, d := range existing {
		if err := ctx.Err(); err != nil {
			log.Printf("[drive] background attach stopped: %v", err)
			return
		}
		// Guard against nil entries: attachDrive would reject them with an
		// error, and the failure log below dereferences d.ID, which would
		// panic this goroutine.
		if d == nil {
			log.Printf("[drive] skipping nil drive entry during background attach")
			continue
		}
		if err := a.attachDrive(ctx, d); err != nil {
			log.Printf("[drive %s] attach failed: %v", d.ID, err)
		}
	}
	log.Printf("[drive] background attach complete")
}
func (a *App) attachDriveUnlocked(ctx context.Context, d *catalog.Drive) error {
if d == nil {
return errors.New("nil drive")
}
var drv drives.Drive
switch d.Kind {
case "quark":
@@ -583,7 +665,7 @@ func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprint.Config{})
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprintConfigForDrive(drv))
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
@@ -617,7 +699,7 @@ func (a *App) attachLocalUpload(ctx context.Context) error {
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprint.Config{})
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprintConfigForDrive(drv))
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
@@ -632,6 +714,20 @@ func (a *App) localUploadDir() string {
return filepath.Join(filepath.Dir(a.cfg.Storage.LocalPreviewDir), "uploads")
}
// fingerprintConfigForDrive builds the fingerprint worker configuration for a
// drive, choosing the rate-limit cooldown by drive kind (matched
// case-insensitively):
//
//   - "p115" and "onedrive" use a 10 minute cooldown;
//   - "pikpak", any other kind, and a nil drive use 5 minutes.
func fingerprintConfigForDrive(drv drives.Drive) fingerprint.Config {
	const (
		defaultCooldown  = 5 * time.Minute
		extendedCooldown = 10 * time.Minute
	)

	cooldown := defaultCooldown
	if drv != nil {
		switch kind := strings.ToLower(drv.Kind()); kind {
		case "p115", "onedrive":
			cooldown = extendedCooldown
		case "pikpak":
			// Listed explicitly even though it matches the default.
			cooldown = defaultCooldown
		}
	}
	return fingerprint.Config{RateLimitCooldown: cooldown}
}
// spider91RootDir 是所有 spider91 drive 共享的根目录。
func (a *App) spider91RootDir() string {
return filepath.Join(filepath.Dir(a.cfg.Storage.LocalPreviewDir), "spider91")
@@ -755,19 +851,9 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
}
a.mu.Unlock()
if worker != nil {
if thumbWorker != nil {
worker.BeforeTask = func(taskCtx context.Context) bool {
return a.waitForThumbnailsBeforePreview(taskCtx, driveID)
}
} else {
worker.BeforeTask = nil
}
}
go a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
if fingerprintWorker != nil {
go a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
a.scheduleFingerprintBackfill(ctx, driveID, fingerprintWorker)
}
}
@@ -792,45 +878,16 @@ func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Wor
func (a *App) enqueueDriveGeneration(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker) {
// 封面 worker 始终入队(与早期"全局 preview.enabled=false 时仍然生成封面"
// 的行为一致);teaser worker 仅在该 drive 的 TeaserEnabled 为 true 时入队。
// 两条队列互不等待,避免封面批量生成拖住预览视频生成。
if thumbWorker != nil {
a.enqueueThumbnails(ctx, driveID, thumbWorker)
}
if worker == nil || !a.teaserEnabledForDrive(ctx, driveID) {
return
}
if thumbWorker != nil && !a.waitForThumbnailsBeforePreview(ctx, driveID) {
return
}
a.enqueuePending(ctx, driveID, worker)
}
// waitForThumbnailsBeforePreview polls the catalog once per second until the
// drive has no videos left that need a thumbnail.
//
// It returns true as soon as the missing count reaches zero. It returns false
// when counting fails (the error is logged) or when ctx is cancelled while
// waiting. A progress line is logged on the first non-zero count and then at
// most once per minute.
func (a *App) waitForThumbnailsBeforePreview(ctx context.Context, driveID string) bool {
	const (
		pollInterval = time.Second
		logInterval  = time.Minute
	)

	var loggedAt time.Time
	for {
		remaining, err := a.cat.CountVideosNeedingThumbnail(ctx, driveID)
		switch {
		case err != nil:
			log.Printf("[preview] count missing thumbnails drive=%s: %v", driveID, err)
			return false
		case remaining == 0:
			return true
		}

		if now := time.Now(); loggedAt.IsZero() || now.Sub(loggedAt) >= logInterval {
			log.Printf("[preview] drive=%s waiting for %d thumbnails before teaser generation", driveID, remaining)
			loggedAt = now
		}

		// Sleep for one poll interval, waking early on cancellation.
		wait := time.NewTimer(pollInterval)
		select {
		case <-wait.C:
		case <-ctx.Done():
			wait.Stop()
			return false
		}
	}
}
func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.ThumbWorker) {
pending, err := a.cat.ListVideosNeedingThumbnail(ctx, driveID, 0)
if err != nil {
@@ -849,6 +906,56 @@ func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.
}
}
// runFingerprintReconciler blocks until ctx is cancelled, calling
// enqueueAllPendingFingerprints every fingerprintReconcileInterval. The first
// pass happens one full interval after the call, not immediately.
func (a *App) runFingerprintReconciler(ctx context.Context) {
	tick := time.NewTicker(fingerprintReconcileInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			a.enqueueAllPendingFingerprints(ctx)
		case <-done:
			return
		}
	}
}
// enqueueAllPendingFingerprints schedules a fingerprint backfill for every
// drive that currently has a fingerprint worker registered.
//
// The worker map is copied while holding a.mu and the lock is released before
// any scheduling happens, so scheduleFingerprintBackfill never runs under
// a.mu.
func (a *App) enqueueAllPendingFingerprints(ctx context.Context) {
	type target struct {
		driveID string
		worker  *fingerprint.Worker
	}

	a.mu.Lock()
	targets := make([]target, 0, len(a.fingerprintWorkers))
	for id, w := range a.fingerprintWorkers {
		targets = append(targets, target{driveID: id, worker: w})
	}
	a.mu.Unlock()

	for _, tgt := range targets {
		a.scheduleFingerprintBackfill(ctx, tgt.driveID, tgt.worker)
	}
}
// scheduleFingerprintBackfill starts a background goroutine that runs
// enqueueFingerprints for driveID, unless one started by this method is
// already running for the same drive.
//
// De-duplication uses the fingerprintQueueing set guarded by
// fingerprintQueueMu: the drive is marked before the goroutine starts and
// unmarked when enqueueFingerprints returns. A nil worker is a no-op.
func (a *App) scheduleFingerprintBackfill(ctx context.Context, driveID string, w *fingerprint.Worker) {
	if w == nil {
		return
	}

	// claim marks driveID as in progress and reports whether this call won
	// the slot.
	claim := func() bool {
		a.fingerprintQueueMu.Lock()
		defer a.fingerprintQueueMu.Unlock()

		if a.fingerprintQueueing[driveID] {
			return false
		}
		if a.fingerprintQueueing == nil {
			a.fingerprintQueueing = make(map[string]bool)
		}
		a.fingerprintQueueing[driveID] = true
		return true
	}
	// release clears the in-progress mark so a later call can schedule again.
	release := func() {
		a.fingerprintQueueMu.Lock()
		delete(a.fingerprintQueueing, driveID)
		a.fingerprintQueueMu.Unlock()
	}

	if !claim() {
		return
	}
	go func() {
		defer release()
		a.enqueueFingerprints(ctx, driveID, w)
	}()
}
func (a *App) enqueueFingerprints(ctx context.Context, driveID string, w *fingerprint.Worker) {
if w == nil {
return
@@ -974,6 +1081,10 @@ func (a *App) runScan(ctx context.Context, driveID string) {
a.scanGlobalMu.Lock()
defer a.scanGlobalMu.Unlock()
if err := a.ensureDriveAttached(ctx, driveID); err != nil {
log.Printf("[scan] drive %s attach failed: %v", driveID, err)
return
}
drv, ok := a.registry.Get(driveID)
if !ok {
log.Printf("[scan] drive %s not attached", driveID)
@@ -986,12 +1097,12 @@ func (a *App) runScan(ctx context.Context, driveID string) {
fingerprintWorker := a.fingerprintWorkers[driveID]
a.mu.Unlock()
var onNew func(v *catalog.Video)
if thumbWorker != nil {
onNew = func(v *catalog.Video) {
if thumbWorker != nil && v.ThumbnailURL == "" {
thumbWorker.Enqueue(v)
}
onNew := func(v *catalog.Video) {
if thumbWorker != nil && v.ThumbnailURL == "" {
thumbWorker.Enqueue(v)
}
if fingerprintWorker != nil {
fingerprintWorker.Enqueue(v)
}
}
@@ -1035,8 +1146,8 @@ func (a *App) runScan(ctx context.Context, driveID string) {
}
}
}
a.scheduleFingerprintBackfill(ctx, driveID, fingerprintWorker)
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
}
func (a *App) cleanupMissingDriveVideos(ctx context.Context, driveID string, liveFileIDs map[string]struct{}, visitedDirIDs map[string]struct{}, fullDriveScan bool) (int, error) {
@@ -1390,26 +1501,36 @@ func (a *App) regenFailedThumbnails(ctx context.Context, driveID string) {
}
// listScanTargetIDs 返回 nightly Phase 1 应扫描的所有 drive ID
// (非 spider91、非 localupload)。顺序按 registry.All 给的稳定顺序。
func (a *App) listScanTargetIDs(_ context.Context) []string {
all := a.registry.All()
// (非 spider91、非 localupload)。它直接读 catalog,而不是 registry,这样
// 进程刚启动、云盘还在后台挂载时,nightly 也不会漏掉配置过的 drive。
func (a *App) listScanTargetIDs(ctx context.Context) []string {
all, err := a.cat.ListDrives(ctx)
if err != nil {
log.Printf("[nightly] list scan target drives: %v", err)
return nil
}
out := make([]string, 0, len(all))
for _, d := range all {
if !shouldScanDrive(d) {
if d == nil || d.ID == localupload.DriveID || d.Kind == spider91.Kind {
continue
}
out = append(out, d.ID())
out = append(out, d.ID)
}
return out
}
// listSpider91DriveIDs 返回 nightly Phase 2 应触发爬取的 spider91 drive ID 列表。
func (a *App) listSpider91DriveIDs(_ context.Context) []string {
a.mu.Lock()
defer a.mu.Unlock()
out := make([]string, 0, len(a.spider91Crawlers))
for id := range a.spider91Crawlers {
out = append(out, id)
func (a *App) listSpider91DriveIDs(ctx context.Context) []string {
all, err := a.cat.ListDrives(ctx)
if err != nil {
log.Printf("[nightly] list spider91 drives: %v", err)
return nil
}
out := make([]string, 0, len(all))
for _, d := range all {
if d != nil && d.Kind == spider91.Kind {
out = append(out, d.ID)
}
}
return out
}
@@ -1417,8 +1538,8 @@ func (a *App) listSpider91DriveIDs(_ context.Context) []string {
// waitAllPreviewQueuesIdle 阻塞直到所有 drive 的封面 worker 和 teaser worker
// 队列都为空且无 in-flight 任务。
//
// 顺序:先等所有 thumb worker(因为 enqueueDriveGeneration 内部已经先等当前
// drive 的封面再入队 teaser,但这里是跨 drive 的全局同步),再等所有 teaser
// 顺序:先等所有 thumb worker,再等所有 teaser。两个队列生成时互不等待;
// nightly 只在 phase 边界统一等待它们都 drain。
// 若 ctx 在等待中被取消(软超时 / shutdown),立即返回 ctx.Err。
func (a *App) waitAllPreviewQueuesIdle(ctx context.Context) error {
a.mu.Lock()
@@ -1468,7 +1589,17 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
c := a.spider91Crawlers[driveID]
a.mu.Unlock()
if c == nil {
return
if err := a.ensureDriveAttached(ctx, driveID); err != nil {
log.Printf("[spider91] drive=%s attach failed: %v", driveID, err)
return
}
a.mu.Lock()
c = a.spider91Crawlers[driveID]
a.mu.Unlock()
if c == nil {
log.Printf("[spider91] drive=%s crawler not attached", driveID)
return
}
}
d, err := a.cat.GetDrive(ctx, driveID)
@@ -1517,8 +1648,8 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
thumbWorker := a.thumbWorkers[driveID]
fingerprintWorker := a.fingerprintWorkers[driveID]
a.mu.Unlock()
a.scheduleFingerprintBackfill(ctx, driveID, fingerprintWorker)
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
}
// spider91IntCred 解析 credentials 中的整数字段,缺省时返回 def。
+189 -34
View File
@@ -15,6 +15,7 @@ import (
"github.com/video-site/backend/internal/drives"
"github.com/video-site/backend/internal/fingerprint"
"github.com/video-site/backend/internal/preview"
"github.com/video-site/backend/internal/proxy"
)
func TestRegisterPreviewWorkerBackfillsPendingWhenDriveTeaserEnabled(t *testing.T) {
@@ -78,7 +79,7 @@ func TestRegisterPreviewWorkerBackfillsPendingWhenDriveTeaserEnabled(t *testing.
t.Fatalf("preview status = %q, want ready", got.PreviewStatus)
}
func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
func TestRegisterPreviewWorkersRunThumbnailsAndPreviewsIndependently(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -94,16 +95,18 @@ func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
seedDriveWithTeaser(t, cat, "drive-id", true)
now := time.Now()
for _, v := range []*catalog.Video{
{ID: "video-1", DriveID: "drive-id", FileID: "file-1", Title: "Clip 1", PreviewStatus: "pending"},
{ID: "video-2", DriveID: "drive-id", FileID: "file-2", Title: "Clip 2", PreviewStatus: "pending"},
} {
v.PublishedAt = now
v.CreatedAt = now
v.UpdatedAt = now
if err := cat.UpsertVideo(ctx, v); err != nil {
t.Fatalf("seed video %s: %v", v.ID, err)
}
video := &catalog.Video{
ID: "video-1",
DriveID: "drive-id",
FileID: "file-1",
Title: "Clip 1",
PreviewStatus: "pending",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
}
if err := cat.UpsertVideo(ctx, video); err != nil {
t.Fatalf("seed video: %v", err)
}
app := &App{
@@ -111,7 +114,10 @@ func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
}
gen := &serverFakeTeaserGenerator{}
gen := &serverBlockingThumbGenerator{
started: make(chan string, 1),
release: make(chan struct{}),
}
drv := &serverFakeDrive{}
worker := preview.NewWorker(gen, cat, drv)
thumbWorker := preview.NewThumbWorker(gen, cat, drv)
@@ -120,38 +126,36 @@ func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, nil, func() {})
select {
case got := <-gen.started:
if got != video.ID {
t.Fatalf("thumbnail started for %q, want %q", got, video.ID)
}
case <-time.After(2 * time.Second):
t.Fatal("thumbnail generation did not start")
}
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
first, err := cat.GetVideo(ctx, "video-1")
got, err := cat.GetVideo(ctx, video.ID)
if err != nil {
t.Fatalf("get video-1: %v", err)
t.Fatalf("get video: %v", err)
}
second, err := cat.GetVideo(ctx, "video-2")
if err != nil {
t.Fatalf("get video-2: %v", err)
}
if first.ThumbnailURL != "" && second.ThumbnailURL != "" &&
first.PreviewStatus == "ready" && second.PreviewStatus == "ready" {
events := gen.Events()
if len(events) != 4 {
t.Fatalf("events = %#v, want 4 generation events", events)
}
for i, event := range events[:2] {
if event[:6] != "thumb:" {
t.Fatalf("event %d = %q, want thumbnail before previews; all events=%#v", i, event, events)
}
}
for i, event := range events[2:] {
if event[:8] != "preview:" {
t.Fatalf("event %d = %q, want previews after thumbnails; all events=%#v", i+2, event, events)
}
if got.PreviewStatus == "ready" {
if got.ThumbnailURL != "" {
t.Fatalf("thumbnail url = %q, want preview ready while thumbnail is still blocked", got.ThumbnailURL)
}
close(gen.release)
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("generation did not finish, events=%#v", gen.Events())
got, err := cat.GetVideo(ctx, video.ID)
if err != nil {
t.Fatalf("get video after timeout: %v", err)
}
t.Fatalf("preview status=%q thumbnail=%q, want preview ready before thumbnail finishes", got.PreviewStatus, got.ThumbnailURL)
}
func TestRegisterPreviewWorkersBackfillsHistoricalFingerprints(t *testing.T) {
@@ -220,6 +224,126 @@ func TestRegisterPreviewWorkersBackfillsHistoricalFingerprints(t *testing.T) {
t.Fatalf("fingerprint status=%q sampled=%q, want ready with hash", got.FingerprintStatus, got.SampledSHA256)
}
// TestRunScanStartsFingerprintBeforeThumbnailAndPreviewDrain runs a scan over
// a fake drive holding a single video and checks that the video's fingerprint
// becomes ready while its thumbnail URL is still empty.
//
// Only the fingerprint worker is started here; the preview and thumbnail
// workers are registered on the App but their Run loops are never launched.
func TestRunScanStartsFingerprintBeforeThumbnailAndPreviewDrain(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cat, err := catalog.Open(filepath.Join(t.TempDir(), "catalog.db"))
	if err != nil {
		t.Fatalf("open catalog: %v", err)
	}
	t.Cleanup(func() {
		if err := cat.Close(); err != nil {
			t.Fatalf("close catalog: %v", err)
		}
	})
	seedDriveWithTeaser(t, cat, "drive-id", true)

	// The fake drive serves this on-disk file as the video's stream.
	dataPath := filepath.Join(t.TempDir(), "scan-video.mp4")
	data := []byte("scan video content for independent fingerprint")
	if err := os.WriteFile(dataPath, data, 0o644); err != nil {
		t.Fatalf("write video data: %v", err)
	}
	drv := &serverScanFingerprintFakeDrive{
		serverFingerprintFakeDrive: serverFingerprintFakeDrive{path: dataPath},
		entries: []drives.Entry{{
			ID:       "file-id",
			Name:     "scan-video.mp4",
			Size:     int64(len(data)),
			ParentID: "root",
		}},
	}
	registry := proxy.NewRegistry()
	registry.Set("drive-id", drv)

	gen := &serverFakeTeaserGenerator{}
	worker := preview.NewWorker(gen, cat, drv)
	thumbWorker := preview.NewThumbWorker(gen, cat, drv)
	fingerprintWorker := fingerprint.NewWorker(cat, drv, fingerprint.Config{})
	go fingerprintWorker.Run(ctx)

	app := &App{
		cfg: &config.Config{
			Scanner: config.Scanner{VideoExtensions: []string{".mp4"}},
		},
		cat:                cat,
		registry:           registry,
		workers:            map[string]*preview.Worker{"drive-id": worker},
		thumbWorkers:       map[string]*preview.ThumbWorker{"drive-id": thumbWorker},
		fingerprintWorkers: map[string]*fingerprint.Worker{"drive-id": fingerprintWorker},
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		app.runScan(ctx, "drive-id")
	}()

	// stopScan cancels the scan context and waits (bounded) for runScan to
	// return, so the goroutine does not outlive the test.
	stopScan := func() {
		t.Helper()
		cancel()
		select {
		case <-done:
		case <-time.After(2 * time.Second):
			t.Fatal("scan did not stop after context cancel")
		}
	}

	// NOTE(review): presumably the scanner derives this ID from the drive
	// kind, drive ID and file ID — confirm against the scanner.
	videoID := "fake-drive-id-file-id"
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		got, err := cat.GetVideo(ctx, videoID)
		if err == nil && got.SampledSHA256 != "" && got.FingerprintStatus == "ready" {
			stopScan()
			if got.ThumbnailURL != "" {
				t.Fatalf("thumbnail url = %q, want fingerprint before thumbnail generation", got.ThumbnailURL)
			}
			return
		}
		time.Sleep(10 * time.Millisecond)
	}

	stopScan()
	// ctx is cancelled at this point, so read the final state with a fresh
	// context.
	got, err := cat.GetVideo(context.Background(), videoID)
	if err != nil {
		t.Fatalf("get video after timeout: %v", err)
	}
	t.Fatalf("fingerprint status=%q sampled=%q, want ready before thumbnail/preview drain", got.FingerprintStatus, got.SampledSHA256)
}
// TestNightlyTargetsComeFromCatalogBeforeDriveAttach checks that the nightly
// target lists are derived from catalog rows alone: the App under test has no
// registry and no attached drives, only a catalog seeded with three drives.
func TestNightlyTargetsComeFromCatalogBeforeDriveAttach(t *testing.T) {
	ctx := context.Background()

	cat, err := catalog.Open(filepath.Join(t.TempDir(), "catalog.db"))
	if err != nil {
		t.Fatalf("open catalog: %v", err)
	}
	t.Cleanup(func() {
		if err := cat.Close(); err != nil {
			t.Fatalf("close catalog: %v", err)
		}
	})

	for _, d := range []*catalog.Drive{
		{ID: "115", Kind: "p115", Name: "115", RootID: "0", TeaserEnabled: true},
		{ID: "pikpak", Kind: "pikpak", Name: "PikPak", RootID: "0", TeaserEnabled: true},
		{ID: "91-spider", Kind: "spider91", Name: "91 Spider", RootID: "0", TeaserEnabled: true},
	} {
		if err := cat.UpsertDrive(ctx, d); err != nil {
			t.Fatalf("seed drive %s: %v", d.ID, err)
		}
	}

	app := &App{cat: cat}

	// NOTE(review): the positional checks below rely on ListDrives returning
	// "115" before "pikpak" — confirm the catalog's ordering guarantee.
	scanIDs := app.listScanTargetIDs(ctx)
	if len(scanIDs) != 2 || scanIDs[0] != "115" || scanIDs[1] != "pikpak" {
		t.Fatalf("scan target ids = %#v, want 115 and pikpak from catalog", scanIDs)
	}

	spiderIDs := app.listSpider91DriveIDs(ctx)
	if len(spiderIDs) != 1 || spiderIDs[0] != "91-spider" {
		t.Fatalf("spider91 ids = %#v, want catalog spider drive", spiderIDs)
	}
}
func TestFailedThumbnailsDoNotBlockPreviewGeneration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -687,6 +811,28 @@ func (g *serverFakeTeaserGenerator) GenerateThumbnail(_ context.Context, _ *driv
return "/tmp/" + videoID + ".jpg", nil
}
// serverBlockingThumbGenerator is a test generator that embeds
// serverFakeTeaserGenerator and overrides GenerateThumbnail so that the call
// blocks until release can be received from (or the call's context is done).
type serverBlockingThumbGenerator struct {
	serverFakeTeaserGenerator
	// started receives the video ID of a thumbnail request on a best-effort
	// basis (the send is non-blocking).
	started chan string
	// release unblocks pending GenerateThumbnail calls; tests close it.
	release chan struct{}
}
// GenerateThumbnail records a "thumb:<videoID>" event, announces the video ID
// on started without blocking (the notification is dropped when nobody can
// receive it), and then waits. It returns a fake "/tmp/<videoID>.jpg" path
// once release can be received from, or ctx.Err() if ctx finishes first.
func (g *serverBlockingThumbGenerator) GenerateThumbnail(ctx context.Context, _ *drives.StreamLink, videoID string, _ float64) (string, error) {
	g.record("thumb:" + videoID)

	if notify := g.started; notify != nil {
		select {
		case notify <- videoID:
		default:
			// Channel full or no receiver: skip the notification.
		}
	}

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-g.release:
	}
	return "/tmp/" + videoID + ".jpg", nil
}
type serverFakeDrive struct{}
func (d *serverFakeDrive) Kind() string { return "fake" }
@@ -720,6 +866,15 @@ func (d *serverFingerprintFakeDrive) StreamURL(context.Context, string) (*drives
return &drives.StreamLink{URL: d.path}, nil
}
// serverScanFingerprintFakeDrive is a test double that embeds
// serverFingerprintFakeDrive and adds a fixed listing for scan tests.
type serverScanFingerprintFakeDrive struct {
	serverFingerprintFakeDrive
	// entries is returned verbatim by List.
	entries []drives.Entry
}

// List ignores both of its arguments and always returns the configured
// entries with a nil error.
func (d *serverScanFingerprintFakeDrive) List(context.Context, string) ([]drives.Entry, error) {
	return d.entries, nil
}
type serverLocalUploadFakeDrive struct {
serverFakeDrive
}
+41 -23
View File
@@ -78,8 +78,9 @@ type GenerationStatus struct {
}
type DriveGenerationStatuses struct {
Thumbnail GenerationStatus `json:"thumbnail"`
Preview GenerationStatus `json:"preview"`
Thumbnail GenerationStatus `json:"thumbnail"`
Preview GenerationStatus `json:"preview"`
Fingerprint GenerationStatus `json:"fingerprint"`
}
func (a *AdminServer) Register(r chi.Router) {
@@ -346,6 +347,11 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusInternalServerError, err)
return
}
fingerprintCounts, err := a.Catalog.CountFingerprintsByDrive(r.Context())
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
return
}
generationStatuses := map[string]DriveGenerationStatuses{}
if a.GetDriveGenerationStatuses != nil {
generationStatuses = a.GetDriveGenerationStatuses()
@@ -368,20 +374,25 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
SkipDirIDs []string `json:"skipDirIds"`
// LastCrawlAt 是 spider91 上次成功爬取的 unix 秒(来自 credentials.last_crawl_at)。
// 其它 kind 留 0;前端用它显示"上次抓取: N 小时前"。
LastCrawlAt int64 `json:"lastCrawlAt,omitempty"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
ThumbnailReadyCount int `json:"thumbnailReadyCount"`
ThumbnailPendingCount int `json:"thumbnailPendingCount"`
ThumbnailFailedCount int `json:"thumbnailFailedCount"`
TeaserReadyCount int `json:"teaserReadyCount"`
TeaserPendingCount int `json:"teaserPendingCount"`
TeaserFailedCount int `json:"teaserFailedCount"`
LastCrawlAt int64 `json:"lastCrawlAt,omitempty"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
FingerprintGenerationStatus GenerationStatus `json:"fingerprintGenerationStatus"`
ThumbnailReadyCount int `json:"thumbnailReadyCount"`
ThumbnailPendingCount int `json:"thumbnailPendingCount"`
ThumbnailFailedCount int `json:"thumbnailFailedCount"`
TeaserReadyCount int `json:"teaserReadyCount"`
TeaserPendingCount int `json:"teaserPendingCount"`
TeaserFailedCount int `json:"teaserFailedCount"`
FingerprintReadyCount int `json:"fingerprintReadyCount"`
FingerprintPendingCount int `json:"fingerprintPendingCount"`
FingerprintFailedCount int `json:"fingerprintFailedCount"`
}
list := make([]out, 0, len(drives))
for _, d := range drives {
counts := teaserCounts[d.ID]
thumbCounts := thumbnailCounts[d.ID]
fingerprintCount := fingerprintCounts[d.ID]
generation := generationStatuses[d.ID]
if generation.Thumbnail.State == "" {
generation.Thumbnail.State = "idle"
@@ -389,6 +400,9 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
if generation.Preview.State == "" {
generation.Preview.State = "idle"
}
if generation.Fingerprint.State == "" {
generation.Fingerprint.State = "idle"
}
// spider91 没有用户凭证概念;只要存在 drive 行就视为"已配置"。
// last_crawl_at 是后端自动写入的运行状态字段,不计入 hasCredential 判定。
hasCred := false
@@ -414,18 +428,22 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
ID: d.ID, Kind: d.Kind, Name: d.Name,
RootID: d.RootID, ScanRootID: d.ScanRootID,
Status: d.Status, LastError: d.LastError,
HasCredential: hasCred,
TeaserEnabled: d.TeaserEnabled,
SkipDirIDs: append([]string{}, d.SkipDirIDs...),
LastCrawlAt: lastCrawlAt,
ThumbnailGenerationStatus: generation.Thumbnail,
PreviewGenerationStatus: generation.Preview,
ThumbnailReadyCount: thumbCounts.Ready,
ThumbnailPendingCount: thumbCounts.Pending,
ThumbnailFailedCount: thumbCounts.Failed,
TeaserReadyCount: counts.Ready,
TeaserPendingCount: counts.Pending,
TeaserFailedCount: counts.Failed,
HasCredential: hasCred,
TeaserEnabled: d.TeaserEnabled,
SkipDirIDs: append([]string{}, d.SkipDirIDs...),
LastCrawlAt: lastCrawlAt,
ThumbnailGenerationStatus: generation.Thumbnail,
PreviewGenerationStatus: generation.Preview,
FingerprintGenerationStatus: generation.Fingerprint,
ThumbnailReadyCount: thumbCounts.Ready,
ThumbnailPendingCount: thumbCounts.Pending,
ThumbnailFailedCount: thumbCounts.Failed,
TeaserReadyCount: counts.Ready,
TeaserPendingCount: counts.Pending,
TeaserFailedCount: counts.Failed,
FingerprintReadyCount: fingerprintCount.Ready,
FingerprintPendingCount: fingerprintCount.Pending,
FingerprintFailedCount: fingerprintCount.Failed,
})
}
writeJSON(w, http.StatusOK, list)
+73 -41
View File
@@ -323,11 +323,11 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
now := time.Now()
videos := []*catalog.Video{
{ID: "od-ready-1", DriveID: "OneDrive", FileID: "od-file-1", Title: "OD Ready 1", ThumbnailURL: "/p/thumb/od-ready-1", PreviewStatus: "ready", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "od-ready-2", DriveID: "OneDrive", FileID: "od-file-2", Title: "OD Ready 2", PreviewStatus: "ready", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "od-pending", DriveID: "OneDrive", FileID: "od-file-3", Title: "OD Pending", PreviewStatus: "pending", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "pp-pending", DriveID: "PikPak", FileID: "pp-file-1", Title: "PP Pending", PreviewStatus: "pending", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "pp-failed", DriveID: "PikPak", FileID: "pp-file-2", Title: "PP Failed", ThumbnailURL: "/p/thumb/pp-failed", PreviewStatus: "failed", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "od-ready-1", DriveID: "OneDrive", FileID: "od-file-1", Title: "OD Ready 1", Size: 100, ThumbnailURL: "/p/thumb/od-ready-1", PreviewStatus: "ready", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "od-ready-2", DriveID: "OneDrive", FileID: "od-file-2", Title: "OD Ready 2", Size: 100, PreviewStatus: "ready", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "od-pending", DriveID: "OneDrive", FileID: "od-file-3", Title: "OD Pending", Size: 100, PreviewStatus: "pending", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "pp-pending", DriveID: "PikPak", FileID: "pp-file-1", Title: "PP Pending", Size: 100, PreviewStatus: "pending", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
{ID: "pp-failed", DriveID: "PikPak", FileID: "pp-file-2", Title: "PP Failed", Size: 100, ThumbnailURL: "/p/thumb/pp-failed", PreviewStatus: "failed", PublishedAt: now, CreatedAt: now, UpdatedAt: now},
}
for _, v := range videos {
if err := cat.UpsertVideo(ctx, v); err != nil {
@@ -337,6 +337,12 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
if err := cat.UpdateVideoMeta(ctx, "od-ready-2", catalog.VideoMetaPatch{ThumbnailStatus: "failed"}); err != nil {
t.Fatalf("mark thumbnail failed: %v", err)
}
if err := cat.UpdateVideoFingerprint(ctx, "od-ready-1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "ready", ""); err != nil {
t.Fatalf("mark fingerprint ready: %v", err)
}
if err := cat.UpdateVideoFingerprint(ctx, "od-ready-2", "", "failed", "sample failed"); err != nil {
t.Fatalf("mark fingerprint failed: %v", err)
}
req := httptest.NewRequest(http.MethodGet, "/admin/api/drives", nil)
rr := httptest.NewRecorder()
@@ -345,8 +351,9 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
GetDriveGenerationStatuses: func() map[string]DriveGenerationStatuses {
return map[string]DriveGenerationStatuses{
"OneDrive": {
Thumbnail: GenerationStatus{State: "cooling", QueueLength: 3, CooldownUntil: "2026-05-16T21:00:00+08:00"},
Preview: GenerationStatus{State: "generating", CurrentTitle: "OD Pending"},
Thumbnail: GenerationStatus{State: "cooling", QueueLength: 3, CooldownUntil: "2026-05-16T21:00:00+08:00"},
Preview: GenerationStatus{State: "generating", CurrentTitle: "OD Pending"},
Fingerprint: GenerationStatus{State: "generating", CurrentTitle: "OD Pending"},
},
}
},
@@ -356,48 +363,64 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
t.Fatalf("status = %d, body = %s", rr.Code, rr.Body.String())
}
var got []struct {
ID string `json:"id"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
ThumbnailReadyCount int `json:"thumbnailReadyCount"`
ThumbnailPendingCount int `json:"thumbnailPendingCount"`
ThumbnailFailedCount int `json:"thumbnailFailedCount"`
TeaserReadyCount int `json:"teaserReadyCount"`
TeaserPendingCount int `json:"teaserPendingCount"`
TeaserFailedCount int `json:"teaserFailedCount"`
ID string `json:"id"`
ThumbnailGenerationStatus GenerationStatus `json:"thumbnailGenerationStatus"`
PreviewGenerationStatus GenerationStatus `json:"previewGenerationStatus"`
FingerprintGenerationStatus GenerationStatus `json:"fingerprintGenerationStatus"`
ThumbnailReadyCount int `json:"thumbnailReadyCount"`
ThumbnailPendingCount int `json:"thumbnailPendingCount"`
ThumbnailFailedCount int `json:"thumbnailFailedCount"`
TeaserReadyCount int `json:"teaserReadyCount"`
TeaserPendingCount int `json:"teaserPendingCount"`
TeaserFailedCount int `json:"teaserFailedCount"`
FingerprintReadyCount int `json:"fingerprintReadyCount"`
FingerprintPendingCount int `json:"fingerprintPendingCount"`
FingerprintFailedCount int `json:"fingerprintFailedCount"`
}
if err := json.NewDecoder(rr.Body).Decode(&got); err != nil {
t.Fatalf("decode: %v", err)
}
byID := map[string]struct {
TeaserReady int
TeaserPending int
TeaserFailed int
ThumbnailReady int
ThumbnailPending int
ThumbnailFailed int
Thumbnail GenerationStatus
Preview GenerationStatus
TeaserReady int
TeaserPending int
TeaserFailed int
ThumbnailReady int
ThumbnailPending int
ThumbnailFailed int
FingerprintReady int
FingerprintPending int
FingerprintFailed int
Thumbnail GenerationStatus
Preview GenerationStatus
Fingerprint GenerationStatus
}{}
for _, d := range got {
byID[d.ID] = struct {
TeaserReady int
TeaserPending int
TeaserFailed int
ThumbnailReady int
ThumbnailPending int
ThumbnailFailed int
Thumbnail GenerationStatus
Preview GenerationStatus
TeaserReady int
TeaserPending int
TeaserFailed int
ThumbnailReady int
ThumbnailPending int
ThumbnailFailed int
FingerprintReady int
FingerprintPending int
FingerprintFailed int
Thumbnail GenerationStatus
Preview GenerationStatus
Fingerprint GenerationStatus
}{
TeaserReady: d.TeaserReadyCount,
TeaserPending: d.TeaserPendingCount,
TeaserFailed: d.TeaserFailedCount,
ThumbnailReady: d.ThumbnailReadyCount,
ThumbnailPending: d.ThumbnailPendingCount,
ThumbnailFailed: d.ThumbnailFailedCount,
Thumbnail: d.ThumbnailGenerationStatus,
Preview: d.PreviewGenerationStatus,
TeaserReady: d.TeaserReadyCount,
TeaserPending: d.TeaserPendingCount,
TeaserFailed: d.TeaserFailedCount,
ThumbnailReady: d.ThumbnailReadyCount,
ThumbnailPending: d.ThumbnailPendingCount,
ThumbnailFailed: d.ThumbnailFailedCount,
FingerprintReady: d.FingerprintReadyCount,
FingerprintPending: d.FingerprintPendingCount,
FingerprintFailed: d.FingerprintFailedCount,
Thumbnail: d.ThumbnailGenerationStatus,
Preview: d.PreviewGenerationStatus,
Fingerprint: d.FingerprintGenerationStatus,
}
}
if byID["OneDrive"].TeaserReady != 2 || byID["OneDrive"].TeaserPending != 1 || byID["OneDrive"].TeaserFailed != 0 {
@@ -409,13 +432,22 @@ func TestHandleListDrivesIncludesTeaserCounts(t *testing.T) {
if byID["OneDrive"].Thumbnail.State != "cooling" || byID["OneDrive"].Preview.State != "generating" {
t.Fatalf("OneDrive generation statuses = %#v, want thumbnail cooling and preview generating", byID["OneDrive"])
}
if byID["OneDrive"].FingerprintReady != 1 || byID["OneDrive"].FingerprintPending != 1 || byID["OneDrive"].FingerprintFailed != 1 {
t.Fatalf("OneDrive fingerprint counts = %#v, want ready=1 pending=1 failed=1", byID["OneDrive"])
}
if byID["OneDrive"].Fingerprint.State != "generating" {
t.Fatalf("OneDrive fingerprint status = %#v, want generating", byID["OneDrive"].Fingerprint)
}
if byID["PikPak"].TeaserReady != 0 || byID["PikPak"].TeaserPending != 1 || byID["PikPak"].TeaserFailed != 1 {
t.Fatalf("PikPak counts = %#v, want ready=0 pending=1 failed=1", byID["PikPak"])
}
if byID["PikPak"].ThumbnailReady != 1 || byID["PikPak"].ThumbnailPending != 1 || byID["PikPak"].ThumbnailFailed != 0 {
t.Fatalf("PikPak thumbnail counts = %#v, want ready=1 pending=1 failed=0", byID["PikPak"])
}
if byID["PikPak"].Thumbnail.State != "idle" || byID["PikPak"].Preview.State != "idle" {
if byID["PikPak"].FingerprintPending != 2 {
t.Fatalf("PikPak fingerprint counts = %#v, want pending=2", byID["PikPak"])
}
if byID["PikPak"].Thumbnail.State != "idle" || byID["PikPak"].Preview.State != "idle" || byID["PikPak"].Fingerprint.State != "idle" {
t.Fatalf("PikPak generation statuses = %#v, want idle defaults", byID["PikPak"])
}
}
+52
View File
@@ -924,6 +924,12 @@ type DriveThumbnailCounts struct {
Failed int
}
// DriveFingerprintCounts holds the per-drive tally of sampled-fingerprint
// states, as returned by CountFingerprintsByDrive.
type DriveFingerprintCounts struct {
	// Ready is the number of videos that already have a fingerprint.
	Ready int
	// Pending is the number of videos still waiting for a fingerprint.
	Pending int
	// Failed is the number of videos whose fingerprint attempt failed.
	Failed int
}
func (c *Catalog) CountTeasersByDrive(ctx context.Context) (map[string]DriveTeaserCounts, error) {
rows, err := c.db.QueryContext(ctx,
`SELECT drive_id,
@@ -986,6 +992,52 @@ func (c *Catalog) CountThumbnailsByDrive(ctx context.Context) (map[string]DriveT
return out, nil
}
// CountFingerprintsByDrive returns fingerprint tallies for every drive that has
// at least one non-hidden video, keyed by drive ID.
//
// Classification (NULL columns are coalesced: sampled_sha256 to ”,
// fingerprint_status to 'pending', hidden to 0):
//   - ready:   a sampled hash is stored, or the status is 'ready'
//   - pending: no hash, status 'pending', and size_bytes > 0
//   - failed:  no hash and status 'failed'
//
// A hash-less pending video whose size is not positive falls into none of the
// three buckets. Any query, scan or iteration error is returned unchanged
// together with a nil map.
func (c *Catalog) CountFingerprintsByDrive(ctx context.Context) (map[string]DriveFingerprintCounts, error) {
	const query = `SELECT drive_id,
COUNT(CASE WHEN COALESCE(sampled_sha256, '') != ''
OR COALESCE(fingerprint_status, 'pending') = 'ready' THEN 1 END) AS ready_count,
COUNT(CASE WHEN size_bytes > 0
AND COALESCE(sampled_sha256, '') = ''
AND COALESCE(fingerprint_status, 'pending') = 'pending' THEN 1 END) AS pending_count,
COUNT(CASE WHEN COALESCE(sampled_sha256, '') = ''
AND COALESCE(fingerprint_status, 'pending') = 'failed' THEN 1 END) AS failed_count
FROM videos
WHERE COALESCE(hidden, 0) = 0
GROUP BY drive_id`

	rows, err := c.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	byDrive := make(map[string]DriveFingerprintCounts)
	for rows.Next() {
		var (
			id    string
			tally DriveFingerprintCounts
		)
		if scanErr := rows.Scan(&id, &tally.Ready, &tally.Pending, &tally.Failed); scanErr != nil {
			return nil, scanErr
		}
		byDrive[id] = tally
	}
	// rows.Next returning false can also mean the iteration broke off early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return byDrive, nil
}
// CountVideosNeedingFingerprint reports how many non-hidden videos of the given
// drive still await a fingerprint: positive size_bytes, no sampled hash, and a
// status of 'pending' (a NULL status counts as 'pending').
//
// The count is returned alongside whatever error the row scan produced.
func (c *Catalog) CountVideosNeedingFingerprint(ctx context.Context, driveID string) (int, error) {
	const query = `SELECT COUNT(*) FROM videos
WHERE drive_id = ?
AND size_bytes > 0
AND COALESCE(sampled_sha256, '') = ''
AND COALESCE(fingerprint_status, 'pending') = 'pending'
AND COALESCE(hidden, 0) = 0`

	var pending int
	row := c.db.QueryRowContext(ctx, query, driveID)
	err := row.Scan(&pending)
	return pending, err
}
type LocalMediaRef struct {
DriveID string
VideoID string
+1 -2
View File
@@ -463,8 +463,7 @@ func (c *Crawler) processOne(ctx context.Context, videoID string, item spiderVid
// 网站封面下载失败的视频:spider91 drive 的 thumb worker 按设计不
// 处理 spider91 视频(封面应是网站原图直接保存),所以没人接手。
// 显式标 'failed' 让 CountVideosNeedingThumbnail 排除(条件 status
// != 'failed'),否则 enqueueDriveGeneration → waitForThumbnailsBeforePreview
// 会因为 count > 0 把 teaser 入队永远卡在等待循环里。
// != 'failed'),避免后续封面补队列一直重复捞到这条视频。
_ = c.cfg.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{
ThumbnailStatus: "failed",
})
@@ -234,13 +234,13 @@ func TestCrawlerRunOnceMissingScript(t *testing.T) {
}
// TestCrawlerThumbDownloadFailureMarksStatusFailed 验证:网站封面下载失败时
// crawler 把 thumbnail_status 显式标 'failed',避免 enqueueDriveGeneration 的
// waitForThumbnailsBeforePreview 因为 count > 0 把 teaser 卡死等待
// crawler 把 thumbnail_status 显式标 'failed',避免后续封面补队列一直重复
// 捞到这条 spider91 视频
//
// 历史 bug:之前 thumb 下载失败仅打 log,url='', status 走 schema DEFAULT 'pending'。
// CountVideosNeedingThumbnail 条件是 url='' AND status != 'failed' → count=1。
// spider91 drive 的 thumb worker 按设计不处理 spider91 视频 → 没人会改 status
// 结果 teaser 永远卡在 [preview] waiting for 1 thumbnails before teaser generation
// spider91 drive 的 thumb worker 按设计不处理 spider91 视频 → 没人会改 status
// 后续补队列会一直认为它还缺封面
func TestCrawlerThumbDownloadFailureMarksStatusFailed(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("shell-based fake script only on unix")
@@ -317,8 +317,7 @@ func TestCrawlerThumbDownloadFailureMarksStatusFailed(t *testing.T) {
// 关键断言:CountVideosNeedingThumbnail 应该返回 0。
// 该函数的 SQL 条件是 `url = '' AND status != 'failed'`;如果 crawler 没把
// status 标 'failed'(schema DEFAULT 'pending'),count 就会是 1,外层
// waitForThumbnailsBeforePreview 会因为 count > 0 把 teaser 卡死等待。
// status 标 'failed'(schema DEFAULT 'pending'),count 就会是 1。
count, err := cat.CountVideosNeedingThumbnail(context.Background(), driveID)
if err != nil {
t.Fatalf("count: %v", err)
+126 -8
View File
@@ -24,12 +24,14 @@ const (
defaultSampleSizeBytes int64 = 512 * 1024
defaultFullHashMaxSize int64 = 8 * 1024 * 1024
defaultCooldown = 5 * time.Minute
defaultWorkerQueueSize = 10000
)
type Config struct {
SampleSizeBytes int64
FullHashMaxSize int64
HTTPClient *http.Client
SampleSizeBytes int64
FullHashMaxSize int64
RateLimitCooldown time.Duration
HTTPClient *http.Client
}
type Worker struct {
@@ -37,9 +39,18 @@ type Worker struct {
Drive drives.Drive
Config Config
ch chan *catalog.Video
queue videoQueue
http *http.Client
ch chan *catalog.Video
queue videoQueue
activity taskActivity
cooldown cooldownState
http *http.Client
}
// TaskStatus is a point-in-time snapshot of a fingerprint worker, as produced
// by Worker.Status.
type TaskStatus struct {
	// State is one of "idle", "cooling", "generating" or "queued".
	State string
	// CurrentTitle is the title of the video being processed; empty when no
	// video is in flight.
	CurrentTitle string
	// QueueLength is the number of queued videos, not counting the one
	// currently being processed.
	QueueLength int
	// CooldownUntil is when the active cooldown ends. Worker.Status sets it
	// only when State is "cooling"; otherwise it is the zero time.
	CooldownUntil time.Time
}
func NewWorker(cat *catalog.Catalog, drv drives.Drive, cfg Config) *Worker {
@@ -53,11 +64,14 @@ func NewWorker(cat *catalog.Catalog, drv drives.Drive, cfg Config) *Worker {
if cfg.FullHashMaxSize <= 0 {
cfg.FullHashMaxSize = defaultFullHashMaxSize
}
if cfg.RateLimitCooldown <= 0 {
cfg.RateLimitCooldown = defaultCooldown
}
return &Worker{
Catalog: cat,
Drive: drv,
Config: cfg,
ch: make(chan *catalog.Video, 4096),
ch: make(chan *catalog.Video, defaultWorkerQueueSize),
http: hc,
}
}
@@ -110,6 +124,31 @@ func (w *Worker) Run(ctx context.Context) {
}
}
// Status reports what the worker is doing right now. It is safe to call on a
// nil *Worker, which is reported as idle.
//
// The state is chosen by precedence: an active cooldown wins ("cooling", with
// CooldownUntil filled in), then a video in flight ("generating"), then a
// non-empty backlog ("queued"); otherwise the worker is "idle". CurrentTitle
// and QueueLength are populated in every case, and QueueLength never counts
// the video currently being processed.
func (w *Worker) Status() TaskStatus {
	if w == nil {
		return TaskStatus{State: "idle"}
	}

	activeID, activeTitle := w.activity.current()
	snapshot := TaskStatus{
		State:        "idle",
		CurrentTitle: activeTitle,
		QueueLength:  w.queue.lengthExcluding(activeID),
	}

	coolUntil, cooling := w.cooldown.active(time.Now())
	switch {
	case cooling:
		snapshot.State = "cooling"
		snapshot.CooldownUntil = coolUntil
	case activeID != "":
		snapshot.State = "generating"
	case snapshot.QueueLength > 0:
		snapshot.State = "queued"
	}
	return snapshot
}
func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
defer w.queue.release(v.ID)
if w.Catalog == nil || w.Drive == nil || v == nil || v.ID == "" {
@@ -122,16 +161,21 @@ func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
if current.SampledSHA256 != "" || current.FingerprintStatus == "ready" || current.Hidden {
return
}
w.activity.start(current)
defer w.activity.done()
sum, err := Compute(ctx, w.Drive, current, w.Config, w.http)
if err != nil {
var rl *drives.RateLimitError
if errors.As(err, &rl) {
wait := rl.RetryAfter
if wait <= 0 {
wait = defaultCooldown
wait = w.Config.RateLimitCooldown
}
until := time.Now().Add(wait)
w.cooldown.set(until)
log.Printf("[fingerprint] drive=%s rate limited; keep video=%s pending and cool down for %s: %v", w.Drive.ID(), current.ID, wait, err)
sleepContext(ctx, wait)
w.cooldown.clear(until)
return
}
log.Printf("[fingerprint] video=%s failed: %v", current.ID, err)
@@ -319,6 +363,65 @@ func sleepContext(ctx context.Context, d time.Duration) bool {
}
}
// taskActivity remembers which video, if any, is currently being processed.
// All access goes through mu, so the methods may be called from different
// goroutines.
type taskActivity struct {
	mu           sync.Mutex
	currentID    string
	currentTitle string
}

// start records v as the video in flight. A nil v resets the record to "none".
func (a *taskActivity) start(v *catalog.Video) {
	var id, title string
	if v != nil {
		id, title = v.ID, v.Title
	}

	a.mu.Lock()
	a.currentID, a.currentTitle = id, title
	a.mu.Unlock()
}

// done clears the record of the video in flight.
func (a *taskActivity) done() {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.currentID, a.currentTitle = "", ""
}

// current returns the ID and title of the video in flight; both are empty
// when nothing is being processed.
func (a *taskActivity) current() (string, string) {
	a.mu.Lock()
	id, title := a.currentID, a.currentTitle
	a.mu.Unlock()
	return id, title
}
// cooldownState tracks the deadline of the current cooldown, if any. The zero
// value means "no cooldown". All access goes through mu.
type cooldownState struct {
	mu    sync.Mutex
	until time.Time
}

// set starts (or replaces) the cooldown so that it lasts until the given time.
func (s *cooldownState) set(until time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.until = until
}

// clear ends the cooldown, but only if the stored deadline is still the one
// passed in; a deadline installed by a later set call is left untouched.
func (s *cooldownState) clear(until time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.until.Equal(until) {
		return
	}
	s.until = time.Time{}
}

// active reports whether a cooldown is in effect at now and, if so, when it
// ends. A deadline that is not strictly after now counts as expired, in which
// case the zero time and false are returned.
func (s *cooldownState) active(now time.Time) (time.Time, bool) {
	s.mu.Lock()
	deadline := s.until
	s.mu.Unlock()

	if !deadline.IsZero() && deadline.After(now) {
		return deadline, true
	}
	return time.Time{}, false
}
type videoQueue struct {
mu sync.Mutex
ids map[string]struct{}
@@ -348,3 +451,18 @@ func (q *videoQueue) release(id string) {
delete(q.ids, id)
q.mu.Unlock()
}
// lengthExcluding returns the number of IDs tracked by the queue, not counting
// currentID when it is non-empty and present. The result is never negative.
func (q *videoQueue) lengthExcluding(currentID string) int {
	q.mu.Lock()
	defer q.mu.Unlock()

	remaining := len(q.ids)
	if currentID == "" {
		return remaining
	}
	if _, inFlight := q.ids[currentID]; inFlight {
		remaining--
	}
	if remaining < 0 {
		return 0
	}
	return remaining
}
+3 -7
View File
@@ -975,7 +975,6 @@ type Worker struct {
queue videoQueue
RateLimitCooldown time.Duration
BeforeTask func(context.Context) bool
rateLimit rateLimitState
activity taskActivity
}
@@ -985,7 +984,7 @@ func NewWorker(gen TeaserGenerator, cat *catalog.Catalog, drv drives.Drive) *Wor
Gen: gen,
Catalog: cat,
Drive: drv,
ch: make(chan *catalog.Video, 4096),
ch: make(chan *catalog.Video, defaultWorkerQueueSize),
}
}
@@ -1036,6 +1035,7 @@ type ThumbWorker struct {
const (
defaultTransientMediaCooldown = 5 * time.Minute
defaultGenerationRateLimitCooldown = 5 * time.Minute
defaultWorkerQueueSize = 10000
maxPreviewTeaserSizeBytes int64 = 5 * 1024 * 1024 * 1024
previewStatusSkipped = "skipped"
)
@@ -1175,7 +1175,7 @@ func NewThumbWorker(gen ThumbnailGenerator, cat *catalog.Catalog, drv drives.Dri
Gen: gen,
Catalog: cat,
Drive: drv,
ch: make(chan *catalog.Video, 4096),
ch: make(chan *catalog.Video, defaultWorkerQueueSize),
}
}
@@ -1330,10 +1330,6 @@ func (w *ThumbWorker) Run(ctx context.Context) {
func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
defer w.queue.release(v)
if w.BeforeTask != nil && !w.BeforeTask(ctx) {
return
}
w.activity.start(v)
defer w.activity.done()
if !waitForRateLimitCooldown(ctx, &w.rateLimit, "preview", w.Drive) {
+27 -2
View File
@@ -427,13 +427,13 @@ export function DrivesPage() {
)}
</div>
{/* 右栏:Teaser / 封面 与 缓存占用 */}
{/* 右栏:Teaser / 封面 / 指纹 与 缓存占用 */}
<div>
<div className="admin-detail-card">
<header className="admin-detail-card__title">
<div className="admin-detail-card__title-left">
<PlayCircle size={16} />
<span>Teaser </span>
<span></span>
</div>
<div className="admin-detail-actions-inline">
<button
@@ -481,6 +481,22 @@ export function DrivesPage() {
/>
</div>
</div>
<div className="admin-detail-row">
<span className="admin-detail-label"></span>
<div className="admin-detail-value">
<GenerationStatusLine label="指纹" status={d.fingerprintGenerationStatus} />
</div>
</div>
<div className="admin-detail-row">
<span className="admin-detail-label"></span>
<div className="admin-detail-value">
<GenerationCounts
ready={d.fingerprintReadyCount}
pending={d.fingerprintPendingCount}
failed={d.fingerprintFailedCount}
/>
</div>
</div>
</div>
<div className="admin-detail-actions">
@@ -625,6 +641,15 @@ export function DrivesPage() {
</span>
</strong>
</div>
<div className="admin-drive-card__metric">
<span> (/)</span>
<strong>
{d.fingerprintReadyCount ?? 0}
<span style={{ fontSize: "11px", fontWeight: "normal", color: "var(--text-faint)" }}>
{" "}/ {d.fingerprintFailedCount ?? 0}
</span>
</strong>
</div>
</div>
<div className="admin-drive-card__footer">
+4
View File
@@ -96,12 +96,16 @@ export type AdminDrive = {
lastCrawlAt?: number;
thumbnailGenerationStatus?: DriveGenerationStatus;
previewGenerationStatus?: DriveGenerationStatus;
fingerprintGenerationStatus?: DriveGenerationStatus;
thumbnailReadyCount: number;
thumbnailPendingCount: number;
thumbnailFailedCount: number;
teaserReadyCount: number;
teaserPendingCount: number;
teaserFailedCount: number;
fingerprintReadyCount: number;
fingerprintPendingCount: number;
fingerprintFailedCount: number;
};
export type DriveGenerationStatus = {