mirror of
https://github.com/nianzhibai/91.git
synced 2026-06-15 00:44:30 +08:00
feat: add per-storage manual transcode for browser-incompatible videos
Add a transcode control to each storage in the admin drives page, modeled after the cover/preview generation controls: - Manual start/stop button per storage; transcoding is off by default and never runs automatically (not triggered by scans or the nightly pipeline) - New transcode worker probes candidates (non mp4/webm extensions) with ffprobe: already-compatible files are marked skipped; AVI with H.264 is remuxed losslessly; incompatible codecs (MPEG-4 Part 2, WMV, RMVB, HEVC...) are transcoded to H.264/AAC MP4 with +faststart - Transcoded output is uploaded back to the same storage under a "91转码" directory which is auto-added to the drive's scan skip list so the scanner never re-imports the artifacts - Playback source automatically prefers the transcoded file once ready, keeping the 302 direct-link mode for cloud drives - videos table gains transcode_status/error/file_id/size columns via startup migration; counts and live task status surface in the admin drives API and generation panel UI - Stop semantics: per-drive stop button, drive-level "stop all tasks" and global stop all include the transcode task; interrupted videos keep their candidate status and resume on next start Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
+154
-4
@@ -43,6 +43,7 @@ import (
|
||||
"github.com/video-site/backend/internal/proxy"
|
||||
"github.com/video-site/backend/internal/scanner"
|
||||
"github.com/video-site/backend/internal/spider91migrate"
|
||||
"github.com/video-site/backend/internal/transcode"
|
||||
)
|
||||
|
||||
const fingerprintReconcileInterval = time.Minute
|
||||
@@ -218,6 +219,12 @@ func main() {
|
||||
OnRegenFailedFingerprints: func(driveID string) {
|
||||
go app.regenFailedFingerprints(ctx, driveID)
|
||||
},
|
||||
OnStartDriveTranscode: func(driveID string) (bool, string) {
|
||||
return app.startDriveTranscode(ctx, driveID)
|
||||
},
|
||||
OnStopDriveTranscode: func(driveID string) bool {
|
||||
return app.stopDriveTranscode(driveID)
|
||||
},
|
||||
OnDeleteVideo: func(reqCtx context.Context, videoID string, deleteSource bool) (api.DeleteVideoResult, error) {
|
||||
return app.deleteVideo(reqCtx, videoID, deleteSource)
|
||||
},
|
||||
@@ -368,6 +375,13 @@ type App struct {
|
||||
// uploadProgress 跟踪脚本爬虫迁移到云盘时的实时上传状态。
|
||||
uploadProgressMu sync.Mutex
|
||||
uploadProgress map[string]driveUploadProgress
|
||||
|
||||
// transcodeMu 保护 transcodeWorkers / transcodeCancels。
|
||||
// 浏览器兼容性转码每盘最多一个任务,且只能由管理员手动开启
|
||||
// (不随扫盘/夜间流水线自动运行),手动停止或处理完即从 map 清除。
|
||||
transcodeMu sync.Mutex
|
||||
transcodeWorkers map[string]*transcode.Worker
|
||||
transcodeCancels map[string]context.CancelFunc
|
||||
}
|
||||
|
||||
type driveScanProgress struct {
|
||||
@@ -557,7 +571,14 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
out := make(map[string]api.DriveGenerationStatuses, len(scanningDrives)+len(previewWorkers)+len(thumbWorkers)+len(fingerprintWorkers)+len(uploadProgresses))
|
||||
a.transcodeMu.Lock()
|
||||
transcodeWorkers := make(map[string]*transcode.Worker, len(a.transcodeWorkers))
|
||||
for id, worker := range a.transcodeWorkers {
|
||||
transcodeWorkers[id] = worker
|
||||
}
|
||||
a.transcodeMu.Unlock()
|
||||
|
||||
out := make(map[string]api.DriveGenerationStatuses, len(scanningDrives)+len(previewWorkers)+len(thumbWorkers)+len(fingerprintWorkers)+len(uploadProgresses)+len(transcodeWorkers))
|
||||
for id, running := range scanningDrives {
|
||||
if !running {
|
||||
continue
|
||||
@@ -601,6 +622,11 @@ func (a *App) driveGenerationStatuses() map[string]api.DriveGenerationStatuses {
|
||||
}
|
||||
out[id] = status
|
||||
}
|
||||
for id, worker := range transcodeWorkers {
|
||||
status := out[id]
|
||||
status.Transcode = generationStatusFromTranscode(worker.Status())
|
||||
out[id] = status
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -687,6 +713,126 @@ func generationStatusFromFingerprint(status fingerprint.TaskStatus) api.Generati
|
||||
return out
|
||||
}
|
||||
|
||||
func generationStatusFromTranscode(status transcode.TaskStatus) api.GenerationStatus {
|
||||
state := status.State
|
||||
if state == "" {
|
||||
state = "idle"
|
||||
}
|
||||
return api.GenerationStatus{
|
||||
State: state,
|
||||
CurrentTitle: status.CurrentTitle,
|
||||
QueueLength: status.QueueLength,
|
||||
DoneCount: status.DoneCount,
|
||||
TotalCount: status.TotalCount,
|
||||
}
|
||||
}
|
||||
|
||||
// transcodeWorkDir 返回转码用的本地临时目录(下载原片 / 写产物),与
|
||||
// localUploadDir 一样挂在数据目录下,避免 /tmp 空间不足。
|
||||
func (a *App) transcodeWorkDir() string {
|
||||
return filepath.Join(filepath.Dir(a.cfg.Storage.LocalPreviewDir), "transcode-tmp")
|
||||
}
|
||||
|
||||
// startDriveTranscode 手动开启某盘的浏览器兼容性转码。
|
||||
// 转码从不自动运行:扫盘、夜间流水线都不会触发,这里是唯一入口。
|
||||
// 任务跑完候选列表后自然结束;中途可用 stopDriveTranscode / 停止所有任务中断。
|
||||
func (a *App) startDriveTranscode(ctx context.Context, driveID string) (bool, string) {
|
||||
driveID = strings.TrimSpace(driveID)
|
||||
if driveID == "" {
|
||||
return false, "缺少存储 ID"
|
||||
}
|
||||
drv, ok := a.registry.Get(driveID)
|
||||
if !ok {
|
||||
return false, "存储未挂载或不可用"
|
||||
}
|
||||
switch drv.Kind() {
|
||||
case spider91.Kind, scriptcrawler.Kind:
|
||||
return false, "爬虫存储不支持转码"
|
||||
}
|
||||
workDir := a.transcodeWorkDir()
|
||||
if err := os.MkdirAll(workDir, 0o755); err != nil {
|
||||
return false, "创建转码临时目录失败: " + err.Error()
|
||||
}
|
||||
|
||||
a.transcodeMu.Lock()
|
||||
if a.transcodeWorkers == nil {
|
||||
a.transcodeWorkers = make(map[string]*transcode.Worker)
|
||||
a.transcodeCancels = make(map[string]context.CancelFunc)
|
||||
}
|
||||
if existing := a.transcodeWorkers[driveID]; existing != nil {
|
||||
a.transcodeMu.Unlock()
|
||||
return false, "该存储的转码任务已在运行"
|
||||
}
|
||||
worker := transcode.NewWorker(transcode.Config{
|
||||
FFmpegPath: a.cfg.Preview.FFmpegPath,
|
||||
FFprobePath: a.cfg.Preview.FFprobePath,
|
||||
WorkDir: workDir,
|
||||
}, a.cat, drv)
|
||||
taskCtx, done := a.registerDriveTaskContext(ctx, driveID)
|
||||
runCtx, cancel := context.WithCancel(taskCtx)
|
||||
a.transcodeWorkers[driveID] = worker
|
||||
a.transcodeCancels[driveID] = cancel
|
||||
a.transcodeMu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
cancel()
|
||||
done()
|
||||
a.transcodeMu.Lock()
|
||||
if a.transcodeWorkers[driveID] == worker {
|
||||
delete(a.transcodeWorkers, driveID)
|
||||
delete(a.transcodeCancels, driveID)
|
||||
}
|
||||
a.transcodeMu.Unlock()
|
||||
}()
|
||||
candidates, err := a.cat.ListTranscodeCandidates(runCtx, driveID, 0)
|
||||
if err != nil {
|
||||
log.Printf("[transcode] list candidates drive=%s: %v", driveID, err)
|
||||
return
|
||||
}
|
||||
if len(candidates) == 0 {
|
||||
log.Printf("[transcode] drive=%s no candidates", driveID)
|
||||
return
|
||||
}
|
||||
log.Printf("[transcode] drive=%s start, %d candidates", driveID, len(candidates))
|
||||
worker.Run(runCtx, candidates)
|
||||
}()
|
||||
return true, ""
|
||||
}
|
||||
|
||||
// stopAllDriveTranscodes 停掉所有盘的转码任务,返回被停的 driveID 列表。
|
||||
func (a *App) stopAllDriveTranscodes() []string {
|
||||
a.transcodeMu.Lock()
|
||||
cancels := a.transcodeCancels
|
||||
a.transcodeCancels = nil
|
||||
a.transcodeWorkers = nil
|
||||
a.transcodeMu.Unlock()
|
||||
ids := make([]string, 0, len(cancels))
|
||||
for id, cancel := range cancels {
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// stopDriveTranscode 手动停止某盘的转码任务。返回是否有任务被停。
|
||||
func (a *App) stopDriveTranscode(driveID string) bool {
|
||||
driveID = strings.TrimSpace(driveID)
|
||||
a.transcodeMu.Lock()
|
||||
cancel := a.transcodeCancels[driveID]
|
||||
delete(a.transcodeCancels, driveID)
|
||||
delete(a.transcodeWorkers, driveID)
|
||||
a.transcodeMu.Unlock()
|
||||
if cancel == nil {
|
||||
return false
|
||||
}
|
||||
cancel()
|
||||
log.Printf("[transcode] stop drive=%s", driveID)
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
|
||||
a.driveAttachMu.Lock()
|
||||
defer a.driveAttachMu.Unlock()
|
||||
@@ -1435,10 +1581,11 @@ func (a *App) stopDriveTasks(ctx context.Context, driveID string) bool {
|
||||
queued := a.clearQueuedDriveTask(driveID)
|
||||
fingerprintQueued := a.clearFingerprintQueueing(driveID)
|
||||
uploading := a.clearCrawlerUploadProgress(driveID)
|
||||
transcoding := a.stopDriveTranscode(driveID)
|
||||
hadWorkers := a.resetDriveGenerationWorkers(ctx, driveID)
|
||||
stopped := canceled > 0 || queued || fingerprintQueued || uploading || hadWorkers
|
||||
log.Printf("[tasks] stop drive=%s stopped=%v canceled_tasks=%d queued=%v fingerprint_queue=%v uploading=%v workers=%v",
|
||||
driveID, stopped, canceled, queued, fingerprintQueued, uploading, hadWorkers)
|
||||
stopped := canceled > 0 || queued || fingerprintQueued || uploading || transcoding || hadWorkers
|
||||
log.Printf("[tasks] stop drive=%s stopped=%v canceled_tasks=%d queued=%v fingerprint_queue=%v uploading=%v transcoding=%v workers=%v",
|
||||
driveID, stopped, canceled, queued, fingerprintQueued, uploading, transcoding, hadWorkers)
|
||||
return stopped
|
||||
}
|
||||
|
||||
@@ -1459,6 +1606,9 @@ func (a *App) stopAllDriveTasks(ctx context.Context) int {
|
||||
for _, id := range a.clearAllCrawlerUploadProgress() {
|
||||
stoppedIDs[id] = struct{}{}
|
||||
}
|
||||
for _, id := range a.stopAllDriveTranscodes() {
|
||||
stoppedIDs[id] = struct{}{}
|
||||
}
|
||||
for _, id := range a.resetAllDriveGenerationWorkers(ctx) {
|
||||
stoppedIDs[id] = struct{}{}
|
||||
}
|
||||
|
||||
@@ -59,7 +59,13 @@ type AdminServer struct {
|
||||
OnRegenFailedPreviews func(driveID string)
|
||||
OnRegenFailedThumbnails func(driveID string)
|
||||
OnRegenFailedFingerprints func(driveID string)
|
||||
OnDeleteVideo func(ctx context.Context, videoID string, deleteSource bool) (DeleteVideoResult, error)
|
||||
// OnStartDriveTranscode 手动开启某盘的浏览器兼容性转码任务。
|
||||
// 返回 (是否接受, 拒绝原因)。转码从不自动运行,只能在这里手动触发;
|
||||
// 处理完候选列表后任务自然结束。
|
||||
OnStartDriveTranscode func(driveID string) (bool, string)
|
||||
// OnStopDriveTranscode 手动停止某盘正在进行的转码任务。返回是否有任务被停。
|
||||
OnStopDriveTranscode func(driveID string) bool
|
||||
OnDeleteVideo func(ctx context.Context, videoID string, deleteSource bool) (DeleteVideoResult, error)
|
||||
GetDriveGenerationStatuses func() map[string]DriveGenerationStatuses
|
||||
// OnTeaserEnabledChanged 在 per-drive 预览视频开关被切换后调用。
|
||||
// enabled=true 时上层应该重新把 pending 预览视频入队(类似旧的全局开关从关到开);
|
||||
@@ -118,6 +124,7 @@ type DriveGenerationStatuses struct {
|
||||
Preview GenerationStatus `json:"preview"`
|
||||
Fingerprint GenerationStatus `json:"fingerprint"`
|
||||
Upload GenerationStatus `json:"upload"`
|
||||
Transcode GenerationStatus `json:"transcode"`
|
||||
}
|
||||
|
||||
type NightlyJobStatus struct {
|
||||
@@ -169,6 +176,8 @@ func (a *AdminServer) Register(r chi.Router) {
|
||||
r.Post("/drives/{id}/previews/failed/regenerate", a.handleRegenFailedPreviews)
|
||||
r.Post("/drives/{id}/thumbnails/failed/regenerate", a.handleRegenFailedThumbnails)
|
||||
r.Post("/drives/{id}/fingerprints/failed/regenerate", a.handleRegenFailedFingerprints)
|
||||
r.Post("/drives/{id}/transcode/start", a.handleStartDriveTranscode)
|
||||
r.Post("/drives/{id}/transcode/stop", a.handleStopDriveTranscode)
|
||||
|
||||
// 爬虫
|
||||
r.Get("/crawlers", a.handleListCrawlers)
|
||||
@@ -431,6 +440,11 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
|
||||
writeErr(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
transcodeCounts, err := a.Catalog.CountTranscodesByDrive(r.Context())
|
||||
if err != nil {
|
||||
writeErr(w, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
generationStatuses := map[string]DriveGenerationStatuses{}
|
||||
if a.GetDriveGenerationStatuses != nil {
|
||||
generationStatuses = a.GetDriveGenerationStatuses()
|
||||
@@ -470,6 +484,11 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
|
||||
FingerprintReadyCount int `json:"fingerprintReadyCount"`
|
||||
FingerprintPendingCount int `json:"fingerprintPendingCount"`
|
||||
FingerprintFailedCount int `json:"fingerprintFailedCount"`
|
||||
TranscodeGenerationStatus GenerationStatus `json:"transcodeGenerationStatus"`
|
||||
TranscodePendingCount int `json:"transcodePendingCount"`
|
||||
TranscodeReadyCount int `json:"transcodeReadyCount"`
|
||||
TranscodeFailedCount int `json:"transcodeFailedCount"`
|
||||
TranscodeSkippedCount int `json:"transcodeSkippedCount"`
|
||||
}
|
||||
list := make([]out, 0, len(drives))
|
||||
for _, d := range drives {
|
||||
@@ -479,6 +498,7 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
|
||||
counts := teaserCounts[d.ID]
|
||||
thumbCounts := thumbnailCounts[d.ID]
|
||||
fingerprintCount := fingerprintCounts[d.ID]
|
||||
transcodeCount := transcodeCounts[d.ID]
|
||||
generation := generationStatuses[d.ID]
|
||||
if generation.Scan.State == "" {
|
||||
generation.Scan.State = "idle"
|
||||
@@ -492,6 +512,9 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
|
||||
if generation.Fingerprint.State == "" {
|
||||
generation.Fingerprint.State = "idle"
|
||||
}
|
||||
if generation.Transcode.State == "" {
|
||||
generation.Transcode.State = "idle"
|
||||
}
|
||||
// spider91 没有用户凭证概念;只要存在 drive 行就视为"已配置"。
|
||||
// last_crawl_at 是后端自动写入的运行状态字段,不计入 hasCredential 判定。
|
||||
hasCred := false
|
||||
@@ -537,6 +560,11 @@ func (a *AdminServer) handleListDrives(w http.ResponseWriter, r *http.Request) {
|
||||
FingerprintReadyCount: fingerprintCount.Ready,
|
||||
FingerprintPendingCount: fingerprintCount.Pending,
|
||||
FingerprintFailedCount: fingerprintCount.Failed,
|
||||
TranscodeGenerationStatus: generation.Transcode,
|
||||
TranscodePendingCount: transcodeCount.Pending,
|
||||
TranscodeReadyCount: transcodeCount.Ready,
|
||||
TranscodeFailedCount: transcodeCount.Failed,
|
||||
TranscodeSkippedCount: transcodeCount.Skipped,
|
||||
})
|
||||
}
|
||||
writeJSON(w, http.StatusOK, list)
|
||||
@@ -1547,6 +1575,35 @@ func (a *AdminServer) handleStopDriveTasks(w http.ResponseWriter, r *http.Reques
|
||||
})
|
||||
}
|
||||
|
||||
// handleStartDriveTranscode 手动开启某盘的浏览器兼容性转码。
|
||||
// 转码默认不开启、从不自动运行;本接口是唯一入口。
|
||||
func (a *AdminServer) handleStartDriveTranscode(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
if a.OnStartDriveTranscode == nil {
|
||||
writeErr(w, http.StatusNotImplemented, errors.New("transcode not supported"))
|
||||
return
|
||||
}
|
||||
accepted, message := a.OnStartDriveTranscode(id)
|
||||
writeJSON(w, http.StatusAccepted, map[string]any{
|
||||
"ok": true,
|
||||
"accepted": accepted,
|
||||
"message": message,
|
||||
})
|
||||
}
|
||||
|
||||
// handleStopDriveTranscode 手动停止某盘正在进行的转码任务。
|
||||
func (a *AdminServer) handleStopDriveTranscode(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
stopped := false
|
||||
if a.OnStopDriveTranscode != nil {
|
||||
stopped = a.OnStopDriveTranscode(id)
|
||||
}
|
||||
writeJSON(w, http.StatusAccepted, map[string]any{
|
||||
"ok": true,
|
||||
"stopped": stopped,
|
||||
})
|
||||
}
|
||||
|
||||
func (a *AdminServer) p123QRClient() *p123.QRClient {
|
||||
return p123.NewQRClient(p123.QRConfig{
|
||||
UserAPIBaseURL: a.P123UserAPIBaseURL,
|
||||
|
||||
@@ -970,6 +970,15 @@ func thumbnailURL(v *catalog.Video) string {
|
||||
return base + "?v=" + strconv.FormatInt(v.UpdatedAt.UnixMilli(), 10)
|
||||
}
|
||||
|
||||
// transcodedSource 在视频有就绪的浏览器兼容性转码产物时返回产物的播放地址。
|
||||
// 产物和原始文件在同一个 drive 上,走同一条 /p/stream 代理/302 链路。
|
||||
func transcodedSource(v *catalog.Video) (string, bool) {
|
||||
if v.TranscodeStatus == "ready" && v.TranscodedFileID != "" && v.DriveID != localUploadDriveID {
|
||||
return fmt.Sprintf("/p/stream/%s/%s", pathSegment(v.DriveID), pathSegment(v.TranscodedFileID)), true
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (s *Server) videoSource(v *catalog.Video) string {
|
||||
if v.DriveID == localUploadDriveID {
|
||||
return "/p/upload/" + pathSegment(v.ID)
|
||||
@@ -982,6 +991,9 @@ func (s *Server) videoSource(v *catalog.Video) string {
|
||||
}
|
||||
}
|
||||
}
|
||||
if src, ok := transcodedSource(v); ok {
|
||||
return src
|
||||
}
|
||||
return fmt.Sprintf("/p/stream/%s/%s", pathSegment(v.DriveID), pathSegment(v.FileID))
|
||||
}
|
||||
|
||||
@@ -991,6 +1003,9 @@ func videoSource(v *catalog.Video) string {
|
||||
if v.DriveID == localUploadDriveID {
|
||||
return "/p/upload/" + pathSegment(v.ID)
|
||||
}
|
||||
if src, ok := transcodedSource(v); ok {
|
||||
return src
|
||||
}
|
||||
return fmt.Sprintf("/p/stream/%s/%s", pathSegment(v.DriveID), pathSegment(v.FileID))
|
||||
}
|
||||
|
||||
|
||||
@@ -71,6 +71,12 @@ type Video struct {
|
||||
PreviewFileID string `json:"previewFileId"`
|
||||
PreviewLocal string `json:"previewLocal"`
|
||||
PreviewStatus string `json:"previewStatus"`
|
||||
// TranscodeStatus:浏览器兼容性转码状态。
|
||||
// ''=未检测 / pending=已入队 / ready=已转码 / skipped=无需转码 / failed=失败。
|
||||
TranscodeStatus string `json:"transcodeStatus"`
|
||||
TranscodeError string `json:"transcodeError"`
|
||||
TranscodedFileID string `json:"transcodedFileId"`
|
||||
TranscodedSize int64 `json:"transcodedSize"`
|
||||
Views int `json:"views"`
|
||||
Favorites int `json:"favorites"`
|
||||
Comments int `json:"comments"`
|
||||
@@ -190,6 +196,84 @@ func (c *Catalog) UpdatePreview(ctx context.Context, id, previewLocal, status st
|
||||
return err
|
||||
}
|
||||
|
||||
// transcodeCandidateWhereSQL 圈定"可能需要浏览器兼容性转码"的视频:
|
||||
// mp4/webm/m4v 默认浏览器可播不进候选;strm 是远程引用没有本体。
|
||||
// 其余扩展名都先入候选,由转码 worker probe 实际编码后决定转码还是跳过
|
||||
// (skipped)。failed 也保留在候选里,重新点开始转码时会自动重试。
|
||||
const transcodeCandidateWhereSQL = `COALESCE(ext, '') NOT IN ('mp4', 'webm', 'm4v', 'strm')
|
||||
AND COALESCE(transcode_status, '') IN ('', 'pending', 'failed')`
|
||||
|
||||
// ListTranscodeCandidates 列出某盘所有转码候选视频。limit<=0 表示不限制。
|
||||
func (c *Catalog) ListTranscodeCandidates(ctx context.Context, driveID string, limit int) ([]*Video, error) {
|
||||
query := `SELECT ` + allVideoCols + ` FROM videos
|
||||
WHERE drive_id = ? AND ` + transcodeCandidateWhereSQL + `
|
||||
ORDER BY created_at ASC, id ASC`
|
||||
args := []any{driveID}
|
||||
if limit > 0 {
|
||||
query += ` LIMIT ?`
|
||||
args = append(args, limit)
|
||||
}
|
||||
rows, err := c.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []*Video
|
||||
for rows.Next() {
|
||||
v, err := scanVideo(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, v)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// UpdateVideoTranscode 写回单条视频的转码结果。
|
||||
// status=ready 时 transcodedFileID/transcodedSize 指向转码产物;
|
||||
// 其它 status 调用方应传空值,本函数会按传入值原样覆盖。
|
||||
func (c *Catalog) UpdateVideoTranscode(ctx context.Context, id, status, errMsg, transcodedFileID string, transcodedSize int64) error {
|
||||
_, err := c.db.ExecContext(ctx,
|
||||
`UPDATE videos SET transcode_status = ?, transcode_error = ?, transcoded_file_id = ?, transcoded_size = ?, updated_at = ? WHERE id = ?`,
|
||||
status, errMsg, transcodedFileID, transcodedSize, time.Now().UnixMilli(), id)
|
||||
return err
|
||||
}
|
||||
|
||||
// DriveTranscodeCounts 是单盘的转码进度统计。
|
||||
type DriveTranscodeCounts struct {
|
||||
// Pending 是仍在候选集合里、还没有出结果的数量(含从未检测过的)。
|
||||
Pending int
|
||||
Ready int
|
||||
Failed int
|
||||
Skipped int
|
||||
}
|
||||
|
||||
func (c *Catalog) CountTranscodesByDrive(ctx context.Context) (map[string]DriveTranscodeCounts, error) {
|
||||
rows, err := c.db.QueryContext(ctx, `
|
||||
SELECT drive_id,
|
||||
COUNT(CASE WHEN COALESCE(ext, '') NOT IN ('mp4', 'webm', 'm4v', 'strm')
|
||||
AND COALESCE(transcode_status, '') IN ('', 'pending') THEN 1 END) AS pending_count,
|
||||
COUNT(CASE WHEN COALESCE(transcode_status, '') = 'ready' THEN 1 END) AS ready_count,
|
||||
COUNT(CASE WHEN COALESCE(transcode_status, '') = 'failed' THEN 1 END) AS failed_count,
|
||||
COUNT(CASE WHEN COALESCE(transcode_status, '') = 'skipped' THEN 1 END) AS skipped_count
|
||||
FROM videos
|
||||
GROUP BY drive_id`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
out := make(map[string]DriveTranscodeCounts)
|
||||
for rows.Next() {
|
||||
var driveID string
|
||||
var counts DriveTranscodeCounts
|
||||
if err := rows.Scan(&driveID, &counts.Pending, &counts.Ready, &counts.Failed, &counts.Skipped); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[driveID] = counts
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (c *Catalog) HideVideo(ctx context.Context, id string) error {
|
||||
res, err := c.db.ExecContext(ctx,
|
||||
`UPDATE videos SET hidden = 1, updated_at = ? WHERE id = ?`,
|
||||
@@ -2165,6 +2249,7 @@ COALESCE(sampled_sha256, ''), COALESCE(fingerprint_status, 'pending'), COALESCE(
|
||||
COALESCE(parent_id, ''), title, COALESCE(author, ''), COALESCE(tags, '[]'),
|
||||
duration_seconds, size_bytes, COALESCE(ext, ''), COALESCE(quality, ''), COALESCE(thumbnail_url, ''),
|
||||
COALESCE(preview_file_id, ''), COALESCE(preview_local, ''), COALESCE(preview_status, 'pending'),
|
||||
COALESCE(transcode_status, ''), COALESCE(transcode_error, ''), COALESCE(transcoded_file_id, ''), COALESCE(transcoded_size, 0),
|
||||
views, favorites, comments, likes, dislikes,
|
||||
COALESCE(category, ''), COALESCE(hidden, 0), COALESCE(badges, '[]'), COALESCE(description, ''),
|
||||
published_at, created_at, updated_at
|
||||
@@ -2236,6 +2321,7 @@ func scanVideo(row rowScanner) (*Video, error) {
|
||||
&v.ParentID, &v.Title, &v.Author, &tagsJSON,
|
||||
&v.DurationSeconds, &v.Size, &v.Ext, &v.Quality, &v.ThumbnailURL,
|
||||
&v.PreviewFileID, &v.PreviewLocal, &v.PreviewStatus,
|
||||
&v.TranscodeStatus, &v.TranscodeError, &v.TranscodedFileID, &v.TranscodedSize,
|
||||
&v.Views, &v.Favorites, &v.Comments, &v.Likes, &v.Dislikes,
|
||||
&v.Category, &hidden, &badgesJSON, &v.Description,
|
||||
&publishedAt, &createdAt, &updatedAt,
|
||||
|
||||
@@ -22,6 +22,10 @@ CREATE TABLE IF NOT EXISTS videos (
|
||||
preview_file_id TEXT, -- deprecated: 旧版回写网盘后的预览视频 file id
|
||||
preview_local TEXT, -- 本地预览视频路径(兜底)
|
||||
preview_status TEXT DEFAULT 'pending', -- pending / ready / failed
|
||||
transcode_status TEXT DEFAULT '', -- '' / pending / ready / skipped / failed(浏览器兼容性转码)
|
||||
transcode_error TEXT DEFAULT '',
|
||||
transcoded_file_id TEXT DEFAULT '', -- 转码产物在同一 drive 上的 fileID,播放源优先用它
|
||||
transcoded_size INTEGER DEFAULT 0,
|
||||
views INTEGER DEFAULT 0,
|
||||
favorites INTEGER DEFAULT 0,
|
||||
comments INTEGER DEFAULT 0,
|
||||
|
||||
@@ -66,6 +66,21 @@ func (c *Catalog) migrate(ctx context.Context) error {
|
||||
if err := c.addColumnIfMissing(ctx, "videos", "thumbnail_failures", "INTEGER DEFAULT 0"); err != nil {
|
||||
return err
|
||||
}
|
||||
// videos.transcode_*:浏览器兼容性转码状态。
|
||||
// status:''=未检测 / pending=已入队 / ready=已转码 / skipped=检测后无需转码 / failed=失败。
|
||||
// transcoded_file_id 指向转码产物在同一 drive 上的 fileID,播放源优先使用它。
|
||||
if err := c.addColumnIfMissing(ctx, "videos", "transcode_status", "TEXT DEFAULT ''"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.addColumnIfMissing(ctx, "videos", "transcode_error", "TEXT DEFAULT ''"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.addColumnIfMissing(ctx, "videos", "transcoded_file_id", "TEXT DEFAULT ''"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.addColumnIfMissing(ctx, "videos", "transcoded_size", "INTEGER DEFAULT 0"); err != nil {
|
||||
return err
|
||||
}
|
||||
// drives.teaser_enabled:每盘预览视频开关,替代旧的全局 preview.enabled。
|
||||
// 升级路径:直接让 ALTER TABLE 的 DEFAULT 1 兜底 —— 每个现存 drive 都默认开启,
|
||||
// 不读旧的 settings.preview.enabled 字段。这样老用户即便之前关过全局开关,
|
||||
|
||||
@@ -0,0 +1,178 @@
|
||||
// Package transcode 实现"浏览器兼容性转码":把网盘/本地存储中浏览器
|
||||
// <video> 播不动的视频(AVI/WMV/FLV、MPEG-4 Part 2、RMVB 等)转成
|
||||
// H.264 + AAC 的 MP4,并把产物上传回同一存储,播放源切到产物文件。
|
||||
//
|
||||
// 与封面/预览生成不同,转码不会自动运行——只能由管理员在网盘管理页
|
||||
// 手动开启,也可以随时手动停止。
|
||||
package transcode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MediaInfo 是 ffprobe 探测出来的、做兼容性判定所需的最小信息。
|
||||
type MediaInfo struct {
|
||||
// FormatName 是 ffprobe 的 format_name,逗号分隔的 demuxer 别名,
|
||||
// 例如 "mov,mp4,m4a,3gp,3g2,mj2" / "avi" / "matroska,webm"。
|
||||
FormatName string
|
||||
VideoCodecs []string
|
||||
AudioCodecs []string
|
||||
}
|
||||
|
||||
// browserCompatibleVideoCodecs 是主流浏览器 <video> 普遍可解码的视频编码。
|
||||
// HEVC/H.265 只有部分平台支持,保守起见不算兼容。
|
||||
var browserCompatibleVideoCodecs = map[string]bool{
|
||||
"h264": true,
|
||||
"vp8": true,
|
||||
"vp9": true,
|
||||
"av1": true,
|
||||
}
|
||||
|
||||
// browserCompatibleAudioCodecs 是主流浏览器普遍可解码的音频编码。
|
||||
var browserCompatibleAudioCodecs = map[string]bool{
|
||||
"aac": true,
|
||||
"mp3": true,
|
||||
"opus": true,
|
||||
"vorbis": true,
|
||||
"flac": true,
|
||||
}
|
||||
|
||||
// NeedsTranscode 判断这个文件是否需要转码才能在浏览器里播放。
|
||||
// ext 是 catalog 里记录的扩展名(小写、不带点),用来区分 mkv 和 webm
|
||||
// (两者的 format_name 都是 "matroska,webm")。
|
||||
func NeedsTranscode(info MediaInfo, ext string) bool {
|
||||
if !containerCompatible(info.FormatName, ext) {
|
||||
return true
|
||||
}
|
||||
for _, codec := range info.VideoCodecs {
|
||||
if !browserCompatibleVideoCodecs[strings.ToLower(codec)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, codec := range info.AudioCodecs {
|
||||
if !browserCompatibleAudioCodecs[strings.ToLower(codec)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func containerCompatible(formatName, ext string) bool {
|
||||
format := strings.ToLower(formatName)
|
||||
for _, name := range strings.Split(format, ",") {
|
||||
if name == "mp4" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// matroska,webm:只有真 .webm 信任为浏览器可播容器;.mkv 保守转码。
|
||||
if strings.Contains(format, "webm") && strings.EqualFold(ext, "webm") {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ProbeFile 用 ffprobe 探测本地文件的容器与音视频编码。
|
||||
func ProbeFile(ctx context.Context, ffprobePath, path string) (MediaInfo, error) {
|
||||
ctx2, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(ctx2, ffprobePath,
|
||||
"-v", "error",
|
||||
"-show_entries", "format=format_name",
|
||||
"-show_entries", "stream=codec_type,codec_name",
|
||||
"-of", "json",
|
||||
path,
|
||||
)
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return MediaInfo{}, fmt.Errorf("transcode: ffprobe: %w", err)
|
||||
}
|
||||
var parsed struct {
|
||||
Format struct {
|
||||
FormatName string `json:"format_name"`
|
||||
} `json:"format"`
|
||||
Streams []struct {
|
||||
CodecType string `json:"codec_type"`
|
||||
CodecName string `json:"codec_name"`
|
||||
} `json:"streams"`
|
||||
}
|
||||
if err := json.Unmarshal(out, &parsed); err != nil {
|
||||
return MediaInfo{}, fmt.Errorf("transcode: parse ffprobe output: %w", err)
|
||||
}
|
||||
info := MediaInfo{FormatName: parsed.Format.FormatName}
|
||||
for _, s := range parsed.Streams {
|
||||
switch s.CodecType {
|
||||
case "video":
|
||||
info.VideoCodecs = append(info.VideoCodecs, s.CodecName)
|
||||
case "audio":
|
||||
info.AudioCodecs = append(info.AudioCodecs, s.CodecName)
|
||||
}
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// buildFFmpegArgs 按探测结果生成转码参数:
|
||||
// - 编码本就兼容、只是容器不行(如 AVI 里装 H.264)→ 流拷贝 remux,零质量损失;
|
||||
// - 否则视频转 H.264(裁到偶数尺寸 + yuv420p 保证兼容性)、音频转 AAC。
|
||||
//
|
||||
// 两种情况都加 +faststart 把 moov 提前,便于边下边播。
|
||||
func buildFFmpegArgs(info MediaInfo, inPath, outPath string) []string {
|
||||
args := []string{"-y", "-i", inPath}
|
||||
videoOK := true
|
||||
for _, codec := range info.VideoCodecs {
|
||||
if !browserCompatibleVideoCodecs[strings.ToLower(codec)] {
|
||||
videoOK = false
|
||||
break
|
||||
}
|
||||
}
|
||||
audioOK := true
|
||||
for _, codec := range info.AudioCodecs {
|
||||
if !browserCompatibleAudioCodecs[strings.ToLower(codec)] {
|
||||
audioOK = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if videoOK {
|
||||
args = append(args, "-c:v", "copy")
|
||||
} else {
|
||||
args = append(args,
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-crf", "23",
|
||||
"-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2",
|
||||
"-pix_fmt", "yuv420p",
|
||||
)
|
||||
}
|
||||
if len(info.AudioCodecs) == 0 {
|
||||
args = append(args, "-an")
|
||||
} else if audioOK {
|
||||
args = append(args, "-c:a", "copy")
|
||||
} else {
|
||||
args = append(args, "-c:a", "aac", "-b:a", "128k")
|
||||
}
|
||||
args = append(args, "-movflags", "+faststart", "-f", "mp4", outPath)
|
||||
return args
|
||||
}
|
||||
|
||||
// TranscodeFile 把本地输入文件转成浏览器可播的 MP4 写到 outPath。
|
||||
func TranscodeFile(ctx context.Context, ffmpegPath string, info MediaInfo, inPath, outPath string) error {
|
||||
args := buildFFmpegArgs(info, inPath, outPath)
|
||||
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("transcode: ffmpeg: %w: %s", err, tailOf(string(out), 400))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func tailOf(s string, n int) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[len(s)-n:]
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
package transcode
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/video-site/backend/internal/catalog"
|
||||
)
|
||||
|
||||
func TestNeedsTranscode(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
info MediaInfo
|
||||
ext string
|
||||
want bool
|
||||
}{
|
||||
{
|
||||
name: "h264 aac mp4 is compatible",
|
||||
info: MediaInfo{FormatName: "mov,mp4,m4a,3gp,3g2,mj2", VideoCodecs: []string{"h264"}, AudioCodecs: []string{"aac"}},
|
||||
ext: "mp4",
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "mpeg4 in avi needs transcode",
|
||||
info: MediaInfo{FormatName: "avi", VideoCodecs: []string{"mpeg4"}, AudioCodecs: []string{"mp3"}},
|
||||
ext: "avi",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "h264 in avi needs remux",
|
||||
info: MediaInfo{FormatName: "avi", VideoCodecs: []string{"h264"}, AudioCodecs: []string{"aac"}},
|
||||
ext: "avi",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "hevc in mp4 needs transcode",
|
||||
info: MediaInfo{FormatName: "mov,mp4,m4a,3gp,3g2,mj2", VideoCodecs: []string{"hevc"}, AudioCodecs: []string{"aac"}},
|
||||
ext: "mp4",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "vp9 opus webm is compatible",
|
||||
info: MediaInfo{FormatName: "matroska,webm", VideoCodecs: []string{"vp9"}, AudioCodecs: []string{"opus"}},
|
||||
ext: "webm",
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "h264 in mkv is conservative transcode",
|
||||
info: MediaInfo{FormatName: "matroska,webm", VideoCodecs: []string{"h264"}, AudioCodecs: []string{"aac"}},
|
||||
ext: "mkv",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "pcm audio in mov needs transcode",
|
||||
info: MediaInfo{FormatName: "mov,mp4,m4a,3gp,3g2,mj2", VideoCodecs: []string{"h264"}, AudioCodecs: []string{"pcm_s16le"}},
|
||||
ext: "mov",
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "video only h264 mp4 is compatible",
|
||||
info: MediaInfo{FormatName: "mov,mp4,m4a,3gp,3g2,mj2", VideoCodecs: []string{"h264"}},
|
||||
ext: "mp4",
|
||||
want: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := NeedsTranscode(tc.info, tc.ext); got != tc.want {
|
||||
t.Fatalf("NeedsTranscode(%+v, %q) = %v, want %v", tc.info, tc.ext, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildFFmpegArgsRemuxWhenCodecsCompatible(t *testing.T) {
|
||||
// AVI 里装 H.264+AAC:只需要换容器,应该走流拷贝
|
||||
info := MediaInfo{FormatName: "avi", VideoCodecs: []string{"h264"}, AudioCodecs: []string{"aac"}}
|
||||
args := strings.Join(buildFFmpegArgs(info, "in.avi", "out.mp4"), " ")
|
||||
if !strings.Contains(args, "-c:v copy") {
|
||||
t.Fatalf("expected video stream copy, got: %s", args)
|
||||
}
|
||||
if !strings.Contains(args, "-c:a copy") {
|
||||
t.Fatalf("expected audio stream copy, got: %s", args)
|
||||
}
|
||||
if !strings.Contains(args, "+faststart") {
|
||||
t.Fatalf("expected faststart flag, got: %s", args)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildFFmpegArgsTranscodesIncompatibleCodecs(t *testing.T) {
|
||||
info := MediaInfo{FormatName: "avi", VideoCodecs: []string{"mpeg4"}, AudioCodecs: []string{"wmav2"}}
|
||||
args := strings.Join(buildFFmpegArgs(info, "in.avi", "out.mp4"), " ")
|
||||
if !strings.Contains(args, "-c:v libx264") {
|
||||
t.Fatalf("expected libx264 video encode, got: %s", args)
|
||||
}
|
||||
if !strings.Contains(args, "-c:a aac") {
|
||||
t.Fatalf("expected aac audio encode, got: %s", args)
|
||||
}
|
||||
if !strings.Contains(args, "yuv420p") {
|
||||
t.Fatalf("expected yuv420p pixel format, got: %s", args)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildFFmpegArgsDropsAudioWhenNoAudioStream(t *testing.T) {
|
||||
info := MediaInfo{FormatName: "avi", VideoCodecs: []string{"mpeg4"}}
|
||||
args := strings.Join(buildFFmpegArgs(info, "in.avi", "out.mp4"), " ")
|
||||
if !strings.Contains(args, "-an") {
|
||||
t.Fatalf("expected -an for video without audio, got: %s", args)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTranscodedName(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
fileName, title, id, want string
|
||||
}{
|
||||
{"www.98T.la@167.avi", "www.98T.la@167", "p115-1", "www.98T.la@167.mp4"},
|
||||
{"", "标题", "p115-2", "标题.mp4"},
|
||||
{"a/b\\c.wmv", "", "p115-3", "a_b_c.mp4"},
|
||||
} {
|
||||
v := &catalog.Video{FileName: tc.fileName, Title: tc.title, ID: tc.id}
|
||||
if got := transcodedName(v); got != tc.want {
|
||||
t.Fatalf("transcodedName(%q,%q,%q) = %q, want %q", tc.fileName, tc.title, tc.id, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,308 @@
|
||||
package transcode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/video-site/backend/internal/catalog"
|
||||
"github.com/video-site/backend/internal/drives"
|
||||
)
|
||||
|
||||
// DefaultTargetDirName 是转码产物在网盘上的存放目录(相对根目录)。
|
||||
// worker 第一次上传前会 EnsureDir 并把该目录加进 drive 的扫描跳过列表,
|
||||
// 避免 scanner 把转码产物当成新视频重复入库。
|
||||
const DefaultTargetDirName = "91转码"
|
||||
|
||||
type Config struct {
|
||||
FFmpegPath string
|
||||
FFprobePath string
|
||||
// WorkDir 是下载原始文件 / 写转码产物的本地临时目录。
|
||||
WorkDir string
|
||||
// TargetDirName 为空时用 DefaultTargetDirName。
|
||||
TargetDirName string
|
||||
}
|
||||
|
||||
// TaskStatus 与 preview/fingerprint worker 的状态结构对齐,供 admin 展示。
|
||||
type TaskStatus struct {
|
||||
State string
|
||||
CurrentTitle string
|
||||
QueueLength int
|
||||
DoneCount int
|
||||
TotalCount int
|
||||
}
|
||||
|
||||
// Worker 串行处理一个 drive 的转码任务。生命周期与一次"开始转码"对应:
|
||||
// Run 处理完整个候选列表(或 ctx 被取消)后即结束,不常驻。
|
||||
type Worker struct {
|
||||
cfg Config
|
||||
cat *catalog.Catalog
|
||||
drv drives.Drive
|
||||
hc *http.Client
|
||||
|
||||
mu sync.Mutex
|
||||
state string
|
||||
currentTitle string
|
||||
done int
|
||||
total int
|
||||
|
||||
targetDirOnce sync.Once
|
||||
targetDirID string
|
||||
targetDirErr error
|
||||
}
|
||||
|
||||
func NewWorker(cfg Config, cat *catalog.Catalog, drv drives.Drive) *Worker {
|
||||
if cfg.FFmpegPath == "" {
|
||||
cfg.FFmpegPath = "ffmpeg"
|
||||
}
|
||||
if cfg.FFprobePath == "" {
|
||||
cfg.FFprobePath = "ffprobe"
|
||||
}
|
||||
if cfg.TargetDirName == "" {
|
||||
cfg.TargetDirName = DefaultTargetDirName
|
||||
}
|
||||
if cfg.WorkDir == "" {
|
||||
cfg.WorkDir = os.TempDir()
|
||||
}
|
||||
return &Worker{
|
||||
cfg: cfg,
|
||||
cat: cat,
|
||||
drv: drv,
|
||||
hc: &http.Client{Timeout: 0},
|
||||
state: "idle",
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Status() TaskStatus {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
queueLen := w.total - w.done
|
||||
if w.state == "generating" && queueLen > 0 {
|
||||
// 正在处理的那条不算"排队中"
|
||||
queueLen--
|
||||
}
|
||||
if queueLen < 0 {
|
||||
queueLen = 0
|
||||
}
|
||||
return TaskStatus{
|
||||
State: w.state,
|
||||
CurrentTitle: w.currentTitle,
|
||||
QueueLength: queueLen,
|
||||
DoneCount: w.done,
|
||||
TotalCount: w.total,
|
||||
}
|
||||
}
|
||||
|
||||
// Run 串行转码整个候选列表。ctx 取消时停在当前条目边界(正在跑的 ffmpeg
|
||||
// 会被 CommandContext 杀掉),未处理的候选保持原状态,下次开始时继续。
|
||||
func (w *Worker) Run(ctx context.Context, videos []*catalog.Video) {
|
||||
w.mu.Lock()
|
||||
w.state = "generating"
|
||||
w.total = len(videos)
|
||||
w.done = 0
|
||||
w.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
w.mu.Lock()
|
||||
w.state = "idle"
|
||||
w.currentTitle = ""
|
||||
w.mu.Unlock()
|
||||
}()
|
||||
|
||||
for _, v := range videos {
|
||||
if ctx.Err() != nil {
|
||||
log.Printf("[transcode] drive=%s canceled after %d/%d", w.drv.ID(), w.doneCount(), len(videos))
|
||||
return
|
||||
}
|
||||
w.mu.Lock()
|
||||
w.currentTitle = v.Title
|
||||
w.mu.Unlock()
|
||||
|
||||
if err := w.process(ctx, v); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
// 取消导致的失败不要写 failed,保持候选状态便于下次继续
|
||||
log.Printf("[transcode] drive=%s canceled while processing %s", w.drv.ID(), v.ID)
|
||||
return
|
||||
}
|
||||
log.Printf("[transcode] drive=%s video=%s failed: %v", w.drv.ID(), v.ID, err)
|
||||
if uerr := w.cat.UpdateVideoTranscode(context.WithoutCancel(ctx), v.ID, "failed", err.Error(), "", 0); uerr != nil {
|
||||
log.Printf("[transcode] mark failed %s: %v", v.ID, uerr)
|
||||
}
|
||||
}
|
||||
w.mu.Lock()
|
||||
w.done++
|
||||
w.mu.Unlock()
|
||||
}
|
||||
log.Printf("[transcode] drive=%s finished %d videos", w.drv.ID(), len(videos))
|
||||
}
|
||||
|
||||
func (w *Worker) doneCount() int {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.done
|
||||
}
|
||||
|
||||
func (w *Worker) process(ctx context.Context, v *catalog.Video) error {
|
||||
localPath, cleanup, err := w.fetchSource(ctx, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch source: %w", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
info, err := ProbeFile(ctx, w.cfg.FFprobePath, localPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !NeedsTranscode(info, v.Ext) {
|
||||
log.Printf("[transcode] drive=%s video=%s compatible (%s), skip", w.drv.ID(), v.ID, info.FormatName)
|
||||
return w.cat.UpdateVideoTranscode(ctx, v.ID, "skipped", "", "", 0)
|
||||
}
|
||||
|
||||
outPath := filepath.Join(w.cfg.WorkDir, sanitizeFileName(v.ID)+".transcoding.mp4")
|
||||
defer os.Remove(outPath)
|
||||
if err := TranscodeFile(ctx, w.cfg.FFmpegPath, info, localPath, outPath); err != nil {
|
||||
return err
|
||||
}
|
||||
stat, err := os.Stat(outPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat transcoded output: %w", err)
|
||||
}
|
||||
|
||||
dirID, err := w.ensureTargetDir(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ensure target dir: %w", err)
|
||||
}
|
||||
f, err := os.Open(outPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
fileID, err := w.drv.Upload(ctx, dirID, transcodedName(v), f, stat.Size())
|
||||
if err != nil {
|
||||
return fmt.Errorf("upload transcoded file: %w", err)
|
||||
}
|
||||
log.Printf("[transcode] drive=%s video=%s ready: file=%s size=%d", w.drv.ID(), v.ID, fileID, stat.Size())
|
||||
return w.cat.UpdateVideoTranscode(ctx, v.ID, "ready", "", fileID, stat.Size())
|
||||
}
|
||||
|
||||
// fetchSource 把原始文件准备成本地路径。本地存储直接复用源路径(cleanup
|
||||
// 不删除源文件);云盘则整文件下载到 WorkDir。
|
||||
func (w *Worker) fetchSource(ctx context.Context, v *catalog.Video) (string, func(), error) {
|
||||
link, err := w.drv.StreamURL(ctx, v.FileID)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
u, err := url.Parse(link.URL)
|
||||
if isLocal := err == nil && u.Scheme != "http" && u.Scheme != "https"; isLocal {
|
||||
path := link.URL
|
||||
if err == nil && u.Scheme == "file" {
|
||||
path = u.Path
|
||||
}
|
||||
return path, func() {}, nil
|
||||
}
|
||||
|
||||
tmpPath := filepath.Join(w.cfg.WorkDir, sanitizeFileName(v.ID)+".src.tmp")
|
||||
cleanup := func() { os.Remove(tmpPath) }
|
||||
if err := w.downloadTo(ctx, link, tmpPath); err != nil {
|
||||
cleanup()
|
||||
return "", nil, err
|
||||
}
|
||||
return tmpPath, cleanup, nil
|
||||
}
|
||||
|
||||
func (w *Worker) downloadTo(ctx context.Context, link *drives.StreamLink, dst string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, link.URL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k, vals := range link.Headers {
|
||||
for _, val := range vals {
|
||||
req.Header.Add(k, val)
|
||||
}
|
||||
}
|
||||
res, err := w.hc.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
return fmt.Errorf("download source: HTTP %d", res.StatusCode)
|
||||
}
|
||||
f, err := os.Create(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
if _, err := io.Copy(f, res.Body); err != nil {
|
||||
return fmt.Errorf("download source: %w", err)
|
||||
}
|
||||
return f.Sync()
|
||||
}
|
||||
|
||||
// ensureTargetDir 确保网盘上的转码产物目录存在,并把它写进 drive 的扫描
|
||||
// 跳过列表(幂等),避免 scanner 把产物再当新视频收进库。
|
||||
func (w *Worker) ensureTargetDir(ctx context.Context) (string, error) {
|
||||
w.targetDirOnce.Do(func() {
|
||||
dirID, err := w.drv.EnsureDir(ctx, w.cfg.TargetDirName)
|
||||
if err != nil {
|
||||
w.targetDirErr = err
|
||||
return
|
||||
}
|
||||
w.targetDirID = dirID
|
||||
if err := w.addDirToSkipList(ctx, dirID); err != nil {
|
||||
// 跳过列表更新失败不阻塞转码,只记日志(最坏情况是 scanner
|
||||
// 之后把产物扫成新视频,可手动加跳过目录修复)。
|
||||
log.Printf("[transcode] drive=%s add skip dir %s: %v", w.drv.ID(), dirID, err)
|
||||
}
|
||||
})
|
||||
return w.targetDirID, w.targetDirErr
|
||||
}
|
||||
|
||||
func (w *Worker) addDirToSkipList(ctx context.Context, dirID string) error {
|
||||
d, err := w.cat.GetDrive(ctx, w.drv.ID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, existing := range d.SkipDirIDs {
|
||||
if existing == dirID {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return w.cat.SetDriveSkipDirIDs(ctx, w.drv.ID(), append(d.SkipDirIDs, dirID))
|
||||
}
|
||||
|
||||
// transcodedName 生成产物文件名:原文件名去掉扩展名 + .mp4。
|
||||
func transcodedName(v *catalog.Video) string {
|
||||
base := strings.TrimSpace(v.FileName)
|
||||
if base == "" {
|
||||
base = v.Title
|
||||
}
|
||||
if base == "" {
|
||||
base = v.ID
|
||||
}
|
||||
if ext := filepath.Ext(base); ext != "" {
|
||||
base = strings.TrimSuffix(base, ext)
|
||||
}
|
||||
return sanitizeFileName(base) + ".mp4"
|
||||
}
|
||||
|
||||
// sanitizeFileName 把路径分隔符等危险字符替换掉,避免拼出意外路径。
|
||||
func sanitizeFileName(name string) string {
|
||||
replacer := strings.NewReplacer(
|
||||
"/", "_", "\\", "_", ":", "_", "*", "_", "?", "_",
|
||||
"\"", "_", "<", "_", ">", "_", "|", "_", "\x00", "_",
|
||||
)
|
||||
out := strings.TrimSpace(replacer.Replace(name))
|
||||
if out == "" {
|
||||
out = fmt.Sprintf("transcoded-%d", time.Now().UnixMilli())
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -48,6 +48,7 @@ function isDriveBusy(d: api.AdminDrive) {
|
||||
d.thumbnailGenerationStatus,
|
||||
d.previewGenerationStatus,
|
||||
d.fingerprintGenerationStatus,
|
||||
d.transcodeGenerationStatus,
|
||||
].some((status) => {
|
||||
const state = status?.state || "idle";
|
||||
return state !== "idle";
|
||||
@@ -74,6 +75,7 @@ export function DrivesPage() {
|
||||
const [regenFailedThumbId, setRegenFailedThumbId] = useState("");
|
||||
const [regenFailedFingerprintId, setRegenFailedFingerprintId] = useState("");
|
||||
const [togglingTeaserId, setTogglingTeaserId] = useState("");
|
||||
const [togglingTranscodeId, setTogglingTranscodeId] = useState("");
|
||||
const [scanningAll, setScanningAll] = useState(false);
|
||||
const [stoppingAll, setStoppingAll] = useState(false);
|
||||
const [trackingNightly, setTrackingNightly] = useState(false);
|
||||
@@ -499,6 +501,41 @@ export function DrivesPage() {
|
||||
}
|
||||
}
|
||||
|
||||
async function handleStartTranscode(d: api.AdminDrive) {
|
||||
setTogglingTranscodeId(d.id);
|
||||
try {
|
||||
const resp = await api.startDriveTranscode(d.id);
|
||||
if (resp.accepted) {
|
||||
show(`已开始「${d.name || d.id}」的视频转码`, "success");
|
||||
} else {
|
||||
show(resp.message || "转码任务未能开启", "info");
|
||||
}
|
||||
refreshDriveList();
|
||||
} catch (e) {
|
||||
show(e instanceof Error ? e.message : "开启失败", "error");
|
||||
} finally {
|
||||
setTogglingTranscodeId("");
|
||||
}
|
||||
}
|
||||
|
||||
async function handleStopTranscode(d: api.AdminDrive) {
|
||||
setTogglingTranscodeId(d.id);
|
||||
try {
|
||||
const resp = await api.stopDriveTranscode(d.id);
|
||||
show(
|
||||
resp.stopped
|
||||
? `已停止「${d.name || d.id}」的视频转码`
|
||||
: `「${d.name || d.id}」没有正在运行的转码任务`,
|
||||
"success"
|
||||
);
|
||||
refreshDriveList();
|
||||
} catch (e) {
|
||||
show(e instanceof Error ? e.message : "停止失败", "error");
|
||||
} finally {
|
||||
setTogglingTranscodeId("");
|
||||
}
|
||||
}
|
||||
|
||||
const selectedDrive = useMemo(() => {
|
||||
return selectedDriveId ? list.find((d) => d.id === selectedDriveId) : null;
|
||||
}, [selectedDriveId, list]);
|
||||
@@ -634,10 +671,13 @@ export function DrivesPage() {
|
||||
regenFailedThumbId={regenFailedThumbId}
|
||||
regenFailedFingerprintId={regenFailedFingerprintId}
|
||||
togglingTeaserId={togglingTeaserId}
|
||||
togglingTranscodeId={togglingTranscodeId}
|
||||
onToggleTeaser={() => handleToggleTeaser(d)}
|
||||
onRegenFailed={() => handleRegenFailed(d)}
|
||||
onRegenFailedThumbnails={() => handleRegenFailedThumbnails(d)}
|
||||
onRegenFailedFingerprints={() => handleRegenFailedFingerprints(d)}
|
||||
onStartTranscode={() => handleStartTranscode(d)}
|
||||
onStopTranscode={() => handleStopTranscode(d)}
|
||||
/>
|
||||
|
||||
<div className="admin-detail-card">
|
||||
|
||||
@@ -112,6 +112,12 @@ export type AdminDrive = {
|
||||
fingerprintReadyCount: number;
|
||||
fingerprintPendingCount: number;
|
||||
fingerprintFailedCount: number;
|
||||
// 浏览器兼容性转码:候选(待处理)/已转码/失败/检测后无需转码 计数与任务状态。
|
||||
transcodeGenerationStatus?: DriveGenerationStatus;
|
||||
transcodePendingCount: number;
|
||||
transcodeReadyCount: number;
|
||||
transcodeFailedCount: number;
|
||||
transcodeSkippedCount: number;
|
||||
};
|
||||
|
||||
export type DriveGenerationStatus = {
|
||||
@@ -449,6 +455,26 @@ export function regenFailedFingerprints(id: string) {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动开启某存储的浏览器兼容性转码(AVI/WMV 等浏览器播不动的视频转 H.264 MP4,
|
||||
* 产物上传回同一存储)。转码默认关闭、从不自动运行,这是唯一入口;
|
||||
* 任务处理完候选列表后自然结束。
|
||||
*/
|
||||
export function startDriveTranscode(id: string) {
|
||||
return request<{ ok: boolean; accepted: boolean; message?: string }>(
|
||||
`/drives/${encodeURIComponent(id)}/transcode/start`,
|
||||
{ method: "POST" }
|
||||
);
|
||||
}
|
||||
|
||||
/** 手动停止某存储正在进行的转码任务。 */
|
||||
export function stopDriveTranscode(id: string) {
|
||||
return request<{ ok: boolean; stopped: boolean }>(
|
||||
`/drives/${encodeURIComponent(id)}/transcode/stop`,
|
||||
{ method: "POST" }
|
||||
);
|
||||
}
|
||||
|
||||
// ---------- Videos ----------
|
||||
|
||||
export type AdminVideo = {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { PlayCircle, Power, PowerOff, RotateCcw } from "lucide-react";
|
||||
import { CircleStop, PlayCircle, Power, PowerOff, RotateCcw, Wand2 } from "lucide-react";
|
||||
import * as api from "../api";
|
||||
import { formatBytes } from "../storageFormat";
|
||||
import {
|
||||
@@ -163,20 +163,26 @@ export function DriveGenerationPanel({
|
||||
regenFailedThumbId,
|
||||
regenFailedFingerprintId,
|
||||
togglingTeaserId,
|
||||
togglingTranscodeId,
|
||||
onToggleTeaser,
|
||||
onRegenFailed,
|
||||
onRegenFailedThumbnails,
|
||||
onRegenFailedFingerprints,
|
||||
onStartTranscode,
|
||||
onStopTranscode,
|
||||
}: {
|
||||
d: api.AdminDrive;
|
||||
regenFailedId: string;
|
||||
regenFailedThumbId: string;
|
||||
regenFailedFingerprintId: string;
|
||||
togglingTeaserId: string;
|
||||
togglingTranscodeId: string;
|
||||
onToggleTeaser: () => void;
|
||||
onRegenFailed: () => void;
|
||||
onRegenFailedThumbnails: () => void;
|
||||
onRegenFailedFingerprints: () => void;
|
||||
onStartTranscode: () => void;
|
||||
onStopTranscode: () => void;
|
||||
}) {
|
||||
const canQueueThumbnails =
|
||||
(d.thumbnailFailedCount ?? 0) > 0 ||
|
||||
@@ -186,6 +192,12 @@ export function DriveGenerationPanel({
|
||||
(d.teaserFailedCount ?? 0) > 0 || (d.teaserPendingCount ?? 0) > 0;
|
||||
const canQueueFingerprints =
|
||||
(d.fingerprintFailedCount ?? 0) > 0 || (d.fingerprintPendingCount ?? 0) > 0;
|
||||
// 转码默认不运行,只能在这里手动开启/停止。
|
||||
// 候选 = 还没出结果的不兼容格式视频 + 上次失败的(重新开始会自动重试)。
|
||||
const transcodeRunning =
|
||||
(d.transcodeGenerationStatus?.state || "idle") !== "idle";
|
||||
const canStartTranscode =
|
||||
(d.transcodePendingCount ?? 0) > 0 || (d.transcodeFailedCount ?? 0) > 0;
|
||||
|
||||
return (
|
||||
<div className="admin-detail-card">
|
||||
@@ -235,6 +247,13 @@ export function DriveGenerationPanel({
|
||||
pending={d.fingerprintPendingCount}
|
||||
failed={d.fingerprintFailedCount}
|
||||
/>
|
||||
<DriveGenCol
|
||||
label="转码"
|
||||
status={d.transcodeGenerationStatus}
|
||||
ready={d.transcodeReadyCount}
|
||||
pending={d.transcodePendingCount}
|
||||
failed={d.transcodeFailedCount}
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="admin-detail-actions">
|
||||
@@ -262,6 +281,33 @@ export function DriveGenerationPanel({
|
||||
<RotateCcw size={13} />
|
||||
<span>{(d.fingerprintFailedCount ?? 0) > 0 ? "重试失败指纹" : "继续生成指纹"}</span>
|
||||
</button>
|
||||
{transcodeRunning ? (
|
||||
<button
|
||||
className="admin-btn is-stop"
|
||||
disabled={togglingTranscodeId === d.id}
|
||||
onClick={onStopTranscode}
|
||||
title="停止当前的转码任务。未处理的视频保持原状态,下次开始时继续。"
|
||||
>
|
||||
<CircleStop size={13} />
|
||||
<span>{togglingTranscodeId === d.id ? "停止中..." : "停止转码"}</span>
|
||||
</button>
|
||||
) : (
|
||||
<button
|
||||
className="admin-btn"
|
||||
disabled={!canStartTranscode || togglingTranscodeId === d.id}
|
||||
onClick={onStartTranscode}
|
||||
title="把浏览器播放不了的视频(AVI/WMV/RMVB、MPEG-4 等老格式)转码成 H.264 MP4 并上传回本存储。转码不会自动运行,只能在这里手动开启。"
|
||||
>
|
||||
<Wand2 size={13} />
|
||||
<span>
|
||||
{togglingTranscodeId === d.id
|
||||
? "开启中..."
|
||||
: (d.transcodeFailedCount ?? 0) > 0 && (d.transcodePendingCount ?? 0) === 0
|
||||
? "重试失败转码"
|
||||
: "开始转码"}
|
||||
</span>
|
||||
</button>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user