perf: speed up catalog startup migrations

This commit is contained in:
nianzhibai
2026-06-01 18:03:21 +08:00
parent c78f22aedb
commit e01b7cc3b7
2 changed files with 165 additions and 15 deletions
+74 -15
View File
@@ -302,7 +302,15 @@ func (c *Catalog) classifySystemTags(ctx context.Context) error {
}
func (c *Catalog) backfillVideoTags(ctx context.Context) error {
rows, err := c.db.QueryContext(ctx, `SELECT id, COALESCE(tags, '[]') FROM videos`)
rows, err := c.db.QueryContext(ctx, `
SELECT id, COALESCE(tags, '[]')
FROM videos
WHERE COALESCE(tags, '') NOT IN ('', '[]', 'null')
AND NOT EXISTS (
SELECT 1
FROM video_tags vt
WHERE vt.video_id = videos.id
)`)
if err != nil {
return err
}
@@ -319,11 +327,14 @@ func (c *Catalog) backfillVideoTags(ctx context.Context) error {
if len(labels) == 0 {
continue
}
if err := c.addVideoTags(ctx, videoID, labels, "legacy", true); err != nil {
added, err := c.addVideoTags(ctx, videoID, labels, "legacy", true)
if err != nil {
return err
}
if err := c.syncVideoTagsJSON(ctx, videoID, false); err != nil {
return err
if added {
if err := c.syncVideoTagsJSON(ctx, videoID, false); err != nil {
return err
}
}
}
return nil
@@ -592,6 +603,10 @@ func (c *Catalog) getTagByLabel(ctx context.Context, label string) (Tag, error)
}
func (c *Catalog) classifyTag(ctx context.Context, tag Tag) (int, error) {
existingIDs, err := c.videoIDSetForTagID(ctx, tag.ID)
if err != nil {
return 0, err
}
rows, err := c.db.QueryContext(ctx, `
SELECT id, title, COALESCE(author, ''), COALESCE(category, ''), COALESCE(tags_manual, 0)
FROM videos`)
@@ -623,13 +638,14 @@ FROM videos`)
continue
}
}
added, err := c.addVideoTag(ctx, videoID, tag.Label, "auto", false)
if err != nil {
if existingIDs[videoID] {
continue
}
if err := c.insertVideoTag(ctx, videoID, tag.ID, "auto"); err != nil {
return 0, err
}
if added {
classified++
}
existingIDs[videoID] = true
classified++
if err := c.syncVideoTagsJSON(ctx, videoID, false); err != nil {
return 0, err
}
@@ -689,17 +705,22 @@ func (c *Catalog) replaceVideoTags(ctx context.Context, videoID string, labels [
return c.syncVideoTagsJSON(ctx, videoID, manual)
}
func (c *Catalog) addVideoTags(ctx context.Context, videoID string, labels []string, source string, createMissing bool) error {
func (c *Catalog) addVideoTags(ctx context.Context, videoID string, labels []string, source string, createMissing bool) (bool, error) {
labels = uniqueStrings(cleanLabels(labels))
if source != "manual" {
labels = c.filterDeletedTagLabels(ctx, labels)
}
changed := false
for _, label := range labels {
if _, err := c.addVideoTag(ctx, videoID, label, source, createMissing); err != nil {
return err
added, err := c.addVideoTag(ctx, videoID, label, source, createMissing)
if err != nil {
return false, err
}
if added {
changed = true
}
}
return nil
return changed, nil
}
func (c *Catalog) addVideoTag(ctx context.Context, videoID, label, source string, createMissing bool) (bool, error) {
@@ -729,12 +750,33 @@ func (c *Catalog) addVideoTag(ctx context.Context, videoID, label, source string
return n > 0, nil
}
func (c *Catalog) insertVideoTag(ctx context.Context, videoID string, tagID int64, source string) error {
_, err := c.db.ExecContext(ctx,
`INSERT OR IGNORE INTO video_tags (video_id, tag_id, source, created_at) VALUES (?, ?, ?, ?)`,
videoID, tagID, source, time.Now().UnixMilli())
return err
}
func (c *Catalog) addCollectionTagToVideos(ctx context.Context, category string) error {
return c.addTagToVideosByCategory(ctx, category, category, "auto")
}
func (c *Catalog) addTagToVideosByCategory(ctx context.Context, category, label, source string) error {
rows, err := c.db.QueryContext(ctx, `SELECT id FROM videos WHERE category = ? AND COALESCE(tags_manual, 0) = 0`, category)
tag, err := c.getTagByLabel(ctx, label)
if err != nil {
return err
}
rows, err := c.db.QueryContext(ctx, `
SELECT v.id
FROM videos v
WHERE v.category = ?
AND COALESCE(v.tags_manual, 0) = 0
AND NOT EXISTS (
SELECT 1
FROM video_tags vt
WHERE vt.video_id = v.id
AND vt.tag_id = ?
)`, category, tag.ID)
if err != nil {
return err
}
@@ -753,7 +795,7 @@ func (c *Catalog) addTagToVideosByCategory(ctx context.Context, category, label,
return err
}
for _, videoID := range videoIDs {
if _, err := c.addVideoTag(ctx, videoID, label, source, false); err != nil {
if err := c.insertVideoTag(ctx, videoID, tag.ID, source); err != nil {
return err
}
if err := c.syncVideoTagsJSON(ctx, videoID, false); err != nil {
@@ -837,6 +879,23 @@ func (c *Catalog) videoIDsForTagID(ctx context.Context, tagID int64) ([]string,
return videoIDs, rows.Err()
}
func (c *Catalog) videoIDSetForTagID(ctx context.Context, tagID int64) (map[string]bool, error) {
rows, err := c.db.QueryContext(ctx, `SELECT video_id FROM video_tags WHERE tag_id = ?`, tagID)
if err != nil {
return nil, err
}
defer rows.Close()
out := map[string]bool{}
for rows.Next() {
var videoID string
if err := rows.Scan(&videoID); err != nil {
return nil, err
}
out[videoID] = true
}
return out, rows.Err()
}
func (c *Catalog) validateTagsExist(ctx context.Context, labels []string) error {
for _, label := range labels {
if _, err := c.getTagByLabel(ctx, label); err != nil {
+91
View File
@@ -398,6 +398,84 @@ VALUES
}
}
func TestMigrateDoesNotRewriteAlreadySyncedVideoTags(t *testing.T) {
ctx := context.Background()
cat, err := Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now()
for _, id := range []string{"video-1", "video-2", "video-3"} {
if err := cat.UpsertVideo(ctx, &Video{
ID: id,
DriveID: "drive",
FileID: id,
Title: "巨乳后入合集",
Category: "Better Call Saul S03",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
}); err != nil {
t.Fatalf("seed %s: %v", id, err)
}
}
if err := cat.migrate(ctx); err != nil {
t.Fatalf("first migrate: %v", err)
}
before := videoUpdatedAtByID(t, ctx, cat, "video-1", "video-2", "video-3")
time.Sleep(5 * time.Millisecond)
if err := cat.migrate(ctx); err != nil {
t.Fatalf("second migrate: %v", err)
}
after := videoUpdatedAtByID(t, ctx, cat, "video-1", "video-2", "video-3")
for id, want := range before {
if got := after[id]; got != want {
t.Fatalf("%s updated_at changed on no-op migrate: got %d, want %d", id, got, want)
}
}
}
func TestMigrateBackfillsLegacyTagsWithoutRelations(t *testing.T) {
ctx := context.Background()
cat, err := Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now().UnixMilli()
if _, err := cat.db.ExecContext(ctx, `
INSERT INTO videos (id, drive_id, file_id, title, tags, tags_manual, published_at, created_at, updated_at)
VALUES ('legacy-video', 'drive', 'file-legacy', 'legacy title', '["legacy-tag"]', 0, ?, ?, ?)`,
now, now, now); err != nil {
t.Fatalf("seed legacy video: %v", err)
}
if err := cat.migrate(ctx); err != nil {
t.Fatalf("migrate: %v", err)
}
tag := mustTagByLabel(t, ctx, cat, "legacy-tag")
var count int
if err := cat.db.QueryRowContext(ctx,
`SELECT COUNT(*) FROM video_tags WHERE video_id = 'legacy-video' AND tag_id = ?`, tag.ID).Scan(&count); err != nil {
t.Fatalf("count video tag: %v", err)
}
if count != 1 {
t.Fatalf("legacy video tag relation count = %d, want 1", count)
}
}
func TestOpenMigratesLegacyVideosWithoutFileName(t *testing.T) {
path := t.TempDir() + "/catalog.db"
db, err := sql.Open("sqlite", path)
@@ -944,6 +1022,19 @@ func mustTagByLabel(t *testing.T, ctx context.Context, cat *Catalog, label strin
return Tag{}
}
func videoUpdatedAtByID(t *testing.T, ctx context.Context, cat *Catalog, ids ...string) map[string]int64 {
t.Helper()
out := make(map[string]int64, len(ids))
for _, id := range ids {
var updatedAt int64
if err := cat.db.QueryRowContext(ctx, `SELECT updated_at FROM videos WHERE id = ?`, id).Scan(&updatedAt); err != nil {
t.Fatalf("read updated_at for %s: %v", id, err)
}
out[id] = updatedAt
}
return out
}
// 删除 collection 标签的最后一个引用视频后,标签应当自动从 tags 表里消失。
// user/system 标签不受影响:用户/系统标签的语义由人维护,孤儿状态保留。
func TestDeleteVideoPrunesOrphanCollectionTag(t *testing.T) {