Prioritize thumbnail generation before teasers

This commit is contained in:
Codex
2026-05-17 00:05:38 +08:00
parent 3dcfc5c8f6
commit 7057927f1d
5 changed files with 204 additions and 116 deletions
+63 -22
View File
@@ -212,10 +212,9 @@ func (a *App) SetPreviewEnabled(ctx context.Context, enabled bool) error {
for _, d := range a.registry.All() {
a.mu.Lock()
w := a.workers[d.ID()]
tw := a.thumbWorkers[d.ID()]
a.mu.Unlock()
if w != nil {
a.enqueuePending(ctx, d.ID(), w)
}
a.enqueueDriveGeneration(ctx, d.ID(), w, tw)
}
}()
}
@@ -262,6 +261,15 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
for id, worker := range thumbWorkers {
status := out[id]
status.Thumbnail = generationStatusFromPreview(worker.Status())
missing, err := a.cat.CountVideosNeedingThumbnail(context.Background(), id)
if err != nil {
log.Printf("[thumb] count missing thumbnails %s: %v", id, err)
} else {
status.Thumbnail.QueueLength = missing
if missing > 0 && status.Thumbnail.State == "idle" {
status.Thumbnail.State = "queued"
}
}
out[id] = status
}
return out
@@ -383,9 +391,6 @@ func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
})
worker := preview.NewWorker(gen, a.cat, drv, "")
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
if drv.Kind() == "p115" {
preview.ShareGenerationState(worker, thumbWorker)
}
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
@@ -456,15 +461,19 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
} else {
delete(a.cancels, driveID)
}
previewEnabled := a.previewEnabled
a.mu.Unlock()
if thumbWorker != nil {
go a.enqueueThumbnails(ctx, driveID, thumbWorker)
}
if previewEnabled && worker != nil {
go a.enqueuePending(ctx, driveID, worker)
if worker != nil {
if thumbWorker != nil {
worker.BeforeTask = func(taskCtx context.Context) bool {
return a.waitForThumbnailsBeforePreview(taskCtx, driveID)
}
} else {
worker.BeforeTask = nil
}
}
go a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
}
func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Worker) {
@@ -485,6 +494,46 @@ func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Wor
}
}
func (a *App) enqueueDriveGeneration(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker) {
if thumbWorker != nil {
a.enqueueThumbnails(ctx, driveID, thumbWorker)
}
if !a.PreviewEnabled() || worker == nil {
return
}
if thumbWorker != nil && !a.waitForThumbnailsBeforePreview(ctx, driveID) {
return
}
a.enqueuePending(ctx, driveID, worker)
}
func (a *App) waitForThumbnailsBeforePreview(ctx context.Context, driveID string) bool {
const pollInterval = time.Second
var lastLog time.Time
for {
missing, err := a.cat.CountVideosNeedingThumbnail(ctx, driveID)
if err != nil {
log.Printf("[preview] count missing thumbnails drive=%s: %v", driveID, err)
return false
}
if missing == 0 {
return true
}
now := time.Now()
if lastLog.IsZero() || now.Sub(lastLog) >= time.Minute {
log.Printf("[preview] drive=%s waiting for %d thumbnails before teaser generation", driveID, missing)
lastLog = now
}
timer := time.NewTimer(pollInterval)
select {
case <-ctx.Done():
timer.Stop()
return false
case <-timer.C:
}
}
}
func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.ThumbWorker) {
pending, err := a.cat.ListVideosNeedingThumbnail(ctx, driveID, 0)
if err != nil {
@@ -528,14 +577,11 @@ func (a *App) runScan(ctx context.Context, driveID string) {
a.mu.Unlock()
var onNew func(v *catalog.Video)
if thumbWorker != nil || (a.PreviewEnabled() && worker != nil) {
if thumbWorker != nil {
onNew = func(v *catalog.Video) {
if thumbWorker != nil && v.ThumbnailURL == "" {
thumbWorker.Enqueue(v)
}
if a.PreviewEnabled() && worker != nil {
worker.Enqueue(v)
}
}
}
@@ -571,12 +617,7 @@ func (a *App) runScan(ctx context.Context, driveID string) {
}
}
}
if thumbWorker != nil {
a.enqueueThumbnails(ctx, driveID, thumbWorker)
}
if a.PreviewEnabled() && worker != nil {
go a.enqueuePending(ctx, driveID, worker)
}
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
}
func (a *App) cleanupMissingDriveVideos(ctx context.Context, driveID string, liveFileIDs map[string]struct{}, visitedDirIDs map[string]struct{}, fullDriveScan bool) (int, error) {
+104 -1
View File
@@ -6,6 +6,7 @@ import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
@@ -76,6 +77,82 @@ func TestRegisterPreviewWorkerBackfillsPendingWhenPreviewEnabled(t *testing.T) {
t.Fatalf("preview status = %q, want ready", got.PreviewStatus)
}
func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now()
for _, v := range []*catalog.Video{
{ID: "video-1", DriveID: "drive-id", FileID: "file-1", Title: "Clip 1", PreviewStatus: "pending"},
{ID: "video-2", DriveID: "drive-id", FileID: "file-2", Title: "Clip 2", PreviewStatus: "pending"},
} {
v.PublishedAt = now
v.CreatedAt = now
v.UpdatedAt = now
if err := cat.UpsertVideo(ctx, v); err != nil {
t.Fatalf("seed video %s: %v", v.ID, err)
}
}
app := &App{
cat: cat,
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
previewEnabled: true,
}
gen := &serverFakeTeaserGenerator{}
drv := &serverFakeDrive{}
worker := preview.NewWorker(gen, cat, drv, "")
thumbWorker := preview.NewThumbWorker(gen, cat, drv)
go worker.Run(ctx)
go thumbWorker.Run(ctx)
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, func() {})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
first, err := cat.GetVideo(ctx, "video-1")
if err != nil {
t.Fatalf("get video-1: %v", err)
}
second, err := cat.GetVideo(ctx, "video-2")
if err != nil {
t.Fatalf("get video-2: %v", err)
}
if first.ThumbnailURL != "" && second.ThumbnailURL != "" &&
first.PreviewStatus == "ready" && second.PreviewStatus == "ready" {
events := gen.Events()
if len(events) != 4 {
t.Fatalf("events = %#v, want 4 generation events", events)
}
for i, event := range events[:2] {
if event[:6] != "thumb:" {
t.Fatalf("event %d = %q, want thumbnail before previews; all events=%#v", i, event, events)
}
}
for i, event := range events[2:] {
if event[:8] != "preview:" {
t.Fatalf("event %d = %q, want previews after thumbnails; all events=%#v", i+2, event, events)
}
}
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("generation did not finish, events=%#v", gen.Events())
}
func TestRegenFailedPreviewsQueuesOnlyFailedVideosForDrive(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -360,20 +437,46 @@ func TestCleanupMissingPikPakVideosRemovesDatabaseRowsAndLocalAssets(t *testing.
}
}
type serverFakeTeaserGenerator struct{}
type serverFakeTeaserGenerator struct {
mu sync.Mutex
events []string
}
func (g *serverFakeTeaserGenerator) record(event string) {
g.mu.Lock()
g.events = append(g.events, event)
g.mu.Unlock()
}
func (g *serverFakeTeaserGenerator) Events() []string {
g.mu.Lock()
defer g.mu.Unlock()
return append([]string(nil), g.events...)
}
func (g *serverFakeTeaserGenerator) Probe(context.Context, *drives.StreamLink) (float64, error) {
return 30, nil
}
func (g *serverFakeTeaserGenerator) Generate(context.Context, *drives.StreamLink, float64) (string, error) {
g.record("preview")
return "/tmp/source-teaser.mp4", nil
}
func (g *serverFakeTeaserGenerator) MoveToLocal(_ string, videoID string) (string, error) {
g.mu.Lock()
if len(g.events) > 0 && g.events[len(g.events)-1] == "preview" {
g.events[len(g.events)-1] = "preview:" + videoID
}
g.mu.Unlock()
return "/tmp/" + videoID + ".mp4", nil
}
func (g *serverFakeTeaserGenerator) GenerateThumbnail(_ context.Context, _ *drives.StreamLink, videoID string, _ float64) (string, error) {
g.record("thumb:" + videoID)
return "/tmp/" + videoID + ".jpg", nil
}
type serverFakeDrive struct{}
func (d *serverFakeDrive) Kind() string { return "fake" }
+12
View File
@@ -349,6 +349,18 @@ func (c *Catalog) ListVideosNeedingThumbnail(ctx context.Context, driveID string
return out, nil
}
func (c *Catalog) CountVideosNeedingThumbnail(ctx context.Context, driveID string) (int, error) {
var count int
err := c.db.QueryRowContext(ctx,
`SELECT COUNT(*) FROM videos
WHERE drive_id = ?
AND COALESCE(thumbnail_url, '') = ''
AND COALESCE(hidden, 0) = 0
AND `+uniqueVideoWhereSQL,
driveID).Scan(&count)
return count, err
}
func (c *Catalog) GetVideo(ctx context.Context, id string) (*Video, error) {
row := c.db.QueryRowContext(ctx, `SELECT `+allVideoCols+` FROM videos WHERE id = ?`, id)
return scanVideo(row)
+23 -92
View File
@@ -847,7 +847,8 @@ type Worker struct {
ch chan *catalog.Video
RateLimitCooldown time.Duration
shared *sharedGenerationState
BeforeTask func(context.Context) bool
rateLimit rateLimitState
activity taskActivity
}
@@ -857,7 +858,6 @@ func NewWorker(gen TeaserGenerator, cat *catalog.Catalog, drv drives.Drive, _ st
Catalog: cat,
Drive: drv,
ch: make(chan *catalog.Video, 4096),
shared: newSharedGenerationState(),
}
}
@@ -892,7 +892,7 @@ type ThumbWorker struct {
ch chan *catalog.Video
RateLimitCooldown time.Duration
shared *sharedGenerationState
rateLimit rateLimitState
activity taskActivity
}
@@ -908,11 +908,6 @@ type rateLimitState struct {
lastSkipLog time.Time
}
type sharedGenerationState struct {
rateLimit *rateLimitState
gate sync.Mutex
}
type TaskStatus struct {
State string
CurrentTitle string
@@ -995,31 +990,6 @@ func NewThumbWorker(gen ThumbnailGenerator, cat *catalog.Catalog, drv drives.Dri
Catalog: cat,
Drive: drv,
ch: make(chan *catalog.Video, 4096),
shared: newSharedGenerationState(),
}
}
func newSharedGenerationState() *sharedGenerationState {
return &sharedGenerationState{rateLimit: &rateLimitState{}}
}
// ShareGenerationState makes preview and thumbnail generation for one drive
// share cooldowns and avoid concurrent reads from the same remote media source.
func ShareGenerationState(worker *Worker, thumbWorker *ThumbWorker) {
state := newSharedGenerationState()
if worker != nil && worker.shared != nil {
state = worker.shared
} else if thumbWorker != nil && thumbWorker.shared != nil {
state = thumbWorker.shared
}
if state.rateLimit == nil {
state.rateLimit = &rateLimitState{}
}
if worker != nil {
worker.shared = state
}
if thumbWorker != nil {
thumbWorker.shared = state
}
}
@@ -1051,14 +1021,14 @@ func (w *Worker) Status() TaskStatus {
if w == nil {
return TaskStatus{State: "idle"}
}
return taskStatus(&w.activity, w.generationState().rateLimit, len(w.ch))
return taskStatus(&w.activity, &w.rateLimit, len(w.ch))
}
func (w *ThumbWorker) Status() TaskStatus {
if w == nil {
return TaskStatus{State: "idle"}
}
return taskStatus(&w.activity, w.generationState().rateLimit, len(w.ch))
return taskStatus(&w.activity, &w.rateLimit, len(w.ch))
}
func taskStatus(activity *taskActivity, rateLimit *rateLimitState, queueLength int) TaskStatus {
@@ -1121,71 +1091,27 @@ func (w *ThumbWorker) Run(ctx context.Context) {
}
func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
state := w.generationState()
unlock, ok := waitForGenerationTurn(ctx, state, "preview", w.Drive)
if !ok {
if w.BeforeTask != nil && !w.BeforeTask(ctx) {
return
}
defer unlock()
w.activity.start(v)
defer w.activity.done()
if !waitForRateLimitCooldown(ctx, &w.rateLimit, "preview", w.Drive) {
return
}
w.process(ctx, v)
}
func (w *ThumbWorker) processQueued(ctx context.Context, v *catalog.Video) {
state := w.generationState()
unlock, ok := waitForGenerationTurn(ctx, state, "thumb", w.Drive)
if !ok {
return
}
defer unlock()
w.activity.start(v)
defer w.activity.done()
if !waitForRateLimitCooldown(ctx, &w.rateLimit, "thumb", w.Drive) {
return
}
w.process(ctx, v)
}
func (w *Worker) generationState() *sharedGenerationState {
if w.shared == nil {
w.shared = newSharedGenerationState()
}
if w.shared.rateLimit == nil {
w.shared.rateLimit = &rateLimitState{}
}
return w.shared
}
func (w *ThumbWorker) generationState() *sharedGenerationState {
if w.shared == nil {
w.shared = newSharedGenerationState()
}
if w.shared.rateLimit == nil {
w.shared.rateLimit = &rateLimitState{}
}
return w.shared
}
func waitForGenerationTurn(ctx context.Context, state *sharedGenerationState, label string, drive drives.Drive) (func(), bool) {
if state == nil {
state = newSharedGenerationState()
}
if state.rateLimit == nil {
state.rateLimit = &rateLimitState{}
}
for {
if !waitForRateLimitCooldown(ctx, state.rateLimit, label, drive) {
return nil, false
}
state.gate.Lock()
if _, ok := state.rateLimit.coolingUntil(time.Now()); ok {
state.gate.Unlock()
continue
}
return state.gate.Unlock, true
}
}
func waitForRateLimitCooldown(ctx context.Context, state *rateLimitState, label string, drive drives.Drive) bool {
driveID := ""
if drive != nil {
@@ -1212,7 +1138,7 @@ func waitForRateLimitCooldown(ctx context.Context, state *rateLimitState, label
}
func (w *Worker) skipIfRateLimited(v *catalog.Video) bool {
until, ok, shouldLog := w.generationState().rateLimit.active(time.Now())
until, ok, shouldLog := w.rateLimit.active(time.Now())
if !ok {
return false
}
@@ -1230,7 +1156,7 @@ func (w *Worker) pauseForRateLimit(err error, step, title string) bool {
if retryAfter <= 0 {
retryAfter = w.RateLimitCooldown
}
until := w.generationState().rateLimit.pause(time.Now(), retryAfter)
until := w.rateLimit.pause(time.Now(), retryAfter)
log.Printf("[preview] drive=%s rate-limited until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
return true
}
@@ -1242,13 +1168,13 @@ func (w *Worker) pauseForRecoverableError(err error, step, title string) bool {
if !driveErrorShouldCooldown(w.Drive, err) {
return false
}
until := w.generationState().rateLimit.pause(time.Now(), w.RateLimitCooldown)
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
log.Printf("[preview] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
return true
}
func (w *ThumbWorker) skipIfRateLimited(v *catalog.Video) bool {
until, ok, shouldLog := w.generationState().rateLimit.active(time.Now())
until, ok, shouldLog := w.rateLimit.active(time.Now())
if !ok {
return false
}
@@ -1266,7 +1192,7 @@ func (w *ThumbWorker) pauseForRateLimit(err error, step, title string) bool {
if retryAfter <= 0 {
retryAfter = w.RateLimitCooldown
}
until := w.generationState().rateLimit.pause(time.Now(), retryAfter)
until := w.rateLimit.pause(time.Now(), retryAfter)
log.Printf("[thumb] drive=%s rate-limited until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
return true
}
@@ -1278,7 +1204,7 @@ func (w *ThumbWorker) pauseForRecoverableError(err error, step, title string) bo
if !driveErrorShouldCooldown(w.Drive, err) {
return false
}
until := w.generationState().rateLimit.pause(time.Now(), w.RateLimitCooldown)
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
log.Printf("[thumb] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
return true
}
@@ -1303,6 +1229,11 @@ func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
if w.skipIfRateLimited(v) {
return
}
if current, err := w.Catalog.GetVideo(ctx, v.ID); err == nil {
if current.ThumbnailURL != "" {
return
}
}
link, err := w.Drive.StreamURL(ctx, v.FileID)
if err != nil {
if localLink, ok := localPreviewLink(v); ok {
+2 -1
View File
@@ -333,6 +333,7 @@ function GenerationStatusLine({
const queueLength = status?.queueLength ?? 0;
const detail = generationDetail(status);
const title = generationTitle(status, detail);
const countText = queueLength > 0 ? `${label === "封面" ? "剩余" : "队列"} ${queueLength}` : "";
return (
<div className="admin-generation-row" title={title}>
@@ -342,7 +343,7 @@ function GenerationStatusLine({
</span>
{(detail || queueLength > 0) && (
<span className="admin-generation-detail">
{[detail, queueLength > 0 ? `队列 ${queueLength}` : ""].filter(Boolean).join(" / ")}
{[detail, countText].filter(Boolean).join(" / ")}
</span>
)}
</div>