Improve fingerprint dedupe maintenance

This commit is contained in:
nianzhibai
2026-05-29 23:58:36 +08:00
parent da0683344e
commit 039ec2a988
10 changed files with 771 additions and 11 deletions
+4 -2
View File
@@ -134,10 +134,12 @@ OneDrive 按 OpenList 默认应用方式调用 `https://api.oplist.org/onedrive/
1. 同一网盘同一文件按 `(drive_id, file_id)` 形成稳定视频 ID,重复扫描只更新同一行。
2. 扫描时优先按网盘侧 `content_hash` 去重;没有 hash 时退化为 `file_name + size_bytes`。
3. 扫描、爬虫本地上传完成后,后台指纹 worker 会异步读取视频的少量 Range 片段,生成 `sampled_sha256`。前台列表、首页、搜索、推荐会按 `size_bytes + sampled_sha256` 只展示最早入库的 canonical 视频。
3. 扫描、爬虫本地上传或服务启动挂载网盘后,后台指纹 worker 会异步读取视频的少量 Range 片段,生成 `sampled_sha256`。前台列表、首页、搜索、推荐会按 `size_bytes + sampled_sha256` 只展示最早入库的 canonical 视频。
`sampled_sha256` 是文件级去重:适合识别同一个视频文件被复制到 115 / PikPak / OneDrive 等不同网盘的情况。它不会删除任何网盘文件,也不用于识别转码、裁剪、加水印后的同源视频。
封面和 teaser 仍然优先生成,不等待指纹完成。夜间流水线最后会做一次重复资产清理:对 `size_bytes + sampled_sha256` 命中的非 canonical 视频,只删除本机生成的重复封面和 teaser,并把对应字段重置为 `pending`。网盘原文件和视频元数据记录不会被删除;如果 canonical 视频以后被移除,这些重复项会重新进入生成队列。
## 管理能力
- `/admin/drives`:新增、编辑、删除网盘,触发扫描。
@@ -157,7 +159,7 @@ ffmpeg -ss <起点> -headers "UA/Cookie/Referer" -i <直链> \
当前策略是每段固定 3 秒;30 秒以下最多 3 段,30 秒及以上固定 4 段;长视频在 20% 到 80% 区间均匀取段。生成的 teaser 和封面都只保存在本地 `data/previews/`,不会回写到网盘;旧数据中的 `preview_file_id` 会被忽略。
服务启动或网盘重新挂载时,如果 Teaser 开关已开启,后端会把历史 `pending` 任务重新入队,避免重启后长期停在“待生成”。OneDrive 直链生成 teaser 时可能触发 Microsoft 429 限流;后端会识别这类错误并让当前网盘进入冷却期,保留任务为 `pending`,避免连续请求触发更严重限流。
服务启动或网盘重新挂载时,如果 Teaser 开关已开启,后端会把历史 `pending` 任务重新入队,避免重启后长期停在“待生成”。OneDrive 扫盘和直链生成 teaser / 封面时可能触发 Microsoft Graph 429、`TooManyRequests`、`activityLimitReached` 或 throttled 文本;后端会识别这类错误并让当前网盘进入冷却期,保留任务为 `pending`,避免连续请求触发更严重限流。扫盘阶段会按 `Retry-After` 或默认冷却时间等待后继续当前目录。
前端卡片的 `previewSrc` 统一指向 `/p/preview/<videoID>`,后端只从本地 `preview_local` 文件读取。
+124
View File
@@ -242,6 +242,7 @@ func main() {
RunSpider91Crawl: app.runSpider91Crawl,
WaitPreviewQueuesIdle: app.waitAllPreviewQueuesIdle,
RunMigration: app.spider91Migrator.RunOnce,
RunDedupeAssetCleanup: app.cleanupDuplicateVideoAssets,
})
go app.nightlyRunner.Run(ctx)
@@ -765,6 +766,9 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
}
go a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
if fingerprintWorker != nil {
go a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
}
}
func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Worker) {
@@ -1102,6 +1106,126 @@ func removeLocalVideoAssets(localDir string, v *catalog.Video) error {
return nil
}
type duplicateAssetCleanupStats struct {
Candidates int
VideosUpdated int
PreviewFiles int
ThumbnailFiles int
MissingFiles int
SkippedUnsafeRef int
}
func (a *App) cleanupDuplicateVideoAssets(ctx context.Context) error {
if a == nil || a.cat == nil {
return nil
}
localDir := ""
if a.cfg != nil {
localDir = a.cfg.Storage.LocalPreviewDir
}
if strings.TrimSpace(localDir) == "" {
return nil
}
items, err := a.cat.ListDuplicateAssetCleanupCandidates(ctx, 0)
if err != nil {
return err
}
if len(items) == 0 {
log.Printf("[dedupe-cleanup] no duplicate local assets to clean")
return nil
}
stats := duplicateAssetCleanupStats{Candidates: len(items)}
for _, item := range items {
if err := ctx.Err(); err != nil {
return err
}
clearPreview, removedPreview, missingPreview, skippedPreview, err := cleanupDuplicatePreviewAsset(localDir, item.PreviewLocal)
if err != nil {
return fmt.Errorf("cleanup duplicate preview video=%s canonical=%s: %w", item.VideoID, item.CanonicalID, err)
}
clearThumb, removedThumb, missingThumb, err := cleanupDuplicateThumbnailAsset(localDir, item.VideoID, item.ThumbnailURL)
if err != nil {
return fmt.Errorf("cleanup duplicate thumbnail video=%s canonical=%s: %w", item.VideoID, item.CanonicalID, err)
}
if skippedPreview {
stats.SkippedUnsafeRef++
}
if removedPreview {
stats.PreviewFiles++
}
if removedThumb {
stats.ThumbnailFiles++
}
if missingPreview {
stats.MissingFiles++
}
if missingThumb {
stats.MissingFiles++
}
if !clearPreview && !clearThumb {
continue
}
if err := a.cat.ClearGeneratedAssets(ctx, item.VideoID, clearPreview, clearThumb); err != nil {
return fmt.Errorf("mark duplicate assets cleaned video=%s canonical=%s: %w", item.VideoID, item.CanonicalID, err)
}
stats.VideosUpdated++
}
log.Printf("[dedupe-cleanup] candidates=%d updated=%d preview_files=%d thumbnail_files=%d missing=%d skipped_unsafe_refs=%d",
stats.Candidates, stats.VideosUpdated, stats.PreviewFiles, stats.ThumbnailFiles, stats.MissingFiles, stats.SkippedUnsafeRef)
return nil
}
func cleanupDuplicatePreviewAsset(localDir, previewLocal string) (clear bool, removed bool, missing bool, skippedUnsafe bool, err error) {
clean, ok := localPathWithin(localDir, previewLocal)
if !ok {
if strings.TrimSpace(previewLocal) != "" {
return false, false, false, true, nil
}
return false, false, false, false, nil
}
removed, missing, err = removeRegularFileIfExists(clean)
if err != nil {
return false, false, false, false, err
}
return true, removed, missing, false, nil
}
func cleanupDuplicateThumbnailAsset(localDir, videoID, thumbnailURL string) (clear bool, removed bool, missing bool, err error) {
if thumbnailURL != "/p/thumb/"+videoID {
return false, false, false, nil
}
clean, ok := localPathWithin(localDir, filepath.Join(localDir, "thumbs", videoID+".jpg"))
if !ok {
return false, false, false, nil
}
removed, missing, err = removeRegularFileIfExists(clean)
if err != nil {
return false, false, false, err
}
return true, removed, missing, nil
}
func removeRegularFileIfExists(path string) (removed bool, missing bool, err error) {
info, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return false, true, nil
}
return false, false, err
}
if !info.Mode().IsRegular() {
return false, false, nil
}
if err := os.Remove(path); err != nil {
if os.IsNotExist(err) {
return false, true, nil
}
return false, false, err
}
return true, false, nil
}
func localPathWithin(root, path string) (string, bool) {
if strings.TrimSpace(root) == "" || strings.TrimSpace(path) == "" {
return "", false
+176
View File
@@ -13,6 +13,7 @@ import (
"github.com/video-site/backend/internal/catalog"
"github.com/video-site/backend/internal/config"
"github.com/video-site/backend/internal/drives"
"github.com/video-site/backend/internal/fingerprint"
"github.com/video-site/backend/internal/preview"
)
@@ -153,6 +154,72 @@ func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
t.Fatalf("generation did not finish, events=%#v", gen.Events())
}
func TestRegisterPreviewWorkersBackfillsHistoricalFingerprints(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cat, err := catalog.Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
dataPath := filepath.Join(t.TempDir(), "video.mp4")
data := []byte("historical video content for fingerprint")
if err := os.WriteFile(dataPath, data, 0o644); err != nil {
t.Fatalf("write video data: %v", err)
}
now := time.Now()
video := &catalog.Video{
ID: "historical-video",
DriveID: "drive-id",
FileID: "file-id",
Title: "Historical",
Size: int64(len(data)),
FingerprintStatus: "pending",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
}
if err := cat.UpsertVideo(ctx, video); err != nil {
t.Fatalf("seed video: %v", err)
}
app := &App{
cat: cat,
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
fingerprintWorkers: make(map[string]*fingerprint.Worker),
}
drv := &serverFingerprintFakeDrive{path: dataPath}
fingerprintWorker := fingerprint.NewWorker(cat, drv, fingerprint.Config{})
go fingerprintWorker.Run(ctx)
app.registerPreviewWorkers(ctx, "drive-id", nil, nil, fingerprintWorker, func() {})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
got, err := cat.GetVideo(ctx, video.ID)
if err != nil {
t.Fatalf("get video: %v", err)
}
if got.SampledSHA256 != "" && got.FingerprintStatus == "ready" {
return
}
time.Sleep(10 * time.Millisecond)
}
got, err := cat.GetVideo(ctx, video.ID)
if err != nil {
t.Fatalf("get video after timeout: %v", err)
}
t.Fatalf("fingerprint status=%q sampled=%q, want ready with hash", got.FingerprintStatus, got.SampledSHA256)
}
func TestFailedThumbnailsDoNotBlockPreviewGeneration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -480,6 +547,106 @@ func TestCleanupMissingPikPakVideosRemovesDatabaseRowsAndLocalAssets(t *testing.
}
}
func TestCleanupDuplicateVideoAssetsRemovesOnlyDuplicateLocalAssets(t *testing.T) {
ctx := context.Background()
localDir := t.TempDir()
cat, err := catalog.Open(filepath.Join(t.TempDir(), "catalog.db"))
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
canonicalPreview := filepath.Join(localDir, "canonical.mp4")
duplicatePreview := filepath.Join(localDir, "duplicate.mp4")
canonicalThumb := filepath.Join(localDir, "thumbs", "canonical-video.jpg")
duplicateThumb := filepath.Join(localDir, "thumbs", "duplicate-video.jpg")
for _, path := range []string{canonicalPreview, duplicatePreview, canonicalThumb, duplicateThumb} {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
t.Fatalf("mkdir %s: %v", path, err)
}
if err := os.WriteFile(path, []byte("asset"), 0o644); err != nil {
t.Fatalf("write %s: %v", path, err)
}
}
now := time.Date(2026, 5, 29, 12, 0, 0, 0, time.UTC)
for _, v := range []*catalog.Video{
{
ID: "canonical-video",
DriveID: "115",
FileID: "file-a",
Title: "Canonical",
Size: 2048,
ThumbnailURL: "/p/thumb/canonical-video",
PreviewLocal: canonicalPreview,
PreviewStatus: "ready",
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
},
{
ID: "duplicate-video",
DriveID: "onedrive",
FileID: "file-b",
Title: "Duplicate",
Size: 2048,
ThumbnailURL: "/p/thumb/duplicate-video",
PreviewLocal: duplicatePreview,
PreviewStatus: "ready",
PublishedAt: now.Add(time.Second),
CreatedAt: now.Add(time.Second),
UpdatedAt: now.Add(time.Second),
},
} {
if err := cat.UpsertVideo(ctx, v); err != nil {
t.Fatalf("seed %s: %v", v.ID, err)
}
if err := cat.UpdateVideoFingerprint(ctx, v.ID, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", "ready", ""); err != nil {
t.Fatalf("fingerprint %s: %v", v.ID, err)
}
}
app := &App{
cfg: &config.Config{Storage: config.Storage{LocalPreviewDir: localDir}},
cat: cat,
}
if err := app.cleanupDuplicateVideoAssets(ctx); err != nil {
t.Fatalf("cleanup duplicate video assets: %v", err)
}
for _, path := range []string{canonicalPreview, canonicalThumb} {
if _, err := os.Stat(path); err != nil {
t.Fatalf("canonical asset %s missing: %v", path, err)
}
}
for _, path := range []string{duplicatePreview, duplicateThumb} {
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Fatalf("duplicate asset %s still exists, stat err=%v", path, err)
}
}
dup, err := cat.GetVideo(ctx, "duplicate-video")
if err != nil {
t.Fatalf("get duplicate: %v", err)
}
if dup.PreviewLocal != "" || dup.PreviewStatus != "pending" {
t.Fatalf("duplicate preview local=%q status=%q, want empty pending", dup.PreviewLocal, dup.PreviewStatus)
}
if dup.ThumbnailURL != "" {
t.Fatalf("duplicate thumbnail url = %q, want empty", dup.ThumbnailURL)
}
canon, err := cat.GetVideo(ctx, "canonical-video")
if err != nil {
t.Fatalf("get canonical: %v", err)
}
if canon.PreviewLocal != canonicalPreview || canon.ThumbnailURL != "/p/thumb/canonical-video" {
t.Fatalf("canonical changed: preview=%q thumb=%q", canon.PreviewLocal, canon.ThumbnailURL)
}
}
type serverFakeTeaserGenerator struct {
mu sync.Mutex
events []string
@@ -544,6 +711,15 @@ func (d *serverFakeDrive) EnsureDir(context.Context, string) (string, error) {
}
func (d *serverFakeDrive) RootID() string { return "root" }
type serverFingerprintFakeDrive struct {
serverFakeDrive
path string
}
func (d *serverFingerprintFakeDrive) StreamURL(context.Context, string) (*drives.StreamLink, error) {
return &drives.StreamLink{URL: d.path}, nil
}
type serverLocalUploadFakeDrive struct {
serverFakeDrive
}
+123 -2
View File
@@ -503,6 +503,9 @@ func (c *Catalog) ListVideosByThumbnailStatus(ctx context.Context, driveID, stat
// ListVideosNeedingThumbnail returns videos that still need a thumbnail attempt.
// Failed thumbnails are reported separately and should not block teaser generation.
// Videos whose local assets were cleared because they are fingerprint duplicates
// stay pending in the DB, but uniqueVideoWhereSQL keeps them out of this queue
// while their canonical sibling still exists.
func (c *Catalog) ListVideosNeedingThumbnail(ctx context.Context, driveID string, limit int) ([]*Video, error) {
if limit <= 0 {
limit = 10000
@@ -511,7 +514,7 @@ func (c *Catalog) ListVideosNeedingThumbnail(ctx context.Context, driveID string
`SELECT `+allVideoCols+` FROM videos
WHERE drive_id = ?
AND COALESCE(thumbnail_url, '') = ''
AND COALESCE(thumbnail_status, 'pending') != 'failed'
AND COALESCE(thumbnail_status, 'pending') NOT IN ('failed', 'skipped')
AND COALESCE(hidden, 0) = 0
AND `+uniqueVideoWhereSQL+`
ORDER BY created_at ASC
@@ -538,7 +541,7 @@ func (c *Catalog) CountVideosNeedingThumbnail(ctx context.Context, driveID strin
`SELECT COUNT(*) FROM videos
WHERE drive_id = ?
AND COALESCE(thumbnail_url, '') = ''
AND COALESCE(thumbnail_status, 'pending') != 'failed'
AND COALESCE(thumbnail_status, 'pending') NOT IN ('failed', 'skipped')
AND COALESCE(hidden, 0) = 0
AND `+uniqueVideoWhereSQL,
driveID).Scan(&count)
@@ -1012,6 +1015,124 @@ func (c *Catalog) ListLocalMediaRefs(ctx context.Context) ([]LocalMediaRef, erro
return out, nil
}
// DuplicateAssetCleanupCandidate points at a non-canonical video in a
// size+sampled_sha256 duplicate group that still owns generated local assets.
// The cleanup job uses this to remove duplicate thumbnails/teasers without
// touching the original cloud file or deleting the catalog row.
type DuplicateAssetCleanupCandidate struct {
VideoID string
DriveID string
Title string
PreviewLocal string
ThumbnailURL string
CanonicalID string
SampledSHA256 string
Size int64
}
// ListDuplicateAssetCleanupCandidates returns duplicate videos whose own local
// generated assets can be cleared. A group canonical is the same representative
// used by uniqueVideoWhereSQL: earliest created_at, then lexicographically
// smallest id.
func (c *Catalog) ListDuplicateAssetCleanupCandidates(ctx context.Context, limit int) ([]DuplicateAssetCleanupCandidate, error) {
if limit <= 0 {
limit = 10000
}
rows, err := c.db.QueryContext(ctx, `
WITH canonical AS (
SELECT v.id, v.size_bytes, v.sampled_sha256
FROM videos v
WHERE v.size_bytes > 0
AND COALESCE(v.sampled_sha256, '') != ''
AND NOT EXISTS (
SELECT 1
FROM videos earlier
WHERE earlier.size_bytes = v.size_bytes
AND earlier.sampled_sha256 = v.sampled_sha256
AND COALESCE(earlier.sampled_sha256, '') != ''
AND earlier.size_bytes > 0
AND (
earlier.created_at < v.created_at
OR (earlier.created_at = v.created_at AND earlier.id < v.id)
)
)
)
SELECT dup.id,
dup.drive_id,
dup.title,
COALESCE(dup.preview_local, ''),
COALESCE(dup.thumbnail_url, ''),
canonical.id,
dup.sampled_sha256,
dup.size_bytes
FROM videos dup
JOIN canonical
ON canonical.size_bytes = dup.size_bytes
AND canonical.sampled_sha256 = dup.sampled_sha256
WHERE dup.id != canonical.id
AND dup.size_bytes > 0
AND COALESCE(dup.sampled_sha256, '') != ''
AND (
COALESCE(dup.preview_local, '') != ''
OR COALESCE(dup.thumbnail_url, '') = '/p/thumb/' || dup.id
)
ORDER BY dup.created_at ASC, dup.id ASC
LIMIT ?`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var out []DuplicateAssetCleanupCandidate
for rows.Next() {
var item DuplicateAssetCleanupCandidate
if err := rows.Scan(
&item.VideoID,
&item.DriveID,
&item.Title,
&item.PreviewLocal,
&item.ThumbnailURL,
&item.CanonicalID,
&item.SampledSHA256,
&item.Size,
); err != nil {
return nil, err
}
out = append(out, item)
}
if err := rows.Err(); err != nil {
return nil, err
}
return out, nil
}
// ClearGeneratedAssets clears DB references to generated local assets for a
// video. The statuses go back to pending so the video can regenerate assets if
// it later becomes the canonical item after its older duplicate is removed.
func (c *Catalog) ClearGeneratedAssets(ctx context.Context, videoID string, clearPreview, clearThumbnail bool) error {
parts := []string{}
args := []any{}
if clearPreview {
parts = append(parts, "preview_file_id = ''", "preview_local = ''", "preview_status = 'pending'")
}
if clearThumbnail {
parts = append(parts, "thumbnail_url = ''", "thumbnail_status = 'pending'")
}
if len(parts) == 0 {
return nil
}
parts = append(parts, "updated_at = ?")
args = append(args, time.Now().UnixMilli(), videoID)
res, err := c.db.ExecContext(ctx, `UPDATE videos SET `+strings.Join(parts, ", ")+` WHERE id = ?`, args...)
if err != nil {
return err
}
if rows, err := res.RowsAffected(); err == nil && rows == 0 {
return sql.ErrNoRows
}
return nil
}
// ---------- Drive ----------
type Drive struct {
@@ -75,3 +75,105 @@ func TestListVideosDeduplicatesBySampledSHA256(t *testing.T) {
t.Fatalf("canonical id = %q, want earliest created video", items[0].ID)
}
}
func TestDuplicateAssetCleanupCandidates(t *testing.T) {
ctx := context.Background()
cat, err := Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
base := time.Date(2026, 5, 29, 12, 0, 0, 0, time.UTC)
videos := []*Video{
{
ID: "drive-a-canonical",
DriveID: "drive-a",
FileID: "file-a",
FileName: "canonical.mp4",
Title: "Canonical",
Size: 1234,
ThumbnailURL: "/p/thumb/drive-a-canonical",
PreviewLocal: "/tmp/previews/canonical.mp4",
PreviewStatus: "ready",
PublishedAt: base,
CreatedAt: base,
UpdatedAt: base,
},
{
ID: "drive-b-duplicate",
DriveID: "drive-b",
FileID: "file-b",
FileName: "duplicate.mp4",
Title: "Duplicate",
Size: 1234,
ThumbnailURL: "/p/thumb/drive-b-duplicate",
PreviewLocal: "/tmp/previews/duplicate.mp4",
PreviewStatus: "ready",
PublishedAt: base.Add(time.Second),
CreatedAt: base.Add(time.Second),
UpdatedAt: base.Add(time.Second),
},
{
ID: "drive-c-remote-thumb",
DriveID: "drive-c",
FileID: "file-c",
FileName: "remote-thumb.mp4",
Title: "Remote Thumbnail",
Size: 1234,
ThumbnailURL: "https://thumb.example/file-c.jpg",
PreviewStatus: "ready",
PublishedAt: base.Add(2 * time.Second),
CreatedAt: base.Add(2 * time.Second),
UpdatedAt: base.Add(2 * time.Second),
},
}
for _, v := range videos {
if err := cat.UpsertVideo(ctx, v); err != nil {
t.Fatalf("seed %s: %v", v.ID, err)
}
}
const sampled = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
for _, v := range videos {
if err := cat.UpdateVideoFingerprint(ctx, v.ID, sampled, "ready", ""); err != nil {
t.Fatalf("fingerprint %s: %v", v.ID, err)
}
}
items, err := cat.ListDuplicateAssetCleanupCandidates(ctx, 0)
if err != nil {
t.Fatalf("list cleanup candidates: %v", err)
}
if len(items) != 1 {
t.Fatalf("candidates = %#v, want only local duplicate", items)
}
item := items[0]
if item.VideoID != "drive-b-duplicate" || item.CanonicalID != "drive-a-canonical" {
t.Fatalf("candidate = %#v, want duplicate with canonical", item)
}
if err := cat.ClearGeneratedAssets(ctx, item.VideoID, true, true); err != nil {
t.Fatalf("clear generated assets: %v", err)
}
got, err := cat.GetVideo(ctx, item.VideoID)
if err != nil {
t.Fatalf("get duplicate: %v", err)
}
if got.PreviewLocal != "" || got.PreviewStatus != "pending" {
t.Fatalf("preview after cleanup local=%q status=%q, want empty pending", got.PreviewLocal, got.PreviewStatus)
}
if got.ThumbnailURL != "" {
t.Fatalf("thumbnail after cleanup = %q, want empty", got.ThumbnailURL)
}
var thumbStatus string
if err := cat.db.QueryRowContext(ctx, `SELECT thumbnail_status FROM videos WHERE id = ?`, item.VideoID).Scan(&thumbStatus); err != nil {
t.Fatalf("query thumbnail status: %v", err)
}
if thumbStatus != "pending" {
t.Fatalf("thumbnail_status = %q, want pending", thumbStatus)
}
}
+109 -6
View File
@@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/go-resty/resty/v2"
@@ -18,8 +20,10 @@ import (
)
const (
maxSmallUploadSize = 250 * 1024 * 1024
defaultRenewAPIURL = "https://api.oplist.org/onedrive/renewapi"
maxSmallUploadSize = 250 * 1024 * 1024
defaultRenewAPIURL = "https://api.oplist.org/onedrive/renewapi"
onedriveListCooldown = 5 * time.Minute
onedriveListInterval = 1 * time.Second
)
type Driver struct {
@@ -34,6 +38,11 @@ type Driver struct {
renewAPIURL string
client *resty.Client
onTokenUpdate func(access, refresh string)
listMu sync.Mutex
lastListAt time.Time
listInterval time.Duration
listCooldown time.Duration
}
type Config struct {
@@ -85,6 +94,8 @@ func New(c Config) *Driver {
client: resty.New().
SetTimeout(30*time.Second).
SetHeader("Accept", "application/json, text/plain, */*"),
listInterval: onedriveListInterval,
listCooldown: onedriveListCooldown,
}
}
@@ -106,10 +117,16 @@ func (d *Driver) List(ctx context.Context, dirID string) ([]drives.Entry, error)
if dirID == "" {
dirID = d.rootID
}
d.listMu.Lock()
defer d.listMu.Unlock()
nextLink := d.childrenURL(dirID)
first := true
out := make([]drives.Entry, 0)
for nextLink != "" {
if err := d.waitForListSlotLocked(ctx); err != nil {
return nil, err
}
var resp filesResp
err := d.request(ctx, nextLink, http.MethodGet, func(req *resty.Request) {
if first {
@@ -120,6 +137,19 @@ func (d *Driver) List(ctx context.Context, dirID string) ([]drives.Entry, error)
}
}, &resp)
if err != nil {
if wait, ok := drives.RateLimitRetryAfter(err); ok {
if wait <= 0 {
wait = d.listCooldown
if wait <= 0 {
wait = onedriveListCooldown
}
}
log.Printf("[onedrive] list cooling down drive=%s dir=%s cooldown=%s err=%v", d.id, dirID, wait, err)
if err := sleepContext(ctx, wait); err != nil {
return nil, err
}
continue
}
return nil, fmt.Errorf("onedrive list: %w", err)
}
for _, item := range resp.Value {
@@ -131,6 +161,36 @@ func (d *Driver) List(ctx context.Context, dirID string) ([]drives.Entry, error)
return out, nil
}
func (d *Driver) waitForListSlotLocked(ctx context.Context) error {
if d.listInterval <= 0 || d.lastListAt.IsZero() {
d.lastListAt = time.Now()
return ctx.Err()
}
next := d.lastListAt.Add(d.listInterval)
now := time.Now()
if now.Before(next) {
if err := sleepContext(ctx, next.Sub(now)); err != nil {
return err
}
}
d.lastListAt = time.Now()
return ctx.Err()
}
func sleepContext(ctx context.Context, d time.Duration) error {
if d <= 0 {
return ctx.Err()
}
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func (d *Driver) Stat(ctx context.Context, fileID string) (*drives.Entry, error) {
var item graphItem
if err := d.request(ctx, d.itemURL(fileID), http.MethodGet, nil, &item); err != nil {
@@ -265,7 +325,7 @@ func (d *Driver) requestOnce(ctx context.Context, rawURL, method string, configu
if err != nil {
return err
}
if isRateLimitResponse(res, graphErr.Error.Code) {
if isRateLimitResponse(res, graphErr.Error.Code, graphErr.Error.Message) {
return onedriveRateLimitError(res, graphErr.Error.Message)
}
if graphErr.Error.Code != "" {
@@ -327,11 +387,54 @@ func (d *Driver) refresh(ctx context.Context) error {
return nil
}
func isRateLimitResponse(res *resty.Response, code string) bool {
if code == "TooManyRequests" || code == "activityLimitReached" {
func isRateLimitResponse(res *resty.Response, code, message string) bool {
if isRateLimitCode(code) || isRateLimitMessage(message) {
return true
}
return res != nil && res.StatusCode() == http.StatusTooManyRequests
if res == nil {
return false
}
if res.StatusCode() == http.StatusTooManyRequests {
return true
}
if res.Header().Get("Retry-After") == "" {
return false
}
switch res.StatusCode() {
case http.StatusServiceUnavailable, http.StatusGatewayTimeout:
return true
default:
return false
}
}
func isRateLimitCode(code string) bool {
normalized := strings.ToLower(strings.ReplaceAll(strings.TrimSpace(code), "_", ""))
normalized = strings.ReplaceAll(normalized, "-", "")
switch normalized {
case "toomanyrequests",
"activitylimitreached",
"throttledrequest",
"requestthrottled",
"resourcethrottled",
"applicationthrottled",
"tenantthrottled":
return true
default:
return false
}
}
func isRateLimitMessage(message string) bool {
text := strings.ToLower(strings.TrimSpace(message))
if text == "" {
return false
}
return strings.Contains(text, "too many requests") ||
strings.Contains(text, "throttl") ||
strings.Contains(text, "rate limit") ||
strings.Contains(text, "activity limit") ||
strings.Contains(text, "temporarily blocked")
}
func onedriveRateLimitError(res *resty.Response, message string) error {
@@ -199,7 +199,7 @@ func TestGraph429ReturnsRateLimitErrorWithRetryAfter(t *testing.T) {
APIBaseURL: srv.URL,
})
_, err := d.List(context.Background(), "root")
_, err := d.StreamURL(context.Background(), "file-id")
if err == nil {
t.Fatal("list succeeded, want rate limit error")
}
@@ -212,6 +212,92 @@ func TestGraph429ReturnsRateLimitErrorWithRetryAfter(t *testing.T) {
}
}
func TestGraphThrottleMessageReturnsRateLimitError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusForbidden)
if err := json.NewEncoder(w).Encode(map[string]any{
"error": map[string]any{
"code": "generalException",
"message": "The request has been throttled. Please try again later.",
},
}); err != nil {
t.Fatalf("write json: %v", err)
}
}))
defer srv.Close()
d := New(Config{
ID: "od-main",
AccessToken: "access-token",
RefreshToken: "refresh-token",
APIBaseURL: srv.URL,
})
_, err := d.StreamURL(context.Background(), "file-id")
if err == nil {
t.Fatal("list succeeded, want rate limit error")
}
var rateLimit *drives.RateLimitError
if !errors.As(err, &rateLimit) {
t.Fatalf("error = %T %[1]v, want RateLimitError", err)
}
}
func TestListCoolsDownAndRetriesOneDriveRateLimit(t *testing.T) {
var calls int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1.0/me/drive/items/root/children" {
t.Fatalf("unexpected request %s %s", r.Method, r.URL.String())
}
calls++
if calls == 1 {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusTooManyRequests)
if err := json.NewEncoder(w).Encode(map[string]any{
"error": map[string]any{
"code": "TooManyRequests",
"message": "throttled",
},
}); err != nil {
t.Fatalf("write json: %v", err)
}
return
}
writeJSON(t, w, map[string]any{
"value": []map[string]any{
{
"id": "file-id",
"name": "demo.mp4",
"size": 100,
"file": map[string]any{"mimeType": "video/mp4"},
},
},
})
}))
defer srv.Close()
d := New(Config{
ID: "od-main",
AccessToken: "access-token",
RefreshToken: "refresh-token",
APIBaseURL: srv.URL,
})
d.listInterval = 0
d.listCooldown = time.Millisecond
got, err := d.List(context.Background(), "root")
if err != nil {
t.Fatalf("list: %v", err)
}
if calls != 2 {
t.Fatalf("calls = %d, want retry after rate limit", calls)
}
if len(got) != 1 || got[0].ID != "file-id" {
t.Fatalf("entries = %#v, want retried file", got)
}
}
func TestStatAndStreamURLUseDriveItemMetadata(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := r.Header.Get("Authorization"); got != "Bearer access-token" {
+23
View File
@@ -12,6 +12,8 @@
// wait until teaser queues are idle
// Phase 3: spider91 → cloud migration (single sweep, captcha cooldown still
// honored within this call)
// Phase 4: cleanup duplicate local preview/thumbnail assets after sampled
// fingerprints have identified canonical videos
//
// A 6h soft deadline guards each pipeline run; phases check deadline at their
// boundaries and exit cleanly if exceeded (no in-flight ffmpeg / upload is
@@ -85,6 +87,11 @@ type Config struct {
// RunMigration runs spider91migrate.Migrator.RunOnce for Phase 3.
RunMigration func(ctx context.Context) error
// RunDedupeAssetCleanup removes generated local assets from non-canonical
// videos in size+sampled_sha256 duplicate groups. It must not delete cloud
// files or catalog rows.
RunDedupeAssetCleanup func(ctx context.Context) error
// Now is injected for tests; nil → time.Now.
Now func() time.Time
}
@@ -241,6 +248,7 @@ func (r *Runner) runPipeline(ctx context.Context) {
}
if len(spiderIDs) == 0 {
log.Printf("[nightly] phase 2/3 skipped: no spider91 drive configured")
r.runDedupeAssetCleanupPhase(ctx)
return
}
log.Printf("[nightly] phase 2: crawling %d spider91 drive(s)", len(spiderIDs))
@@ -267,6 +275,8 @@ func (r *Runner) runPipeline(ctx context.Context) {
log.Printf("[nightly] phase 3 migration: %v", err)
}
}
r.runDedupeAssetCleanupPhase(ctx)
}
// checkDeadline returns true when ctx is already done (runner shutting down or
@@ -292,6 +302,19 @@ func (r *Runner) waitIdle(ctx context.Context, phase string) error {
return nil
}
func (r *Runner) runDedupeAssetCleanupPhase(ctx context.Context) {
if r.checkDeadline(ctx, "phase 4") {
return
}
if r.cfg.RunDedupeAssetCleanup == nil {
return
}
log.Printf("[nightly] phase 4: duplicate asset cleanup")
if err := r.cfg.RunDedupeAssetCleanup(ctx); err != nil {
log.Printf("[nightly] phase 4 duplicate asset cleanup: %v", err)
}
}
// readLastRunDate reads the persisted last_run_date or returns "" when unset.
func (r *Runner) readLastRunDate(ctx context.Context) (string, error) {
if r.cfg.Settings == nil {
+22
View File
@@ -114,6 +114,10 @@ func TestRunPipelineHonoursPhaseOrder(t *testing.T) {
rec.push("migrate")
return nil
},
RunDedupeAssetCleanup: func(context.Context) error {
rec.push("dedupe-cleanup")
return nil
},
})
r.runPipeline(context.Background())
@@ -128,6 +132,7 @@ func TestRunPipelineHonoursPhaseOrder(t *testing.T) {
"crawl:sp-1",
"wait-idle", // after phase 2
"migrate",
"dedupe-cleanup",
}
if len(got) != len(want) {
t.Fatalf("call sequence len = %d, want %d; got=%v", len(got), len(want), got)
@@ -156,6 +161,10 @@ func TestRunPipelineSkipsMigrationWhenNoSpider91(t *testing.T) {
rec.push("migrate")
return nil
},
RunDedupeAssetCleanup: func(context.Context) error {
rec.push("dedupe-cleanup")
return nil
},
})
r.runPipeline(context.Background())
@@ -165,6 +174,15 @@ func TestRunPipelineSkipsMigrationWhenNoSpider91(t *testing.T) {
t.Fatalf("phase 2/3 should be skipped when no spider91 drive, got call %q", c)
}
}
foundCleanup := false
for _, c := range rec.snapshot() {
if c == "dedupe-cleanup" {
foundCleanup = true
}
}
if !foundCleanup {
t.Fatalf("dedupe cleanup should still run when spider91 is absent; calls=%v", rec.snapshot())
}
}
func TestRunPipelineExitsWhenContextCancelledMidPhase(t *testing.T) {
@@ -186,6 +204,7 @@ func TestRunPipelineExitsWhenContextCancelledMidPhase(t *testing.T) {
RunSpider91Crawl: func(context.Context, string) { rec.push("crawl") },
WaitPreviewQueuesIdle: func(context.Context) error { rec.push("wait-idle"); return nil },
RunMigration: func(context.Context) error { rec.push("migrate"); return nil },
RunDedupeAssetCleanup: func(context.Context) error { rec.push("dedupe-cleanup"); return nil },
})
r.runPipeline(ctx)
@@ -200,6 +219,9 @@ func TestRunPipelineExitsWhenContextCancelledMidPhase(t *testing.T) {
if c == "crawl" || c == "migrate" {
t.Fatalf("subsequent phase should not run after cancel, got call %q", c)
}
if c == "dedupe-cleanup" {
t.Fatalf("dedupe cleanup should not run after cancel, got call %q", c)
}
}
}
+1
View File
@@ -929,6 +929,7 @@ func ffmpegOutputLooksRateLimited(output []byte) bool {
return false
}
return strings.Contains(text, "too many requests") ||
strings.Contains(text, "throttl") ||
strings.Contains(text, "rate limit") ||
strings.Contains(text, "rate-limit") ||
strings.Contains(text, "server returned 429")