mirror of
https://github.com/nianzhibai/91.git
synced 2026-06-15 08:45:41 +08:00
feat: probe video duration during thumbnail generation
This commit is contained in:
@@ -459,7 +459,7 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
|
||||
status.Thumbnail = generationStatusFromPreview(worker.Status())
|
||||
missing, err := a.cat.CountVideosNeedingThumbnail(context.Background(), id)
|
||||
if err != nil {
|
||||
log.Printf("[thumb] count missing thumbnails %s: %v", id, err)
|
||||
log.Printf("[thumb] count thumbnail work %s: %v", id, err)
|
||||
} else {
|
||||
status.Thumbnail.QueueLength = missing
|
||||
if missing > 0 && status.Thumbnail.State == "idle" {
|
||||
@@ -897,10 +897,10 @@ func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.
|
||||
if len(pending) == 0 {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] enqueue %d missing thumbnails for drive=%s", len(pending), driveID)
|
||||
log.Printf("[thumb] enqueue %d thumbnail/duration tasks for drive=%s", len(pending), driveID)
|
||||
for _, v := range pending {
|
||||
if !w.EnqueueBlocking(ctx, v) {
|
||||
log.Printf("[thumb] enqueue missing thumbnails canceled for drive=%s", driveID)
|
||||
log.Printf("[thumb] enqueue thumbnail/duration tasks canceled for drive=%s", driveID)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -501,7 +501,10 @@ func (c *Catalog) ListVideosByThumbnailStatus(ctx context.Context, driveID, stat
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ListVideosNeedingThumbnail returns videos that still need a thumbnail attempt.
|
||||
// ListVideosNeedingThumbnail returns videos that still need thumbnail-worker work.
|
||||
// Besides missing thumbnails, this includes videos with an existing thumbnail but
|
||||
// missing duration metadata, because the thumbnail worker probes duration while
|
||||
// it already has a stream link.
|
||||
// Failed thumbnails are reported separately and should not block teaser generation.
|
||||
// Videos whose local assets were cleared because they are fingerprint duplicates
|
||||
// stay pending in the DB, but uniqueVideoWhereSQL keeps them out of this queue
|
||||
@@ -513,7 +516,10 @@ func (c *Catalog) ListVideosNeedingThumbnail(ctx context.Context, driveID string
|
||||
rows, err := c.db.QueryContext(ctx,
|
||||
`SELECT `+allVideoCols+` FROM videos
|
||||
WHERE drive_id = ?
|
||||
AND COALESCE(thumbnail_url, '') = ''
|
||||
AND (
|
||||
COALESCE(thumbnail_url, '') = ''
|
||||
OR COALESCE(duration_seconds, 0) <= 0
|
||||
)
|
||||
AND COALESCE(thumbnail_status, 'pending') NOT IN ('failed', 'skipped')
|
||||
AND COALESCE(hidden, 0) = 0
|
||||
AND `+uniqueVideoWhereSQL+`
|
||||
@@ -540,7 +546,10 @@ func (c *Catalog) CountVideosNeedingThumbnail(ctx context.Context, driveID strin
|
||||
err := c.db.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM videos
|
||||
WHERE drive_id = ?
|
||||
AND COALESCE(thumbnail_url, '') = ''
|
||||
AND (
|
||||
COALESCE(thumbnail_url, '') = ''
|
||||
OR COALESCE(duration_seconds, 0) <= 0
|
||||
)
|
||||
AND COALESCE(thumbnail_status, 'pending') NOT IN ('failed', 'skipped')
|
||||
AND COALESCE(hidden, 0) = 0
|
||||
AND `+uniqueVideoWhereSQL,
|
||||
|
||||
@@ -7,6 +7,90 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestListVideosNeedingThumbnailIncludesExistingThumbnailMissingDuration(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, err := Open(t.TempDir() + "/catalog.db")
|
||||
if err != nil {
|
||||
t.Fatalf("open catalog: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := cat.Close(); err != nil {
|
||||
t.Fatalf("close catalog: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
now := time.Now()
|
||||
videos := []*Video{
|
||||
{
|
||||
ID: "duration-only",
|
||||
DriveID: "drive",
|
||||
FileID: "file-duration-only",
|
||||
Title: "Duration Only",
|
||||
ThumbnailURL: "/p/thumb/duration-only",
|
||||
PublishedAt: now,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
},
|
||||
{
|
||||
ID: "complete",
|
||||
DriveID: "drive",
|
||||
FileID: "file-complete",
|
||||
Title: "Complete",
|
||||
DurationSeconds: 12,
|
||||
ThumbnailURL: "/p/thumb/complete",
|
||||
PublishedAt: now.Add(time.Second),
|
||||
CreatedAt: now.Add(time.Second),
|
||||
UpdatedAt: now.Add(time.Second),
|
||||
},
|
||||
{
|
||||
ID: "missing-thumb",
|
||||
DriveID: "drive",
|
||||
FileID: "file-missing-thumb",
|
||||
Title: "Missing Thumb",
|
||||
DurationSeconds: 18,
|
||||
PublishedAt: now.Add(2 * time.Second),
|
||||
CreatedAt: now.Add(2 * time.Second),
|
||||
UpdatedAt: now.Add(2 * time.Second),
|
||||
},
|
||||
{
|
||||
ID: "failed",
|
||||
DriveID: "drive",
|
||||
FileID: "file-failed",
|
||||
Title: "Failed",
|
||||
PublishedAt: now.Add(3 * time.Second),
|
||||
CreatedAt: now.Add(3 * time.Second),
|
||||
UpdatedAt: now.Add(3 * time.Second),
|
||||
},
|
||||
}
|
||||
for _, v := range videos {
|
||||
if err := cat.UpsertVideo(ctx, v); err != nil {
|
||||
t.Fatalf("seed %s: %v", v.ID, err)
|
||||
}
|
||||
}
|
||||
if err := cat.UpdateVideoMeta(ctx, "failed", VideoMetaPatch{ThumbnailStatus: "failed"}); err != nil {
|
||||
t.Fatalf("mark failed thumbnail: %v", err)
|
||||
}
|
||||
|
||||
items, err := cat.ListVideosNeedingThumbnail(ctx, "drive", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("list videos needing thumbnail: %v", err)
|
||||
}
|
||||
if len(items) != 2 {
|
||||
t.Fatalf("items = %#v, want duration-only and missing-thumb", items)
|
||||
}
|
||||
if items[0].ID != "duration-only" || items[1].ID != "missing-thumb" {
|
||||
t.Fatalf("item ids = %q, %q; want duration-only, missing-thumb", items[0].ID, items[1].ID)
|
||||
}
|
||||
|
||||
count, err := cat.CountVideosNeedingThumbnail(ctx, "drive")
|
||||
if err != nil {
|
||||
t.Fatalf("count videos needing thumbnail: %v", err)
|
||||
}
|
||||
if count != 2 {
|
||||
t.Fatalf("count = %d, want 2", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateTagAndClassifyAddsTagToMatchingExistingVideos(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, err := Open(t.TempDir() + "/catalog.db")
|
||||
|
||||
@@ -1485,29 +1485,53 @@ func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
|
||||
if w.skipIfRateLimited(v) {
|
||||
return
|
||||
}
|
||||
if current, err := w.Catalog.GetVideo(ctx, v.ID); err == nil {
|
||||
if current.ThumbnailURL != "" {
|
||||
queued := v
|
||||
current := v
|
||||
if loaded, err := w.Catalog.GetVideo(ctx, v.ID); err == nil {
|
||||
if loaded.PreviewLocal == "" {
|
||||
loaded.PreviewLocal = queued.PreviewLocal
|
||||
}
|
||||
current = loaded
|
||||
v = loaded
|
||||
if loaded.ThumbnailURL != "" && loaded.DurationSeconds > 0 {
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{ThumbnailStatus: "ready"})
|
||||
return
|
||||
}
|
||||
}
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{ThumbnailStatus: "pending"})
|
||||
link, err := w.Drive.StreamURL(ctx, v.FileID)
|
||||
if err != nil {
|
||||
if localLink, ok := localPreviewLink(v); ok {
|
||||
link = localLink
|
||||
} else {
|
||||
if w.pauseForRecoverableError(err, "streamURL", v.Title) {
|
||||
if current.ThumbnailURL != "" {
|
||||
if current.DurationSeconds <= 0 {
|
||||
link, err := w.streamLink(ctx, current)
|
||||
if err != nil {
|
||||
if w.pauseForRecoverableError(err, "streamURL", current.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] probe streamURL %s: %v", current.Title, err)
|
||||
} else if w.probeDuration(ctx, current, link) {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] streamURL %s: %v", v.Title, err)
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{ThumbnailStatus: "failed"})
|
||||
}
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, current.ID, catalog.VideoMetaPatch{ThumbnailStatus: "ready"})
|
||||
return
|
||||
}
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{ThumbnailStatus: "pending"})
|
||||
link, err := w.streamLink(ctx, v)
|
||||
if err != nil {
|
||||
if w.pauseForRecoverableError(err, "streamURL", v.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] streamURL %s: %v", v.Title, err)
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{ThumbnailStatus: "failed"})
|
||||
return
|
||||
}
|
||||
if w.probeDuration(ctx, v, link) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.generateThumbnailFromLink(ctx, v, link); err != nil {
|
||||
if localLink, ok := localPreviewLink(v); ok && link.URL != localLink.URL {
|
||||
if w.probeDuration(ctx, v, localLink) {
|
||||
return
|
||||
}
|
||||
if localErr := w.generateThumbnailFromLink(ctx, v, localLink); localErr == nil {
|
||||
return
|
||||
}
|
||||
@@ -1521,6 +1545,38 @@ func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) streamLink(ctx context.Context, v *catalog.Video) (*drives.StreamLink, error) {
|
||||
link, err := w.Drive.StreamURL(ctx, v.FileID)
|
||||
if err == nil {
|
||||
return link, nil
|
||||
}
|
||||
if localLink, ok := localPreviewLink(v); ok {
|
||||
return localLink, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) probeDuration(ctx context.Context, v *catalog.Video, link *drives.StreamLink) bool {
|
||||
if v.DurationSeconds > 0 {
|
||||
return false
|
||||
}
|
||||
dur, err := w.Gen.Probe(ctx, link)
|
||||
if err == nil {
|
||||
if dur > 0 {
|
||||
v.DurationSeconds = int(dur)
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{
|
||||
DurationSeconds: int(dur),
|
||||
})
|
||||
}
|
||||
return false
|
||||
}
|
||||
if w.pauseForRecoverableError(err, "probe", v.Title) {
|
||||
return true
|
||||
}
|
||||
log.Printf("[thumb] probe %s: %v", v.Title, err)
|
||||
return false
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) generateThumbnailFromLink(ctx context.Context, v *catalog.Video, link *drives.StreamLink) error {
|
||||
if _, err := w.Gen.GenerateThumbnail(ctx, link, v.ID, 0); err != nil {
|
||||
return err
|
||||
|
||||
@@ -13,11 +13,11 @@ import (
|
||||
"github.com/video-site/backend/internal/drives"
|
||||
)
|
||||
|
||||
func TestThumbWorkerUpdatesThumbnailWithoutChangingPreviewStatus(t *testing.T) {
|
||||
func TestThumbWorkerUpdatesThumbnailAndDurationWithoutChangingPreviewStatus(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, video := seedPreviewTestVideo(t, "thumb-worker-video")
|
||||
|
||||
gen := &fakeThumbGenerator{}
|
||||
gen := &fakeThumbGenerator{probeDuration: 42}
|
||||
drv := &previewFakeDrive{}
|
||||
worker := NewThumbWorker(gen, cat, drv)
|
||||
|
||||
@@ -33,8 +33,8 @@ func TestThumbWorkerUpdatesThumbnailWithoutChangingPreviewStatus(t *testing.T) {
|
||||
if got.PreviewStatus != "pending" {
|
||||
t.Fatalf("preview status = %q, want pending", got.PreviewStatus)
|
||||
}
|
||||
if got.DurationSeconds != 0 {
|
||||
t.Fatalf("duration = %d, want unchanged", got.DurationSeconds)
|
||||
if got.DurationSeconds != 42 {
|
||||
t.Fatalf("duration = %d, want probed duration", got.DurationSeconds)
|
||||
}
|
||||
if gen.thumbnailVideoID != video.ID {
|
||||
t.Fatalf("thumbnail video id = %q, want %q", gen.thumbnailVideoID, video.ID)
|
||||
@@ -42,14 +42,53 @@ func TestThumbWorkerUpdatesThumbnailWithoutChangingPreviewStatus(t *testing.T) {
|
||||
if gen.thumbnailDuration != 0 {
|
||||
t.Fatalf("thumbnail duration = %.1f, want fixed-offset thumbnail generation", gen.thumbnailDuration)
|
||||
}
|
||||
if gen.probeCalls != 0 {
|
||||
t.Fatalf("probe calls = %d, want 0 for thumbnail generation", gen.probeCalls)
|
||||
if gen.probeCalls != 1 {
|
||||
t.Fatalf("probe calls = %d, want 1 for thumbnail generation", gen.probeCalls)
|
||||
}
|
||||
if drv.streamFileID != video.FileID {
|
||||
t.Fatalf("stream file id = %q, want %q", drv.streamFileID, video.FileID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestThumbWorkerBackfillsDurationWhenThumbnailAlreadyExists(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, video := seedPreviewTestVideo(t, "thumb-worker-existing-thumbnail")
|
||||
video.ThumbnailURL = "/p/thumb/" + video.ID
|
||||
if err := cat.UpsertVideo(ctx, video); err != nil {
|
||||
t.Fatalf("update video: %v", err)
|
||||
}
|
||||
|
||||
gen := &fakeThumbGenerator{probeDuration: 19}
|
||||
drv := &previewFakeDrive{}
|
||||
worker := NewThumbWorker(gen, cat, drv)
|
||||
|
||||
worker.process(ctx, video)
|
||||
|
||||
got, err := cat.GetVideo(ctx, video.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get video: %v", err)
|
||||
}
|
||||
if got.DurationSeconds != 19 {
|
||||
t.Fatalf("duration = %d, want probed duration", got.DurationSeconds)
|
||||
}
|
||||
if got.ThumbnailURL != "/p/thumb/"+video.ID {
|
||||
t.Fatalf("thumbnail = %q, want unchanged existing thumbnail", got.ThumbnailURL)
|
||||
}
|
||||
ready, err := cat.ListVideosByThumbnailStatus(ctx, video.DriveID, "ready", 0)
|
||||
if err != nil {
|
||||
t.Fatalf("list ready thumbnails: %v", err)
|
||||
}
|
||||
if len(ready) != 1 || ready[0].ID != video.ID {
|
||||
t.Fatalf("ready thumbnails = %#v, want only %s", ready, video.ID)
|
||||
}
|
||||
if gen.probeCalls != 1 {
|
||||
t.Fatalf("probe calls = %d, want 1", gen.probeCalls)
|
||||
}
|
||||
if gen.thumbnailVideoID != "" {
|
||||
t.Fatalf("thumbnail generation video id = %q, want no regeneration", gen.thumbnailVideoID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestThumbWorkerFallsBackToLocalPreviewWhenDriveStreamFails(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, video := seedPreviewTestVideo(t, "thumb-worker-local-preview")
|
||||
@@ -469,12 +508,17 @@ type fakeThumbGenerator struct {
|
||||
thumbnailDuration float64
|
||||
thumbnailURL string
|
||||
probeCalls int
|
||||
probeDuration float64
|
||||
probeErr error
|
||||
generateErr error
|
||||
}
|
||||
|
||||
func (g *fakeThumbGenerator) Probe(context.Context, *drives.StreamLink) (float64, error) {
|
||||
g.probeCalls++
|
||||
return 42, nil
|
||||
if g.probeErr != nil {
|
||||
return 0, g.probeErr
|
||||
}
|
||||
return g.probeDuration, nil
|
||||
}
|
||||
|
||||
func (g *fakeThumbGenerator) GenerateThumbnail(_ context.Context, link *drives.StreamLink, videoID string, duration float64) (string, error) {
|
||||
@@ -568,7 +612,6 @@ func (d *previewFakeDrive) EnsureDir(context.Context, string) (string, error) {
|
||||
}
|
||||
func (d *previewFakeDrive) RootID() string { return "root" }
|
||||
|
||||
|
||||
func TestWorkerWaitIdleReturnsImmediatelyWhenQueueEmpty(t *testing.T) {
|
||||
worker := NewWorker(&fakeTeaserGenerator{}, nil, &previewFakeDrive{})
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
|
||||
Reference in New Issue
Block a user