Add sampled fingerprint deduplication

This commit is contained in:
nianzhibai
2026-05-29 23:19:52 +08:00
parent 1a1282382e
commit da0683344e
9 changed files with 743 additions and 47 deletions
+10
View File
@@ -128,6 +128,16 @@ OneDrive 按 OpenList 默认应用方式调用 `https://api.oplist.org/onedrive/
标签分隔符支持 `, ` 和空格。解析结果会和系统标签池匹配,常见番号类噪声会归并到 `AV` 等系统标签,避免把每个番号都变成独立标签。解析结果可在管理后台覆盖。
## 视频去重
项目有三层去重:
1. 同一网盘同一文件按 `(drive_id, file_id)` 形成稳定视频 ID,重复扫描只更新同一行。
2. 扫描时优先按网盘侧 `content_hash` 去重;没有 hash 时退化为 `file_name + size_bytes`。
3. 扫描、爬虫或本地上传完成后,后台指纹 worker 会异步读取视频的少量 Range 片段,生成 `sampled_sha256`。前台列表、首页、搜索、推荐会按 `size_bytes + sampled_sha256` 只展示最早入库的 canonical 视频。
`sampled_sha256` 是文件级去重:适合识别同一个视频文件被复制到 115 / PikPak / OneDrive 等不同网盘的情况。它不会删除任何网盘文件,也不用于识别转码、裁剪、加水印后的同源视频。
## 管理能力
- `/admin/drives`:新增、编辑、删除网盘,触发扫描。
+58 -13
View File
@@ -32,6 +32,7 @@ import (
"github.com/video-site/backend/internal/drives/quark"
"github.com/video-site/backend/internal/drives/spider91"
"github.com/video-site/backend/internal/drives/wopan"
"github.com/video-site/backend/internal/fingerprint"
"github.com/video-site/backend/internal/nightly"
"github.com/video-site/backend/internal/preview"
"github.com/video-site/backend/internal/proxy"
@@ -63,12 +64,13 @@ func main() {
defer cat.Close()
app := &App{
cfg: cfg,
cat: cat,
registry: proxy.NewRegistry(),
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
spider91Crawlers: make(map[string]*spider91.Crawler),
cfg: cfg,
cat: cat,
registry: proxy.NewRegistry(),
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
fingerprintWorkers: make(map[string]*fingerprint.Worker),
spider91Crawlers: make(map[string]*spider91.Crawler),
}
app.proxy = proxy.New(app.registry)
app.spider91Migrator = spider91migrate.New(spider91migrate.Config{
@@ -272,10 +274,11 @@ type App struct {
registry *proxy.Registry
proxy *proxy.Proxy
mu sync.Mutex
workers map[string]*preview.Worker
thumbWorkers map[string]*preview.ThumbWorker
cancels map[string]context.CancelFunc
mu sync.Mutex
workers map[string]*preview.Worker
thumbWorkers map[string]*preview.ThumbWorker
fingerprintWorkers map[string]*fingerprint.Worker
cancels map[string]context.CancelFunc
// spider91Crawlers 按 driveID 索引,每个 spider91 drive 独立一个 Crawler
spider91Crawlers map[string]*spider91.Crawler
@@ -579,12 +582,14 @@ func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprint.Config{})
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
go thumbWorker.Run(workerCtx)
go fingerprintWorker.Run(workerCtx)
a.registerPreviewWorkers(ctx, d.ID, worker, thumbWorker, cancel)
a.registerPreviewWorkers(ctx, d.ID, worker, thumbWorker, fingerprintWorker, cancel)
// spider91 driver 还需要一个 crawler,挂在专用 map 里供 crawlerLoop 调用
if sd, ok := drv.(*spider91.Driver); ok {
@@ -611,12 +616,14 @@ func (a *App) attachLocalUpload(ctx context.Context) error {
})
worker := preview.NewWorker(gen, a.cat, drv)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
fingerprintWorker := fingerprint.NewWorker(a.cat, drv, fingerprint.Config{})
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
go thumbWorker.Run(workerCtx)
go fingerprintWorker.Run(workerCtx)
a.registerPreviewWorkers(ctx, drv.ID(), worker, thumbWorker, cancel)
a.registerPreviewWorkers(ctx, drv.ID(), worker, thumbWorker, fingerprintWorker, cancel)
return nil
}
@@ -708,7 +715,7 @@ func (a *App) attachSpider91Crawler(d *catalog.Drive, drv *spider91.Driver) {
}()
}
func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker, cancel context.CancelFunc) {
func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker *preview.Worker, thumbWorker *preview.ThumbWorker, fingerprintWorker *fingerprint.Worker, cancel context.CancelFunc) {
a.mu.Lock()
if a.cancels == nil {
a.cancels = make(map[string]context.CancelFunc)
@@ -719,6 +726,9 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
if a.thumbWorkers == nil {
a.thumbWorkers = make(map[string]*preview.ThumbWorker)
}
if a.fingerprintWorkers == nil {
a.fingerprintWorkers = make(map[string]*fingerprint.Worker)
}
if old, ok := a.cancels[driveID]; ok && old != nil {
old()
}
@@ -732,6 +742,11 @@ func (a *App) registerPreviewWorkers(ctx context.Context, driveID string, worker
} else {
delete(a.thumbWorkers, driveID)
}
if fingerprintWorker != nil {
a.fingerprintWorkers[driveID] = fingerprintWorker
} else {
delete(a.fingerprintWorkers, driveID)
}
if cancel != nil {
a.cancels[driveID] = cancel
} else {
@@ -830,6 +845,27 @@ func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.
}
}
func (a *App) enqueueFingerprints(ctx context.Context, driveID string, w *fingerprint.Worker) {
if w == nil {
return
}
pending, err := a.cat.ListVideosNeedingFingerprint(ctx, driveID, 0)
if err != nil {
log.Printf("[fingerprint] list pending %s: %v", driveID, err)
return
}
if len(pending) == 0 {
return
}
log.Printf("[fingerprint] enqueue %d videos for drive=%s", len(pending), driveID)
for _, v := range pending {
if !w.EnqueueBlocking(ctx, v) {
log.Printf("[fingerprint] enqueue canceled for drive=%s", driveID)
return
}
}
}
func (a *App) detachDrive(id string) {
a.registry.Remove(id)
a.mu.Lock()
@@ -839,6 +875,7 @@ func (a *App) detachDrive(id string) {
}
delete(a.workers, id)
delete(a.thumbWorkers, id)
delete(a.fingerprintWorkers, id)
delete(a.spider91Crawlers, id)
a.mu.Unlock()
}
@@ -942,6 +979,7 @@ func (a *App) runScan(ctx context.Context, driveID string) {
a.mu.Lock()
worker := a.workers[driveID]
thumbWorker := a.thumbWorkers[driveID]
fingerprintWorker := a.fingerprintWorkers[driveID]
a.mu.Unlock()
var onNew func(v *catalog.Video)
@@ -994,6 +1032,7 @@ func (a *App) runScan(ctx context.Context, driveID string) {
}
}
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
}
func (a *App) cleanupMissingDriveVideos(ctx context.Context, driveID string, liveFileIDs map[string]struct{}, visitedDirIDs map[string]struct{}, fullDriveScan bool) (int, error) {
@@ -1089,6 +1128,7 @@ func (a *App) enqueueUploadedVideo(ctx context.Context, v *catalog.Video) {
a.mu.Lock()
worker := a.workers[v.DriveID]
thumbWorker := a.thumbWorkers[v.DriveID]
fingerprintWorker := a.fingerprintWorkers[v.DriveID]
a.mu.Unlock()
if thumbWorker != nil && v.ThumbnailURL == "" {
@@ -1097,6 +1137,9 @@ func (a *App) enqueueUploadedVideo(ctx context.Context, v *catalog.Video) {
if worker != nil && a.teaserEnabledForDrive(ctx, v.DriveID) {
worker.Enqueue(v)
}
if fingerprintWorker != nil {
fingerprintWorker.Enqueue(v)
}
}
func (a *App) regenPreview(ctx context.Context, videoID string) {
@@ -1348,8 +1391,10 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
a.mu.Lock()
worker := a.workers[driveID]
thumbWorker := a.thumbWorkers[driveID]
fingerprintWorker := a.fingerprintWorkers[driveID]
a.mu.Unlock()
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
a.enqueueFingerprints(ctx, driveID, fingerprintWorker)
}
// spider91IntCred 解析 credentials 中的整数字段,缺省时返回 def。
+3 -3
View File
@@ -53,7 +53,7 @@ func TestRegisterPreviewWorkerBackfillsPendingWhenDriveTeaserEnabled(t *testing.
worker := preview.NewWorker(&serverFakeTeaserGenerator{}, cat, &serverFakeDrive{})
go worker.Run(ctx)
app.registerPreviewWorkers(ctx, "drive-id", worker, nil, func() {})
app.registerPreviewWorkers(ctx, "drive-id", worker, nil, nil, func() {})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
@@ -117,7 +117,7 @@ func TestRegisterPreviewWorkersGenerateThumbnailsBeforePreviews(t *testing.T) {
go worker.Run(ctx)
go thumbWorker.Run(ctx)
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, func() {})
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, nil, func() {})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
@@ -205,7 +205,7 @@ func TestFailedThumbnailsDoNotBlockPreviewGeneration(t *testing.T) {
go worker.Run(ctx)
go thumbWorker.Run(ctx)
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, func() {})
app.registerPreviewWorkers(ctx, "drive-id", worker, thumbWorker, nil, func() {})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
+118 -31
View File
@@ -41,35 +41,38 @@ func (c *Catalog) Close() error { return c.db.Close() }
// ---------- Video ----------
type Video struct {
ID string `json:"id"`
DriveID string `json:"driveId"`
FileID string `json:"fileId"`
FileName string `json:"fileName"`
ContentHash string `json:"contentHash"`
ParentID string `json:"parentId"`
Title string `json:"title"`
Author string `json:"author"`
Tags []string `json:"tags"`
DurationSeconds int `json:"durationSeconds"`
Size int64 `json:"size"`
Ext string `json:"ext"`
Quality string `json:"quality"`
ThumbnailURL string `json:"thumbnailUrl"`
PreviewFileID string `json:"previewFileId"`
PreviewLocal string `json:"previewLocal"`
PreviewStatus string `json:"previewStatus"`
Views int `json:"views"`
Favorites int `json:"favorites"`
Comments int `json:"comments"`
Likes int `json:"likes"`
Dislikes int `json:"dislikes"`
Category string `json:"category"`
Hidden bool `json:"hidden"`
Badges []string `json:"badges"`
Description string `json:"description"`
PublishedAt time.Time `json:"publishedAt"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
ID string `json:"id"`
DriveID string `json:"driveId"`
FileID string `json:"fileId"`
FileName string `json:"fileName"`
ContentHash string `json:"contentHash"`
SampledSHA256 string `json:"sampledSha256"`
FingerprintStatus string `json:"fingerprintStatus"`
FingerprintError string `json:"fingerprintError"`
ParentID string `json:"parentId"`
Title string `json:"title"`
Author string `json:"author"`
Tags []string `json:"tags"`
DurationSeconds int `json:"durationSeconds"`
Size int64 `json:"size"`
Ext string `json:"ext"`
Quality string `json:"quality"`
ThumbnailURL string `json:"thumbnailUrl"`
PreviewFileID string `json:"previewFileId"`
PreviewLocal string `json:"previewLocal"`
PreviewStatus string `json:"previewStatus"`
Views int `json:"views"`
Favorites int `json:"favorites"`
Comments int `json:"comments"`
Likes int `json:"likes"`
Dislikes int `json:"dislikes"`
Category string `json:"category"`
Hidden bool `json:"hidden"`
Badges []string `json:"badges"`
Description string `json:"description"`
PublishedAt time.Time `json:"publishedAt"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
func (c *Catalog) UpsertVideo(ctx context.Context, v *Video) error {
@@ -109,6 +112,18 @@ ON CONFLICT(id) DO UPDATE SET
WHEN excluded.content_hash != '' THEN excluded.content_hash
ELSE videos.content_hash
END,
sampled_sha256 = CASE
WHEN videos.size_bytes != excluded.size_bytes THEN ''
ELSE videos.sampled_sha256
END,
fingerprint_status = CASE
WHEN videos.size_bytes != excluded.size_bytes THEN 'pending'
ELSE COALESCE(videos.fingerprint_status, 'pending')
END,
fingerprint_error = CASE
WHEN videos.size_bytes != excluded.size_bytes THEN ''
ELSE COALESCE(videos.fingerprint_error, '')
END,
duration_seconds= excluded.duration_seconds,
size_bytes = excluded.size_bytes,
ext = excluded.ext,
@@ -668,6 +683,60 @@ func (c *Catalog) FindVideoByFileSignature(ctx context.Context, fileName string,
return scanVideo(row)
}
func (c *Catalog) ListVideosNeedingFingerprint(ctx context.Context, driveID string, limit int) ([]*Video, error) {
if limit <= 0 {
limit = 10000
}
rows, err := c.db.QueryContext(ctx,
`SELECT `+allVideoCols+` FROM videos
WHERE drive_id = ?
AND size_bytes > 0
AND COALESCE(sampled_sha256, '') = ''
AND COALESCE(fingerprint_status, 'pending') = 'pending'
AND COALESCE(hidden, 0) = 0
ORDER BY created_at ASC, id ASC
LIMIT ?`,
driveID, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var out []*Video
for rows.Next() {
v, err := scanVideo(rows)
if err != nil {
return nil, err
}
out = append(out, v)
}
return out, rows.Err()
}
func (c *Catalog) UpdateVideoFingerprint(ctx context.Context, id, sampledSHA256, status, errText string) error {
sampledSHA256 = normalizeContentHash(sampledSHA256)
if status == "" {
status = "pending"
}
if len(errText) > 500 {
errText = errText[:500]
}
res, err := c.db.ExecContext(ctx,
`UPDATE videos
SET sampled_sha256 = ?,
fingerprint_status = ?,
fingerprint_error = ?,
updated_at = ?
WHERE id = ?`,
sampledSHA256, status, errText, time.Now().UnixMilli(), id)
if err != nil {
return err
}
if rows, err := res.RowsAffected(); err == nil && rows == 0 {
return sql.ErrNoRows
}
return nil
}
type ListParams struct {
Keyword string
DriveID string
@@ -1171,7 +1240,9 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.upd
// ---------- helpers ----------
const allVideoCols = `
id, drive_id, file_id, COALESCE(file_name, ''), COALESCE(content_hash, ''), COALESCE(parent_id, ''), title, COALESCE(author, ''), COALESCE(tags, '[]'),
id, drive_id, file_id, COALESCE(file_name, ''), COALESCE(content_hash, ''),
COALESCE(sampled_sha256, ''), COALESCE(fingerprint_status, 'pending'), COALESCE(fingerprint_error, ''),
COALESCE(parent_id, ''), title, COALESCE(author, ''), COALESCE(tags, '[]'),
duration_seconds, size_bytes, COALESCE(ext, ''), COALESCE(quality, ''), COALESCE(thumbnail_url, ''),
COALESCE(preview_file_id, ''), COALESCE(preview_local, ''), COALESCE(preview_status, 'pending'),
views, favorites, comments, likes, dislikes,
@@ -1190,6 +1261,20 @@ const uniqueVideoWhereSQL = `((COALESCE(videos.content_hash, '') = ''
OR (dup.created_at = videos.created_at AND dup.id < videos.id)
)
))
AND (COALESCE(videos.sampled_sha256, '') = ''
OR videos.size_bytes <= 0
OR NOT EXISTS (
SELECT 1
FROM videos AS dup
WHERE dup.sampled_sha256 = videos.sampled_sha256
AND dup.size_bytes = videos.size_bytes
AND COALESCE(dup.sampled_sha256, '') != ''
AND dup.size_bytes > 0
AND (
dup.created_at < videos.created_at
OR (dup.created_at = videos.created_at AND dup.id < videos.id)
)
))
AND (COALESCE(videos.file_name, '') = ''
OR videos.size_bytes <= 0
OR NOT EXISTS (
@@ -1215,7 +1300,9 @@ func scanVideo(row rowScanner) (*Video, error) {
var publishedAt, createdAt, updatedAt int64
var hidden int
err := row.Scan(
&v.ID, &v.DriveID, &v.FileID, &v.FileName, &v.ContentHash, &v.ParentID, &v.Title, &v.Author, &tagsJSON,
&v.ID, &v.DriveID, &v.FileID, &v.FileName, &v.ContentHash,
&v.SampledSHA256, &v.FingerprintStatus, &v.FingerprintError,
&v.ParentID, &v.Title, &v.Author, &tagsJSON,
&v.DurationSeconds, &v.Size, &v.Ext, &v.Quality, &v.ThumbnailURL,
&v.PreviewFileID, &v.PreviewLocal, &v.PreviewStatus,
&v.Views, &v.Favorites, &v.Comments, &v.Likes, &v.Dislikes,
@@ -0,0 +1,77 @@
package catalog
import (
"context"
"testing"
"time"
)
func TestListVideosDeduplicatesBySampledSHA256(t *testing.T) {
ctx := context.Background()
cat, err := Open(t.TempDir() + "/catalog.db")
if err != nil {
t.Fatalf("open catalog: %v", err)
}
t.Cleanup(func() {
if err := cat.Close(); err != nil {
t.Fatalf("close catalog: %v", err)
}
})
now := time.Now()
for _, v := range []*Video{
{
ID: "drive-a-file-a",
DriveID: "drive-a",
FileID: "file-a",
FileName: "first-name.mp4",
Title: "First",
Size: 1234,
PublishedAt: now.Add(-time.Minute),
CreatedAt: now.Add(-time.Minute),
UpdatedAt: now.Add(-time.Minute),
},
{
ID: "drive-b-file-b",
DriveID: "drive-b",
FileID: "file-b",
FileName: "second-name.mp4",
Title: "Second",
Size: 1234,
PublishedAt: now,
CreatedAt: now,
UpdatedAt: now,
},
} {
if err := cat.UpsertVideo(ctx, v); err != nil {
t.Fatalf("upsert %s: %v", v.ID, err)
}
}
items, total, err := cat.ListVideos(ctx, ListParams{Page: 1, PageSize: 10})
if err != nil {
t.Fatalf("list before fingerprint: %v", err)
}
if total != 2 || len(items) != 2 {
t.Fatalf("before fingerprint total=%d len=%d, want 2", total, len(items))
}
const sampled = "abc123"
if err := cat.UpdateVideoFingerprint(ctx, "drive-a-file-a", sampled, "ready", ""); err != nil {
t.Fatalf("update a fingerprint: %v", err)
}
if err := cat.UpdateVideoFingerprint(ctx, "drive-b-file-b", sampled, "ready", ""); err != nil {
t.Fatalf("update b fingerprint: %v", err)
}
items, total, err = cat.ListVideos(ctx, ListParams{Page: 1, PageSize: 10})
if err != nil {
t.Fatalf("list after fingerprint: %v", err)
}
if total != 1 || len(items) != 1 {
t.Fatalf("after fingerprint total=%d len=%d, want 1", total, len(items))
}
if items[0].ID != "drive-a-file-a" {
t.Fatalf("canonical id = %q, want earliest created video", items[0].ID)
}
}
+3
View File
@@ -5,6 +5,9 @@ CREATE TABLE IF NOT EXISTS videos (
file_id TEXT NOT NULL,
file_name TEXT DEFAULT '', -- 网盘侧原始文件名,用于同名同大小去重
content_hash TEXT DEFAULT '',
sampled_sha256 TEXT DEFAULT '', -- 跨网盘统一采样指纹(size + sampled bytes
fingerprint_status TEXT DEFAULT 'pending', -- pending / ready / failed
fingerprint_error TEXT DEFAULT '',
parent_id TEXT,
title TEXT NOT NULL,
author TEXT,
+12
View File
@@ -43,6 +43,15 @@ func (c *Catalog) migrate(ctx context.Context) error {
if err := c.addColumnIfMissing(ctx, "videos", "content_hash", "TEXT DEFAULT ''"); err != nil {
return err
}
if err := c.addColumnIfMissing(ctx, "videos", "sampled_sha256", "TEXT DEFAULT ''"); err != nil {
return err
}
if err := c.addColumnIfMissing(ctx, "videos", "fingerprint_status", "TEXT DEFAULT 'pending'"); err != nil {
return err
}
if err := c.addColumnIfMissing(ctx, "videos", "fingerprint_error", "TEXT DEFAULT ''"); err != nil {
return err
}
if err := c.addColumnIfMissing(ctx, "videos", "file_name", "TEXT DEFAULT ''"); err != nil {
return err
}
@@ -83,6 +92,9 @@ func (c *Catalog) migrate(ctx context.Context) error {
if _, err := c.db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_videos_content_hash ON videos(content_hash)`); err != nil {
return err
}
if _, err := c.db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_videos_sampled_sha256 ON videos(size_bytes, sampled_sha256)`); err != nil {
return err
}
if _, err := c.db.ExecContext(ctx, `CREATE INDEX IF NOT EXISTS idx_videos_hidden ON videos(hidden)`); err != nil {
return err
}
+350
View File
@@ -0,0 +1,350 @@
package fingerprint
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/video-site/backend/internal/catalog"
"github.com/video-site/backend/internal/drives"
)
const (
defaultSampleSizeBytes int64 = 512 * 1024
defaultFullHashMaxSize int64 = 8 * 1024 * 1024
defaultCooldown = 5 * time.Minute
)
type Config struct {
SampleSizeBytes int64
FullHashMaxSize int64
HTTPClient *http.Client
}
type Worker struct {
Catalog *catalog.Catalog
Drive drives.Drive
Config Config
ch chan *catalog.Video
queue videoQueue
http *http.Client
}
func NewWorker(cat *catalog.Catalog, drv drives.Drive, cfg Config) *Worker {
hc := cfg.HTTPClient
if hc == nil {
hc = &http.Client{Timeout: 0}
}
if cfg.SampleSizeBytes <= 0 {
cfg.SampleSizeBytes = defaultSampleSizeBytes
}
if cfg.FullHashMaxSize <= 0 {
cfg.FullHashMaxSize = defaultFullHashMaxSize
}
return &Worker{
Catalog: cat,
Drive: drv,
Config: cfg,
ch: make(chan *catalog.Video, 4096),
http: hc,
}
}
func (w *Worker) Enqueue(v *catalog.Video) bool {
if v == nil {
return false
}
if !w.queue.reserve(v.ID) {
return true
}
select {
case w.ch <- v:
return true
default:
w.queue.release(v.ID)
return false
}
}
func (w *Worker) EnqueueBlocking(ctx context.Context, v *catalog.Video) bool {
if v == nil {
return false
}
if !w.queue.reserve(v.ID) {
return true
}
select {
case w.ch <- v:
return true
case <-ctx.Done():
w.queue.release(v.ID)
return false
}
}
func (w *Worker) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case v := <-w.ch:
w.processQueued(ctx, v)
select {
case <-ctx.Done():
return
case <-time.After(500 * time.Millisecond):
}
}
}
}
func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
defer w.queue.release(v.ID)
if w.Catalog == nil || w.Drive == nil || v == nil || v.ID == "" {
return
}
current, err := w.Catalog.GetVideo(ctx, v.ID)
if err != nil {
return
}
if current.SampledSHA256 != "" || current.FingerprintStatus == "ready" || current.Hidden {
return
}
sum, err := Compute(ctx, w.Drive, current, w.Config, w.http)
if err != nil {
var rl *drives.RateLimitError
if errors.As(err, &rl) {
wait := rl.RetryAfter
if wait <= 0 {
wait = defaultCooldown
}
log.Printf("[fingerprint] drive=%s rate limited; keep video=%s pending and cool down for %s: %v", w.Drive.ID(), current.ID, wait, err)
sleepContext(ctx, wait)
return
}
log.Printf("[fingerprint] video=%s failed: %v", current.ID, err)
_ = w.Catalog.UpdateVideoFingerprint(ctx, current.ID, "", "failed", err.Error())
return
}
if err := w.Catalog.UpdateVideoFingerprint(ctx, current.ID, sum, "ready", ""); err != nil {
log.Printf("[fingerprint] update video=%s: %v", current.ID, err)
return
}
log.Printf("[fingerprint] video=%s ready sampled_sha256=%s", current.ID, sum)
}
func Compute(ctx context.Context, drv drives.Drive, v *catalog.Video, cfg Config, hc *http.Client) (string, error) {
if drv == nil {
return "", errors.New("fingerprint: nil drive")
}
if v == nil {
return "", errors.New("fingerprint: nil video")
}
if v.Size <= 0 {
return "", errors.New("fingerprint: video size is empty")
}
if cfg.SampleSizeBytes <= 0 {
cfg.SampleSizeBytes = defaultSampleSizeBytes
}
if cfg.FullHashMaxSize <= 0 {
cfg.FullHashMaxSize = defaultFullHashMaxSize
}
if hc == nil {
hc = &http.Client{Timeout: 0}
}
link, err := drv.StreamURL(ctx, v.FileID)
if err != nil {
return "", fmt.Errorf("fingerprint: stream url: %w", err)
}
if link == nil || strings.TrimSpace(link.URL) == "" {
return "", errors.New("fingerprint: empty stream url")
}
ranges := sampleRanges(v.Size, cfg.SampleSizeBytes, cfg.FullHashMaxSize)
h := sha256.New()
writeHashHeader(h, v.Size, ranges)
for _, r := range ranges {
data, err := readRange(ctx, hc, link, r)
if err != nil {
return "", err
}
if int64(len(data)) != r.length {
return "", fmt.Errorf("fingerprint: short sample at %d: got %d want %d", r.start, len(data), r.length)
}
_, _ = h.Write([]byte(fmt.Sprintf("offset=%d length=%d\n", r.start, r.length)))
_, _ = h.Write(data)
_, _ = h.Write([]byte("\n"))
}
return hex.EncodeToString(h.Sum(nil)), nil
}
type byteRange struct {
start int64
length int64
}
func sampleRanges(size, sampleSize, fullHashMax int64) []byteRange {
if size <= fullHashMax {
return []byteRange{{start: 0, length: size}}
}
if sampleSize > size {
sampleSize = size
}
maxStart := size - sampleSize
percents := []int64{0, 20, 40, 60, 80}
out := make([]byteRange, 0, len(percents))
seen := make(map[int64]struct{}, len(percents))
for _, pct := range percents {
start := maxStart * pct / 100
if _, ok := seen[start]; ok {
continue
}
seen[start] = struct{}{}
out = append(out, byteRange{start: start, length: sampleSize})
}
return out
}
func writeHashHeader(w io.Writer, size int64, ranges []byteRange) {
_, _ = fmt.Fprintf(w, "video-site-sampled-sha256-v1\nsize=%d\nsamples=%d\n", size, len(ranges))
}
func readRange(ctx context.Context, hc *http.Client, link *drives.StreamLink, r byteRange) ([]byte, error) {
u, err := url.Parse(link.URL)
if err == nil && (u.Scheme == "http" || u.Scheme == "https") {
return readHTTPRange(ctx, hc, link, r)
}
path := link.URL
if err == nil && u.Scheme == "file" {
path = u.Path
}
return readLocalRange(path, r)
}
func readLocalRange(path string, r byteRange) ([]byte, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("fingerprint: open local stream: %w", err)
}
defer f.Close()
buf := make([]byte, r.length)
n, err := f.ReadAt(buf, r.start)
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("fingerprint: read local sample: %w", err)
}
if int64(n) != r.length {
return nil, fmt.Errorf("fingerprint: read local sample at %d: got %d want %d", r.start, n, r.length)
}
return buf, nil
}
func readHTTPRange(ctx context.Context, hc *http.Client, link *drives.StreamLink, r byteRange) ([]byte, error) {
end := r.start + r.length - 1
req, err := http.NewRequestWithContext(ctx, http.MethodGet, link.URL, nil)
if err != nil {
return nil, err
}
for k, vs := range link.Headers {
for _, v := range vs {
req.Header.Add(k, v)
}
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", r.start, end))
resp, err := hc.Do(req)
if err != nil {
return nil, fmt.Errorf("fingerprint: read remote sample: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusTooManyRequests {
return nil, &drives.RateLimitError{
Provider: "fingerprint",
RetryAfter: parseRetryAfter(resp.Header.Get("Retry-After")),
Err: fmt.Errorf("remote sample rate limited: status=%d", resp.StatusCode),
}
}
if resp.StatusCode != http.StatusPartialContent {
if resp.StatusCode == http.StatusOK && r.start == 0 {
data, err := io.ReadAll(io.LimitReader(resp.Body, r.length+1))
if err != nil {
return nil, err
}
if int64(len(data)) == r.length {
return data, nil
}
}
return nil, fmt.Errorf("fingerprint: range request got status=%d for bytes=%d-%d", resp.StatusCode, r.start, end)
}
return io.ReadAll(io.LimitReader(resp.Body, r.length))
}
func parseRetryAfter(raw string) time.Duration {
raw = strings.TrimSpace(raw)
if raw == "" {
return 0
}
if seconds, err := strconv.Atoi(raw); err == nil && seconds > 0 {
return time.Duration(seconds) * time.Second
}
if when, err := http.ParseTime(raw); err == nil {
d := time.Until(when)
if d > 0 {
return d
}
}
return 0
}
func sleepContext(ctx context.Context, d time.Duration) bool {
if d <= 0 {
return true
}
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
type videoQueue struct {
mu sync.Mutex
ids map[string]struct{}
}
func (q *videoQueue) reserve(id string) bool {
if id == "" {
return true
}
q.mu.Lock()
defer q.mu.Unlock()
if q.ids == nil {
q.ids = make(map[string]struct{})
}
if _, ok := q.ids[id]; ok {
return false
}
q.ids[id] = struct{}{}
return true
}
func (q *videoQueue) release(id string) {
if id == "" {
return
}
q.mu.Lock()
delete(q.ids, id)
q.mu.Unlock()
}
+112
View File
@@ -0,0 +1,112 @@
package fingerprint
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/video-site/backend/internal/catalog"
"github.com/video-site/backend/internal/drives"
)
func TestComputeLocalFilesWithSameContentMatch(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
body := []byte("same video bytes")
a := filepath.Join(dir, "a.mp4")
b := filepath.Join(dir, "b.mp4")
if err := os.WriteFile(a, body, 0o644); err != nil {
t.Fatalf("write a: %v", err)
}
if err := os.WriteFile(b, body, 0o644); err != nil {
t.Fatalf("write b: %v", err)
}
sumA, err := Compute(ctx, &fakeDrive{paths: map[string]string{"a": a}}, &catalog.Video{ID: "a", FileID: "a", Size: int64(len(body))}, Config{}, nil)
if err != nil {
t.Fatalf("compute a: %v", err)
}
sumB, err := Compute(ctx, &fakeDrive{paths: map[string]string{"b": b}}, &catalog.Video{ID: "b", FileID: "b", Size: int64(len(body))}, Config{}, nil)
if err != nil {
t.Fatalf("compute b: %v", err)
}
if sumA == "" || sumA != sumB {
t.Fatalf("fingerprints = %q / %q, want same non-empty", sumA, sumB)
}
}
func TestComputeRemoteUsesRangeSamples(t *testing.T) {
ctx := context.Background()
data := make([]byte, 10*1024*1024)
for i := range data {
data[i] = byte(i % 251)
}
var ranges []string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawRange := r.Header.Get("Range")
ranges = append(ranges, rawRange)
var start, end int
if _, err := fmt.Sscanf(rawRange, "bytes=%d-%d", &start, &end); err != nil {
t.Fatalf("bad range %q: %v", rawRange, err)
}
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(data)))
w.WriteHeader(http.StatusPartialContent)
_, _ = w.Write(data[start : end+1])
}))
defer srv.Close()
drv := &fakeDrive{paths: map[string]string{"remote": srv.URL + "/video.mp4"}}
sum, err := Compute(ctx, drv, &catalog.Video{ID: "remote", FileID: "remote", Size: int64(len(data))}, Config{
SampleSizeBytes: 4,
FullHashMaxSize: 8,
HTTPClient: srv.Client(),
}, srv.Client())
if err != nil {
t.Fatalf("compute remote: %v", err)
}
if sum == "" {
t.Fatal("fingerprint should not be empty")
}
want := []string{
"bytes=0-3",
"bytes=2097151-2097154",
"bytes=4194302-4194305",
"bytes=6291453-6291456",
"bytes=8388604-8388607",
}
if fmt.Sprint(ranges) != fmt.Sprint(want) {
t.Fatalf("ranges = %#v, want %#v", ranges, want)
}
}
type fakeDrive struct {
paths map[string]string
}
func (d *fakeDrive) Kind() string { return "fake" }
func (d *fakeDrive) ID() string { return "fake" }
func (d *fakeDrive) Init(context.Context) error {
return nil
}
func (d *fakeDrive) List(context.Context, string) ([]drives.Entry, error) {
return nil, drives.ErrNotSupported
}
func (d *fakeDrive) Stat(context.Context, string) (*drives.Entry, error) {
return nil, drives.ErrNotSupported
}
func (d *fakeDrive) StreamURL(_ context.Context, fileID string) (*drives.StreamLink, error) {
return &drives.StreamLink{URL: d.paths[fileID], Expires: time.Now().Add(time.Minute)}, nil
}
func (d *fakeDrive) Upload(context.Context, string, string, io.Reader, int64) (string, error) {
return "", drives.ErrNotSupported
}
func (d *fakeDrive) EnsureDir(context.Context, string) (string, error) {
return "", drives.ErrNotSupported
}
func (d *fakeDrive) RootID() string { return "root" }