Files
91/backend/cmd/server/main.go
T
nianzhibai 3506328441 Add PikPak drive support
Add PikPak backend driver, fixed tag matching, cached transcode playback, fast cover handling, and LF normalization.
2026-05-10 23:55:04 +08:00

478 lines
12 KiB
Go

package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/video-site/backend/internal/api"
"github.com/video-site/backend/internal/auth"
"github.com/video-site/backend/internal/catalog"
"github.com/video-site/backend/internal/config"
"github.com/video-site/backend/internal/drives"
"github.com/video-site/backend/internal/drives/p115"
"github.com/video-site/backend/internal/drives/pikpak"
"github.com/video-site/backend/internal/drives/quark"
"github.com/video-site/backend/internal/drives/wopan"
"github.com/video-site/backend/internal/preview"
"github.com/video-site/backend/internal/proxy"
"github.com/video-site/backend/internal/scanner"
)
func main() {
cfgPath := "./config.yaml"
if v := os.Getenv("VIDEO_CONFIG"); v != "" {
cfgPath = v
}
cfg, err := config.Load(cfgPath)
if err != nil {
log.Fatalf("load config: %v", err)
}
if err := os.MkdirAll(filepath.Dir(cfg.Storage.DBPath), 0o755); err != nil {
log.Fatalf("mkdir db dir: %v", err)
}
if err := os.MkdirAll(cfg.Storage.LocalPreviewDir, 0o755); err != nil {
log.Fatalf("mkdir preview dir: %v", err)
}
cat, err := catalog.Open(cfg.Storage.DBPath)
if err != nil {
log.Fatalf("open catalog: %v", err)
}
defer cat.Close()
app := &App{
cfg: cfg,
cat: cat,
registry: proxy.NewRegistry(),
workers: make(map[string]*preview.Worker),
thumbWorkers: make(map[string]*preview.ThumbWorker),
}
app.proxy = proxy.New(app.registry)
// 初始化现有 drives
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app.loadPreviewEnabled(ctx)
existing, err := cat.ListDrives(ctx)
if err != nil {
log.Fatalf("list drives: %v", err)
}
for _, d := range existing {
if err := app.attachDrive(ctx, d); err != nil {
log.Printf("[drive %s] attach failed: %v", d.ID, err)
}
}
authr := &auth.Authenticator{
Username: cfg.Server.Admin.Username,
Password: cfg.Server.Admin.Password,
Catalog: cat,
}
apiServer := &api.Server{
Catalog: cat,
Proxy: app.proxy,
LocalDir: cfg.Storage.LocalPreviewDir,
FFmpegPath: cfg.Preview.FFmpegPath,
}
adminServer := &api.AdminServer{
Catalog: cat,
Auth: authr,
OnDriveSaved: func(driveID string) error {
d, err := cat.GetDrive(ctx, driveID)
if err != nil {
return err
}
return app.attachDrive(ctx, d)
},
OnDriveRemoved: func(driveID string) {
app.detachDrive(driveID)
},
OnScanRequested: func(driveID string) {
go app.runScan(ctx, driveID)
},
OnRegenPreview: func(videoID string) {
go app.regenPreview(ctx, videoID)
},
GetPreviewEnabled: func() bool { return app.PreviewEnabled() },
SetPreviewEnabled: func(enabled bool) error {
return app.SetPreviewEnabled(ctx, enabled)
},
}
r := chi.NewRouter()
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
r.Use(corsMiddleware)
apiServer.RegisterRoutes(r, authr)
adminServer.Register(r)
// 启动定时扫描
go app.scanLoop(ctx)
srv := &http.Server{
Addr: cfg.Server.Listen,
Handler: r,
}
go func() {
log.Printf("video-site backend listening on %s", cfg.Server.Listen)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("server error: %v", err)
}
}()
// 等待退出信号
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
log.Println("shutting down...")
shutCtx, shutCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutCancel()
_ = srv.Shutdown(shutCtx)
}
// ---------- App ----------
type App struct {
cfg *config.Config
cat *catalog.Catalog
registry *proxy.Registry
proxy *proxy.Proxy
mu sync.Mutex
workers map[string]*preview.Worker
thumbWorkers map[string]*preview.ThumbWorker
cancels map[string]context.CancelFunc
// 运行时 preview 开关(从 DB 读)
previewEnabled bool
}
// PreviewEnabled 线程安全读
func (a *App) PreviewEnabled() bool {
a.mu.Lock()
defer a.mu.Unlock()
return a.previewEnabled
}
// SetPreviewEnabled 切换开关,写库 + 若开启则立刻补扫 pending
func (a *App) SetPreviewEnabled(ctx context.Context, enabled bool) error {
a.mu.Lock()
a.previewEnabled = enabled
a.mu.Unlock()
val := "0"
if enabled {
val = "1"
}
if err := a.cat.SetSetting(ctx, "preview.enabled", val); err != nil {
return err
}
if enabled {
// 异步补扫所有盘
go func() {
for _, d := range a.registry.All() {
a.mu.Lock()
w := a.workers[d.ID()]
a.mu.Unlock()
if w != nil {
a.enqueuePending(ctx, d.ID(), w)
}
}
}()
}
return nil
}
// loadPreviewEnabled 从 DB 读运行时开关,首次启动取 config 默认值
func (a *App) loadPreviewEnabled(ctx context.Context) {
def := "0"
if a.cfg.Preview.Enabled {
def = "1"
}
v, err := a.cat.GetSetting(ctx, "preview.enabled", def)
if err != nil {
log.Printf("[preview] load setting: %v (fallback to config)", err)
a.mu.Lock()
a.previewEnabled = a.cfg.Preview.Enabled
a.mu.Unlock()
return
}
a.mu.Lock()
a.previewEnabled = v == "1"
a.mu.Unlock()
}
func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
var drv drives.Drive
switch d.Kind {
case "quark":
drv = quark.New(quark.Config{
ID: d.ID,
Cookie: d.Credentials["cookie"],
RootID: d.RootID,
OnCookieUpdate: func(cookie string) {
d.Credentials["cookie"] = cookie
_ = a.cat.UpsertDrive(ctx, d)
},
})
case "p115":
drv = p115.New(p115.Config{
ID: d.ID,
Cookie: d.Credentials["cookie"],
RootID: d.RootID,
})
case "pikpak":
drv = pikpak.New(pikpak.Config{
ID: d.ID,
Username: d.Credentials["username"],
Password: d.Credentials["password"],
Platform: d.Credentials["platform"],
RefreshToken: d.Credentials["refresh_token"],
AccessToken: d.Credentials["access_token"],
CaptchaToken: d.Credentials["captcha_token"],
DeviceID: d.Credentials["device_id"],
RootID: d.RootID,
DisableMediaLink: pikpak.ParseBoolDefault(d.Credentials["disable_media_link"], true),
OnTokenUpdate: func(access, refresh, captcha, deviceID string) {
d.Credentials["access_token"] = access
d.Credentials["refresh_token"] = refresh
d.Credentials["captcha_token"] = captcha
d.Credentials["device_id"] = deviceID
_ = a.cat.UpsertDrive(ctx, d)
},
})
case "wopan":
drv = wopan.New(wopan.Config{
ID: d.ID,
AccessToken: d.Credentials["access_token"],
RefreshToken: d.Credentials["refresh_token"],
FamilyID: d.Credentials["family_id"],
RootID: d.RootID,
OnTokenUpdate: func(access, refresh string) {
d.Credentials["access_token"] = access
d.Credentials["refresh_token"] = refresh
_ = a.cat.UpsertDrive(ctx, d)
},
})
default:
return fmt.Errorf("unknown drive kind: %s", d.Kind)
}
if err := drv.Init(ctx); err != nil {
d.Status = "error"
d.LastError = err.Error()
_ = a.cat.UpsertDrive(ctx, d)
return err
}
d.Status = "ok"
d.LastError = ""
_ = a.cat.UpsertDrive(ctx, d)
a.registry.Set(d.ID, drv)
// preview worker
gen := preview.New(preview.Config{
FFmpegPath: a.cfg.Preview.FFmpegPath,
FFprobePath: a.cfg.Preview.FFprobePath,
DurationSeconds: a.cfg.Preview.DurationSeconds,
Width: a.cfg.Preview.Width,
Segments: a.cfg.Preview.Segments,
LocalDir: a.cfg.Storage.LocalPreviewDir,
RemoteDir: a.cfg.Preview.RemoteDir,
})
worker := preview.NewWorker(gen, a.cat, drv, a.cfg.Preview.RemoteDir)
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
workerCtx, cancel := context.WithCancel(ctx)
go worker.Run(workerCtx)
go thumbWorker.Run(workerCtx)
a.mu.Lock()
if a.cancels == nil {
a.cancels = make(map[string]context.CancelFunc)
}
if old, ok := a.cancels[d.ID]; ok {
old()
}
a.workers[d.ID] = worker
a.thumbWorkers[d.ID] = thumbWorker
a.cancels[d.ID] = cancel
a.mu.Unlock()
return nil
}
func (a *App) enqueuePending(ctx context.Context, driveID string, w *preview.Worker) {
pending, err := a.cat.ListVideosByPreviewStatus(ctx, driveID, "pending", 0)
if err != nil {
log.Printf("[preview] list pending %s: %v", driveID, err)
return
}
if len(pending) == 0 {
return
}
log.Printf("[preview] enqueue %d pending videos for drive=%s", len(pending), driveID)
for _, v := range pending {
w.Enqueue(v)
}
}
func (a *App) enqueueThumbnails(ctx context.Context, driveID string, w *preview.ThumbWorker) {
pending, err := a.cat.ListVideosNeedingThumbnail(ctx, driveID, 0)
if err != nil {
log.Printf("[thumb] list pending %s: %v", driveID, err)
return
}
if len(pending) == 0 {
return
}
log.Printf("[thumb] enqueue %d missing thumbnails for drive=%s", len(pending), driveID)
for _, v := range pending {
w.Enqueue(v)
}
}
func (a *App) detachDrive(id string) {
a.registry.Remove(id)
a.mu.Lock()
if cancel, ok := a.cancels[id]; ok {
cancel()
delete(a.cancels, id)
}
delete(a.workers, id)
delete(a.thumbWorkers, id)
a.mu.Unlock()
}
func (a *App) runScan(ctx context.Context, driveID string) {
drv, ok := a.registry.Get(driveID)
if !ok {
log.Printf("[scan] drive %s not attached", driveID)
return
}
a.mu.Lock()
worker := a.workers[driveID]
thumbWorker := a.thumbWorkers[driveID]
a.mu.Unlock()
var onNew func(v *catalog.Video)
if thumbWorker != nil || (a.PreviewEnabled() && worker != nil) {
onNew = func(v *catalog.Video) {
if thumbWorker != nil && v.ThumbnailURL == "" {
thumbWorker.Enqueue(v)
}
if a.PreviewEnabled() && worker != nil {
worker.Enqueue(v)
}
}
}
sc := scanner.New(a.cat, drv, a.cfg.Scanner.VideoExtensions, a.cfg.Scanner.MaxDepth, onNew)
// 使用 drive 的 scan_root_id,否则 root_id
d, err := a.cat.GetDrive(ctx, driveID)
if err != nil {
log.Printf("[scan] get drive %s: %v", driveID, err)
return
}
startID := d.ScanRootID
if startID == "" {
startID = d.RootID
}
log.Printf("[scan] drive=%s start=%s", driveID, startID)
stats, err := sc.Run(ctx, startID)
if err != nil {
log.Printf("[scan] drive=%s error: %v", driveID, err)
return
}
log.Printf("[scan] drive=%s done scanned=%d added=%d", driveID, stats.Scanned, stats.Added)
if thumbWorker != nil {
a.enqueueThumbnails(ctx, driveID, thumbWorker)
}
if a.PreviewEnabled() && worker != nil {
go a.enqueuePending(ctx, driveID, worker)
}
}
func (a *App) regenPreview(ctx context.Context, videoID string) {
v, err := a.cat.GetVideo(ctx, videoID)
if err != nil {
return
}
a.mu.Lock()
worker := a.workers[v.DriveID]
a.mu.Unlock()
if worker != nil {
worker.Enqueue(v)
}
}
func (a *App) scanLoop(ctx context.Context) {
// 启动后立刻扫一次
a.scanAllOnce(ctx)
if a.cfg.Scanner.IntervalSeconds <= 0 {
return
}
ticker := time.NewTicker(time.Duration(a.cfg.Scanner.IntervalSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
a.scanAllOnce(ctx)
}
}
}
func (a *App) scanAllOnce(ctx context.Context) {
for _, d := range a.registry.All() {
a.runScan(ctx, d.ID())
}
}
// ---------- middleware ----------
func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", originOr(r, "*"))
w.Header().Set("Access-Control-Allow-Credentials", "true")
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
func originOr(r *http.Request, fallback string) string {
if o := r.Header.Get("Origin"); o != "" {
return o
}
return fallback
}