mirror of
https://github.com/nianzhibai/91.git
synced 2026-06-15 08:45:41 +08:00
Fix 115 teaser generation stability
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
- 管理后台支持网盘管理、视频管理、标签管理和运行时 Teaser 生成开关。
|
||||
- 视频管理支持按网盘筛选、每页 100 条分页、每个网盘的 Teaser 已生成/待生成/失败统计、单条或全量重生 teaser、编辑标题/作者/分类/标签等元数据。
|
||||
- 标签管理支持创建标签并自动分类已有视频;内置规则会把常见番号污染归并到 `AV` 等系统标签,降低标签列表噪声。
|
||||
- 115 生成 teaser 时会顺序取链并分段生成,降低 CDN 403 / WAF 风控导致的大量失败概率;遇到疑似风控会进入冷却并保留任务为 `pending`。
|
||||
|
||||
## 快速开始
|
||||
|
||||
@@ -110,6 +111,17 @@ git add vendor/ # 入库
|
||||
| 沃盘 | `access_token`、`refresh_token`、可选 `family_id` | 第一版只能手动粘贴 token;后续会加扫码/短信登录 |
|
||||
| OneDrive | `refresh_token`,可选 `access_token`、`api_url_address`、`region`、`is_sharepoint`、`site_id` | 按 OpenList 默认方式调用 `https://api.oplist.org/onedrive/renewapi` 在线刷新 token;`rootId` / `scanRootId` 默认填 `root`,SharePoint 需填 `is_sharepoint=true` 和 `site_id` |
|
||||
|
||||
### 115 说明
|
||||
|
||||
115 的下载直链对同一个 CDN URL 的多段随机读取比较敏感,尤其是大文件生成多段 teaser 时,容易出现 `403 Forbidden`、WAF 阻断、`moov atom not found` 或 `partial file`。后端对 115 做了专门处理:
|
||||
|
||||
- 取流优先使用移动端下载接口,失败再回退到原 chrome 下载接口。
|
||||
- 生成 teaser 时不再让 ffmpeg 同时打开多个 115 直链;每个 3 秒片段会单独取链、单独生成本地小片段,最后在本地 concat。
|
||||
- ffmpeg 访问 115 CDN 时会经过进程内本地代理转发 Range 请求,避免直接暴露签名 URL,并统一处理必要请求头。
|
||||
- 如果 115 返回 403 / 405 / WAF 阻断 / `moov atom not found` / `partial file` 等疑似临时风控错误,当前网盘的 teaser worker 会进入默认 30 分钟冷却,当前任务保持 `pending`,避免继续请求导致更多失败。
|
||||
|
||||
管理后台的“重生失败 teaser”会把 `failed` 重置为 `pending` 并入队。一次性重生大量 115 视频仍可能触发上游风控;建议点一次后观察日志,如果出现 `transient media source error until=...`,等待冷却结束再继续,不要反复点击。
|
||||
|
||||
### PikPak 速度说明
|
||||
|
||||
PikPak 的 `disable_media_link` 默认按 `true` 处理,会使用 `web_content_link` 原始下载链接;在当前服务器实测,单连接通常只有约 2.8-3 MiB/s。把该字段设置为 `false` 后,后端会改用 `usage=CACHE` 返回的 media/cache 链接,当前服务器实测 `/p/stream` 64 MiB Range 可到约 8.9 MiB/s。
|
||||
@@ -128,7 +140,9 @@ OneDrive 当前采用 OpenList 在线 API 的续期方式,不要求用户提
|
||||
- 极短视频会按可容纳的完整 3 秒片段数自动降级
|
||||
- 首次失败的任务标 `preview_status = failed`,不再自动重试;管理后台可手动重新生成
|
||||
- 服务启动或网盘重新挂载时,如果 Teaser 开关已开启,会自动把历史 `pending` 任务重新入队,避免重启后停在“待生成”。
|
||||
- 115 使用顺序分段生成:每段独立取链、独立转码,最后本地拼接,避免同一 115 CDN 链接被多输入并发读取。
|
||||
- OneDrive 直链生成 teaser 时可能触发 Microsoft 429 限流;后端会识别这类错误并让当前网盘进入冷却期,保留任务为 `pending`,避免连续请求触发更严重限流。
|
||||
- 115 直链生成 teaser 时如果触发 403 / WAF / 截断数据等临时错误,也会让当前网盘进入冷却期,保留任务为 `pending`。
|
||||
- 详见 plan 15.12 节
|
||||
|
||||
## 常用管理能力
|
||||
|
||||
@@ -92,7 +92,7 @@ func (d *Driver) StreamURL(ctx context.Context, fileID string) (*drives.StreamLi
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("115 get file: %w", err)
|
||||
}
|
||||
info, err := d.client.DownloadWithUA(f.PickCode, d.ua)
|
||||
info, ua, err := d.downloadInfo(f.PickCode)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("115 download url: %w", err)
|
||||
}
|
||||
@@ -101,13 +101,15 @@ func (d *Driver) StreamURL(ctx context.Context, fileID string) (*drives.StreamLi
|
||||
}
|
||||
|
||||
headers := http.Header{}
|
||||
headers.Set("User-Agent", d.ua)
|
||||
// 115 直链会返回一组 Cookie / Referer,info.Header 里带了
|
||||
for k, vs := range info.Header {
|
||||
for _, v := range vs {
|
||||
headers.Add(k, v)
|
||||
}
|
||||
}
|
||||
if headers.Get("User-Agent") == "" {
|
||||
headers.Set("User-Agent", ua)
|
||||
}
|
||||
|
||||
return &drives.StreamLink{
|
||||
URL: info.Url.Url,
|
||||
@@ -116,6 +118,27 @@ func (d *Driver) StreamURL(ctx context.Context, fileID string) (*drives.StreamLi
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *Driver) downloadInfo(pickCode string) (*sdk.DownloadInfo, string, error) {
|
||||
mobileUA := sdk.UAIosApp
|
||||
if info, err := d.client.DownloadWithUAByAndroidAPI(pickCode, mobileUA); err == nil {
|
||||
if info != nil && info.Url.Url != "" {
|
||||
return info, mobileUA, nil
|
||||
}
|
||||
} else {
|
||||
webInfo, webErr := d.client.DownloadWithUA(pickCode, d.ua)
|
||||
if webErr != nil {
|
||||
return nil, "", fmt.Errorf("android api: %v; chrome api: %w", err, webErr)
|
||||
}
|
||||
return webInfo, d.ua, nil
|
||||
}
|
||||
|
||||
info, err := d.client.DownloadWithUA(pickCode, d.ua)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
return info, d.ua, nil
|
||||
}
|
||||
|
||||
func (d *Driver) Upload(ctx context.Context, parentID, name string, r io.Reader, size int64) (string, error) {
|
||||
// 115 上传流程比较复杂:RapidUpload -> OSS 分片
|
||||
// 第一版 teaser 文件小(<2MB),直接读全量写 seeker,走 RapidUploadOrByOSS
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@@ -44,6 +46,10 @@ type TeaserGenerator interface {
|
||||
MoveToLocal(tmpPath, videoID string) (string, error)
|
||||
}
|
||||
|
||||
type refreshingTeaserGenerator interface {
|
||||
GenerateWithLinkProvider(ctx context.Context, first *drives.StreamLink, duration float64, refresh func(context.Context) (*drives.StreamLink, error)) (string, error)
|
||||
}
|
||||
|
||||
func New(cfg Config) *Generator {
|
||||
if cfg.FFmpegPath == "" {
|
||||
cfg.FFmpegPath = "ffmpeg"
|
||||
@@ -209,17 +215,20 @@ func (g *Generator) GenerateThumbnail(ctx context.Context, link *drives.StreamLi
|
||||
|
||||
ctx2, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
ffmpegLink, cleanup, err := prepareFFmpegLink(ctx2, link)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
args := []string{
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
"-ss", fmt.Sprintf("%.2f", offset),
|
||||
}
|
||||
if h := buildHeaders(link.Headers); h != "" {
|
||||
args = append(args, "-headers", h)
|
||||
}
|
||||
args = append(args, ffmpegHTTPInputOptions(ffmpegLink)...)
|
||||
args = append(args,
|
||||
"-i", link.URL,
|
||||
"-i", ffmpegLink.URL,
|
||||
"-frames:v", "1",
|
||||
"-vf", fmt.Sprintf("scale=%d:-2", g.cfg.Width),
|
||||
"-q:v", "3",
|
||||
@@ -245,6 +254,11 @@ func (g *Generator) GenerateThumbnail(ctx context.Context, link *drives.StreamLi
|
||||
func (g *Generator) Probe(ctx context.Context, link *drives.StreamLink) (float64, error) {
|
||||
ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
ffmpegLink, cleanup, err := prepareFFmpegLink(ctx2, link)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
args := []string{
|
||||
"-hide_banner",
|
||||
@@ -252,10 +266,8 @@ func (g *Generator) Probe(ctx context.Context, link *drives.StreamLink) (float64
|
||||
"-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||
}
|
||||
if h := buildHeaders(link.Headers); h != "" {
|
||||
args = append(args, "-headers", h)
|
||||
}
|
||||
args = append(args, link.URL)
|
||||
args = append(args, ffmpegHTTPInputOptions(ffmpegLink)...)
|
||||
args = append(args, ffmpegLink.URL)
|
||||
|
||||
cmd := exec.CommandContext(ctx2, g.cfg.FFprobePath, args...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
@@ -274,6 +286,21 @@ func (g *Generator) Probe(ctx context.Context, link *drives.StreamLink) (float64
|
||||
// Generate 拉取 teaser 到本地临时文件,返回路径。
|
||||
// 根据 Config.Segments 和视频时长决定是单段还是多段拼接。
|
||||
func (g *Generator) Generate(ctx context.Context, link *drives.StreamLink, duration float64) (string, error) {
|
||||
return g.generate(ctx, duration, func(int) (*drives.StreamLink, error) {
|
||||
return link, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (g *Generator) GenerateWithLinkProvider(ctx context.Context, first *drives.StreamLink, duration float64, refresh func(context.Context) (*drives.StreamLink, error)) (string, error) {
|
||||
return g.generateSequential(ctx, duration, func(index int) (*drives.StreamLink, error) {
|
||||
if index == 0 || refresh == nil {
|
||||
return first, nil
|
||||
}
|
||||
return refresh(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
func (g *Generator) generate(ctx context.Context, duration float64, linkForInput func(int) (*drives.StreamLink, error)) (string, error) {
|
||||
if err := os.MkdirAll(g.cfg.LocalDir, 0o755); err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -300,17 +327,31 @@ func (g *Generator) Generate(ctx context.Context, link *drives.StreamLink, durat
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
}
|
||||
headers := buildHeaders(link.Headers)
|
||||
|
||||
// 每段独立 -ss + -i,精确 seek 重新解码保证拼接帧准
|
||||
for _, s := range starts {
|
||||
if headers != "" {
|
||||
args = append(args, "-headers", headers)
|
||||
var cleanups []func()
|
||||
defer func() {
|
||||
for i := len(cleanups) - 1; i >= 0; i-- {
|
||||
cleanups[i]()
|
||||
}
|
||||
}()
|
||||
for i, s := range starts {
|
||||
link, err := linkForInput(i)
|
||||
if err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return "", err
|
||||
}
|
||||
ffmpegLink, cleanup, err := prepareFFmpegLink(ctx2, link)
|
||||
if err != nil {
|
||||
os.Remove(tmpPath)
|
||||
return "", err
|
||||
}
|
||||
cleanups = append(cleanups, cleanup)
|
||||
args = append(args, ffmpegHTTPInputOptions(ffmpegLink)...)
|
||||
args = append(args,
|
||||
"-ss", fmt.Sprintf("%.2f", s),
|
||||
"-t", fmt.Sprintf("%.2f", eachSec),
|
||||
"-i", link.URL,
|
||||
"-i", ffmpegLink.URL,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -318,7 +359,7 @@ func (g *Generator) Generate(ctx context.Context, link *drives.StreamLink, durat
|
||||
// 单段:无需 concat,直接缩放 + 无音
|
||||
args = append(args,
|
||||
"-an",
|
||||
"-vf", fmt.Sprintf("scale=%d:-2", g.cfg.Width),
|
||||
"-vf", fmt.Sprintf("scale=%d:-2,setsar=1", g.cfg.Width),
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-crf", "28",
|
||||
@@ -339,7 +380,7 @@ func (g *Generator) Generate(ctx context.Context, link *drives.StreamLink, durat
|
||||
filter.WriteString(";")
|
||||
}
|
||||
fmt.Fprintf(&filter,
|
||||
"[%d:v]scale=%d:-2,fade=t=in:st=0:d=%.2f,fade=t=out:st=%.2f:d=0.2[v%d]",
|
||||
"[%d:v]scale=%d:-2,setsar=1,fade=t=in:st=0:d=%.2f,fade=t=out:st=%.2f:d=0.2[v%d]",
|
||||
i, g.cfg.Width, fadeIn, fadeOutStart, i)
|
||||
}
|
||||
filter.WriteString(";")
|
||||
@@ -374,6 +415,273 @@ func (g *Generator) Generate(ctx context.Context, link *drives.StreamLink, durat
|
||||
return tmpPath, nil
|
||||
}
|
||||
|
||||
func (g *Generator) generateSequential(ctx context.Context, duration float64, linkForInput func(int) (*drives.StreamLink, error)) (string, error) {
|
||||
if err := os.MkdirAll(g.cfg.LocalDir, 0o755); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
plan := buildTeaserPlan(g.cfg, duration)
|
||||
starts := plan.starts
|
||||
eachSec := plan.eachSec
|
||||
if len(starts) == 0 {
|
||||
return "", fmt.Errorf("video too short for %.0fs teaser segment", eachSec)
|
||||
}
|
||||
|
||||
ctx2, cancel := context.WithTimeout(ctx, 4*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
segmentPaths := make([]string, 0, len(starts))
|
||||
success := false
|
||||
defer func() {
|
||||
if success {
|
||||
return
|
||||
}
|
||||
for _, p := range segmentPaths {
|
||||
_ = os.Remove(p)
|
||||
}
|
||||
}()
|
||||
|
||||
for i, start := range starts {
|
||||
seg, err := g.generateSingleSegment(ctx2, i, start, eachSec, linkForInput)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
segmentPaths = append(segmentPaths, seg)
|
||||
}
|
||||
|
||||
if len(segmentPaths) == 1 {
|
||||
success = true
|
||||
return segmentPaths[0], nil
|
||||
}
|
||||
|
||||
tmp, err := os.CreateTemp(g.cfg.LocalDir, "teaser-*.mp4")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
tmpPath := tmp.Name()
|
||||
tmp.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
|
||||
list, err := os.CreateTemp(g.cfg.LocalDir, "teaser-concat-*.txt")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
listPath := list.Name()
|
||||
for _, p := range segmentPaths {
|
||||
if _, err := fmt.Fprintf(list, "file '%s'\n", escapeConcatPath(p)); err != nil {
|
||||
list.Close()
|
||||
_ = os.Remove(listPath)
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
if err := list.Close(); err != nil {
|
||||
_ = os.Remove(listPath)
|
||||
return "", err
|
||||
}
|
||||
defer os.Remove(listPath)
|
||||
|
||||
args := []string{
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
"-f", "concat",
|
||||
"-safe", "0",
|
||||
"-i", listPath,
|
||||
"-c", "copy",
|
||||
"-movflags", "+faststart",
|
||||
"-y", tmpPath,
|
||||
}
|
||||
out, err := exec.CommandContext(ctx2, g.cfg.FFmpegPath, args...).CombinedOutput()
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpPath)
|
||||
return "", ffmpegCommandError("ffmpeg concat", err, out)
|
||||
}
|
||||
if info, statErr := os.Stat(tmpPath); statErr != nil || info.Size() == 0 {
|
||||
_ = os.Remove(tmpPath)
|
||||
return "", fmt.Errorf("ffmpeg concat produced empty file, stderr: %s", string(out))
|
||||
}
|
||||
|
||||
for _, p := range segmentPaths {
|
||||
_ = os.Remove(p)
|
||||
}
|
||||
success = true
|
||||
return tmpPath, nil
|
||||
}
|
||||
|
||||
func (g *Generator) generateSingleSegment(ctx context.Context, index int, start, eachSec float64, linkForInput func(int) (*drives.StreamLink, error)) (string, error) {
|
||||
link, err := linkForInput(index)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ffmpegLink, cleanup, err := prepareFFmpegLink(ctx, link)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
seg, err := os.CreateTemp(g.cfg.LocalDir, fmt.Sprintf("teaser-seg-%d-*.mp4", index))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
segPath := seg.Name()
|
||||
seg.Close()
|
||||
|
||||
fadeIn := 0.2
|
||||
fadeOutStart := eachSec - 0.2
|
||||
if fadeOutStart < 0 {
|
||||
fadeOutStart = 0
|
||||
}
|
||||
filter := fmt.Sprintf("scale=%d:-2,setsar=1,fade=t=in:st=0:d=%.2f,fade=t=out:st=%.2f:d=0.2", g.cfg.Width, fadeIn, fadeOutStart)
|
||||
|
||||
args := []string{
|
||||
"-hide_banner",
|
||||
"-loglevel", "error",
|
||||
}
|
||||
args = append(args, ffmpegHTTPInputOptions(ffmpegLink)...)
|
||||
args = append(args,
|
||||
"-ss", fmt.Sprintf("%.2f", start),
|
||||
"-t", fmt.Sprintf("%.2f", eachSec),
|
||||
"-i", ffmpegLink.URL,
|
||||
"-an",
|
||||
"-vf", filter,
|
||||
"-c:v", "libx264",
|
||||
"-preset", "veryfast",
|
||||
"-crf", "28",
|
||||
"-movflags", "+faststart",
|
||||
"-y", segPath,
|
||||
)
|
||||
out, err := exec.CommandContext(ctx, g.cfg.FFmpegPath, args...).CombinedOutput()
|
||||
if err != nil {
|
||||
_ = os.Remove(segPath)
|
||||
return "", ffmpegCommandError("ffmpeg segment", err, out)
|
||||
}
|
||||
if info, statErr := os.Stat(segPath); statErr != nil || info.Size() == 0 {
|
||||
_ = os.Remove(segPath)
|
||||
return "", fmt.Errorf("ffmpeg segment produced empty file, stderr: %s", string(out))
|
||||
}
|
||||
return segPath, nil
|
||||
}
|
||||
|
||||
func escapeConcatPath(path string) string {
|
||||
if abs, err := filepath.Abs(path); err == nil {
|
||||
path = abs
|
||||
}
|
||||
return strings.ReplaceAll(path, "'", "'\\''")
|
||||
}
|
||||
|
||||
func prepareFFmpegLink(ctx context.Context, link *drives.StreamLink) (*drives.StreamLink, func(), error) {
|
||||
if link == nil {
|
||||
return nil, func() {}, errors.New("missing stream link")
|
||||
}
|
||||
if !shouldProxyFFmpegLink(link) {
|
||||
return link, func() {}, nil
|
||||
}
|
||||
return startLocalFFmpegProxy(ctx, link)
|
||||
}
|
||||
|
||||
func shouldProxyFFmpegLink(link *drives.StreamLink) bool {
|
||||
if link == nil {
|
||||
return false
|
||||
}
|
||||
raw := strings.ToLower(link.URL)
|
||||
if !strings.HasPrefix(raw, "http://") && !strings.HasPrefix(raw, "https://") {
|
||||
return false
|
||||
}
|
||||
if strings.Contains(raw, "115cdn") {
|
||||
return true
|
||||
}
|
||||
return strings.Contains(strings.ToLower(link.Headers.Get("User-Agent")), "115")
|
||||
}
|
||||
|
||||
func startLocalFFmpegProxy(ctx context.Context, link *drives.StreamLink) (*drives.StreamLink, func(), error) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
client := &http.Client{Timeout: 0}
|
||||
srv := &http.Server{
|
||||
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/stream" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if r.Method != http.MethodGet && r.Method != http.MethodHead {
|
||||
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequestWithContext(r.Context(), r.Method, link.URL, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
for k, vs := range link.Headers {
|
||||
for _, v := range vs {
|
||||
req.Header.Add(k, v)
|
||||
}
|
||||
}
|
||||
if rng := r.Header.Get("Range"); rng != "" {
|
||||
req.Header.Set("Range", rng)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
for _, k := range []string{
|
||||
"Content-Type", "Content-Length", "Content-Range",
|
||||
"Accept-Ranges", "Last-Modified", "Etag",
|
||||
} {
|
||||
if v := resp.Header.Get(k); v != "" {
|
||||
w.Header().Set(k, v)
|
||||
}
|
||||
}
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if r.Method != http.MethodHead {
|
||||
_, _ = io.Copy(w, resp.Body)
|
||||
}
|
||||
}),
|
||||
}
|
||||
go func() {
|
||||
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Printf("[preview] local ffmpeg proxy: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
var once sync.Once
|
||||
cleanup := func() {
|
||||
once.Do(func() {
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
_ = srv.Shutdown(shutdownCtx)
|
||||
})
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
cleanup()
|
||||
}()
|
||||
|
||||
proxied := *link
|
||||
proxied.URL = "http://" + ln.Addr().String() + "/stream"
|
||||
proxied.Headers = nil
|
||||
return &proxied, cleanup, nil
|
||||
}
|
||||
|
||||
func ffmpegHTTPInputOptions(link *drives.StreamLink) []string {
|
||||
if link == nil {
|
||||
return nil
|
||||
}
|
||||
var args []string
|
||||
if ua := strings.TrimSpace(link.Headers.Get("User-Agent")); ua != "" {
|
||||
args = append(args, "-user_agent", ua)
|
||||
}
|
||||
if h := buildHeaders(link.Headers); h != "" {
|
||||
args = append(args, "-headers", h)
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
func ffmpegCommandError(tool string, err error, output []byte) error {
|
||||
msg := fmt.Sprintf("%s: %v, stderr: %s", tool, err, redactURLs(string(output)))
|
||||
wrapped := errors.New(msg)
|
||||
@@ -629,6 +937,18 @@ func (w *Worker) pauseForRateLimit(err error, step, title string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *Worker) pauseForRecoverableError(err error, step, title string) bool {
|
||||
if w.pauseForRateLimit(err, step, title) {
|
||||
return true
|
||||
}
|
||||
if !driveErrorShouldCooldown(w.Drive, err) {
|
||||
return false
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
log.Printf("[preview] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) skipIfRateLimited(v *catalog.Video) bool {
|
||||
until, ok, shouldLog := w.rateLimit.active(time.Now())
|
||||
if !ok {
|
||||
@@ -653,6 +973,34 @@ func (w *ThumbWorker) pauseForRateLimit(err error, step, title string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) pauseForRecoverableError(err error, step, title string) bool {
|
||||
if w.pauseForRateLimit(err, step, title) {
|
||||
return true
|
||||
}
|
||||
if !driveErrorShouldCooldown(w.Drive, err) {
|
||||
return false
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
log.Printf("[thumb] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
|
||||
func driveErrorShouldCooldown(d drives.Drive, err error) bool {
|
||||
if d == nil || err == nil || d.Kind() != "p115" {
|
||||
return false
|
||||
}
|
||||
text := strings.ToLower(err.Error())
|
||||
return strings.Contains(text, "server returned 403") ||
|
||||
strings.Contains(text, "403 forbidden") ||
|
||||
strings.Contains(text, "server returned 405") ||
|
||||
strings.Contains(text, "405 method") ||
|
||||
strings.Contains(text, "access denied") ||
|
||||
strings.Contains(text, "moov atom not found") ||
|
||||
strings.Contains(text, "partial file") ||
|
||||
strings.Contains(text, "request has been blocked") ||
|
||||
strings.Contains(text, "访问被阻断")
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
|
||||
if w.skipIfRateLimited(v) {
|
||||
return
|
||||
@@ -662,7 +1010,7 @@ func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
|
||||
if localLink, ok := localPreviewLink(v); ok {
|
||||
link = localLink
|
||||
} else {
|
||||
if w.pauseForRateLimit(err, "streamURL", v.Title) {
|
||||
if w.pauseForRecoverableError(err, "streamURL", v.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] streamURL %s: %v", v.Title, err)
|
||||
@@ -676,7 +1024,7 @@ func (w *ThumbWorker) process(ctx context.Context, v *catalog.Video) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if w.pauseForRateLimit(err, "generate", v.Title) {
|
||||
if w.pauseForRecoverableError(err, "generate", v.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[thumb] generate %s: %v", v.Title, err)
|
||||
@@ -732,7 +1080,7 @@ func (w *Worker) process(ctx context.Context, v *catalog.Video) {
|
||||
}
|
||||
link, err := w.Drive.StreamURL(ctx, v.FileID)
|
||||
if err != nil {
|
||||
if w.pauseForRateLimit(err, "streamURL", v.Title) {
|
||||
if w.pauseForRecoverableError(err, "streamURL", v.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[preview] streamURL %s: %v", v.Title, err)
|
||||
@@ -748,15 +1096,15 @@ func (w *Worker) process(ctx context.Context, v *catalog.Video) {
|
||||
_ = w.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{
|
||||
DurationSeconds: int(dur),
|
||||
})
|
||||
} else if err != nil && w.pauseForRateLimit(err, "probe", v.Title) {
|
||||
} else if err != nil && w.pauseForRecoverableError(err, "probe", v.Title) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 2) teaser
|
||||
tmp, err := w.Gen.Generate(ctx, link, duration)
|
||||
tmp, err := w.generateTeaser(ctx, v, link, duration)
|
||||
if err != nil {
|
||||
if w.pauseForRateLimit(err, "generate", v.Title) {
|
||||
if w.pauseForRecoverableError(err, "generate", v.Title) {
|
||||
return
|
||||
}
|
||||
log.Printf("[preview] generate %s: %v", v.Title, err)
|
||||
@@ -775,6 +1123,16 @@ func (w *Worker) process(ctx context.Context, v *catalog.Video) {
|
||||
log.Printf("[preview] ready %s (duration=%.1fs)", v.Title, duration)
|
||||
}
|
||||
|
||||
func (w *Worker) generateTeaser(ctx context.Context, v *catalog.Video, link *drives.StreamLink, duration float64) (string, error) {
|
||||
gen, ok := w.Gen.(refreshingTeaserGenerator)
|
||||
if !ok || w.Drive == nil || w.Drive.Kind() != "p115" {
|
||||
return w.Gen.Generate(ctx, link, duration)
|
||||
}
|
||||
return gen.GenerateWithLinkProvider(ctx, link, duration, func(ctx context.Context) (*drives.StreamLink, error) {
|
||||
return w.Drive.StreamURL(ctx, v.FileID)
|
||||
})
|
||||
}
|
||||
|
||||
func removePreviousLocalTeaser(previous, current string) {
|
||||
if previous == "" {
|
||||
return
|
||||
@@ -795,6 +1153,9 @@ func buildHeaders(h map[string][]string) string {
|
||||
}
|
||||
var sb strings.Builder
|
||||
for k, vs := range h {
|
||||
if strings.EqualFold(k, "User-Agent") {
|
||||
continue
|
||||
}
|
||||
for _, v := range vs {
|
||||
sb.WriteString(k)
|
||||
sb.WriteString(": ")
|
||||
|
||||
@@ -3,6 +3,7 @@ package preview
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -111,3 +112,34 @@ func TestFFmpegCommandErrorRedactsSignedURLs(t *testing.T) {
|
||||
t.Fatalf("error = %q, want redacted URL with punctuation preserved", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFmpegHTTPInputOptionsUsesDedicatedUserAgent(t *testing.T) {
|
||||
link := &drives.StreamLink{
|
||||
URL: "https://download.example/video.mp4",
|
||||
Headers: http.Header{
|
||||
"User-Agent": {"Mozilla/5.0 115Browser/27.0.5.7"},
|
||||
"Cookie": {"UID=redacted"},
|
||||
},
|
||||
}
|
||||
|
||||
args := ffmpegHTTPInputOptions(link)
|
||||
joined := strings.Join(args, "\n")
|
||||
if !strings.Contains(joined, "-user_agent\nMozilla/5.0 115Browser/27.0.5.7") {
|
||||
t.Fatalf("args = %#v, want dedicated ffmpeg user_agent option", args)
|
||||
}
|
||||
if strings.Contains(joined, "User-Agent:") {
|
||||
t.Fatalf("args = %#v, user agent should not be duplicated in raw headers", args)
|
||||
}
|
||||
if !strings.Contains(joined, "Cookie: UID=redacted") {
|
||||
t.Fatalf("args = %#v, want cookie preserved in raw headers", args)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldProxy115FFmpegLinks(t *testing.T) {
|
||||
if !shouldProxyFFmpegLink(&drives.StreamLink{URL: "https://cdnfhnfile.115cdn.net/file.mp4"}) {
|
||||
t.Fatal("115 CDN link should use local ffmpeg proxy")
|
||||
}
|
||||
if shouldProxyFFmpegLink(&drives.StreamLink{URL: "https://download.example/file.mp4"}) {
|
||||
t.Fatal("generic link should not use local ffmpeg proxy")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,6 +216,52 @@ func TestPreviewWorkerRateLimitLeavesCurrentPendingAndSkipsNextVideo(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreviewWorkerP115TransientErrorKeepsVideoPending(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, video := seedPreviewTestVideo(t, "preview-p115-transient")
|
||||
|
||||
gen := &fakeTeaserGenerator{
|
||||
generateErr: errors.New("ffmpeg: exit status 1, stderr: Server returned 403 Forbidden"),
|
||||
}
|
||||
drv := &previewFakeDrive{kind: "p115"}
|
||||
worker := NewWorker(gen, cat, drv, "")
|
||||
|
||||
worker.process(ctx, video)
|
||||
|
||||
got, err := cat.GetVideo(ctx, video.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("get video: %v", err)
|
||||
}
|
||||
if got.PreviewStatus != "pending" {
|
||||
t.Fatalf("preview status = %q, want pending for transient 115 media error", got.PreviewStatus)
|
||||
}
|
||||
if gen.generateCalls != 1 {
|
||||
t.Fatalf("generate calls = %d, want 1", gen.generateCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreviewWorkerRefreshesP115LinksPerTeaserInput(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
cat, video := seedPreviewTestVideo(t, "preview-p115-refresh")
|
||||
video.DurationSeconds = 81
|
||||
if err := cat.UpsertVideo(ctx, video); err != nil {
|
||||
t.Fatalf("update video: %v", err)
|
||||
}
|
||||
|
||||
gen := &fakeTeaserGenerator{}
|
||||
drv := &previewFakeDrive{kind: "p115"}
|
||||
worker := NewWorker(gen, cat, drv, "")
|
||||
|
||||
worker.process(ctx, video)
|
||||
|
||||
if gen.refreshCalls != 3 {
|
||||
t.Fatalf("refresh calls = %d, want 3 extra links for a four-input p115 teaser", gen.refreshCalls)
|
||||
}
|
||||
if drv.streamCalls != 4 {
|
||||
t.Fatalf("stream calls = %d, want initial link plus 3 refreshed links", drv.streamCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func seedPreviewTestVideo(t *testing.T, id string) (*catalog.Catalog, *catalog.Video) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
@@ -268,6 +314,7 @@ type fakeTeaserGenerator struct {
|
||||
localPath string
|
||||
generateErr error
|
||||
generateCalls int
|
||||
refreshCalls int
|
||||
}
|
||||
|
||||
func (g *fakeTeaserGenerator) Probe(context.Context, *drives.StreamLink) (float64, error) {
|
||||
@@ -282,6 +329,16 @@ func (g *fakeTeaserGenerator) Generate(context.Context, *drives.StreamLink, floa
|
||||
return "/tmp/source-teaser.mp4", nil
|
||||
}
|
||||
|
||||
func (g *fakeTeaserGenerator) GenerateWithLinkProvider(ctx context.Context, first *drives.StreamLink, duration float64, refresh func(context.Context) (*drives.StreamLink, error)) (string, error) {
|
||||
for i := 0; i < 3; i++ {
|
||||
if _, err := refresh(ctx); err != nil {
|
||||
return "", err
|
||||
}
|
||||
g.refreshCalls++
|
||||
}
|
||||
return g.Generate(ctx, first, duration)
|
||||
}
|
||||
|
||||
func (g *fakeTeaserGenerator) MoveToLocal(_ string, videoID string) (string, error) {
|
||||
if g.localPath != "" {
|
||||
return g.localPath, nil
|
||||
@@ -290,14 +347,21 @@ func (g *fakeTeaserGenerator) MoveToLocal(_ string, videoID string) (string, err
|
||||
}
|
||||
|
||||
type previewFakeDrive struct {
|
||||
kind string
|
||||
streamFileID string
|
||||
streamCalls int
|
||||
streamErr error
|
||||
ensureDirCalls int
|
||||
uploadCalls int
|
||||
}
|
||||
|
||||
func (d *previewFakeDrive) Kind() string { return "fake" }
|
||||
func (d *previewFakeDrive) ID() string { return "drive-id" }
|
||||
func (d *previewFakeDrive) Kind() string {
|
||||
if d.kind != "" {
|
||||
return d.kind
|
||||
}
|
||||
return "fake"
|
||||
}
|
||||
func (d *previewFakeDrive) ID() string { return "drive-id" }
|
||||
func (d *previewFakeDrive) Init(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
@@ -308,6 +372,7 @@ func (d *previewFakeDrive) Stat(context.Context, string) (*drives.Entry, error)
|
||||
return nil, drives.ErrNotSupported
|
||||
}
|
||||
func (d *previewFakeDrive) StreamURL(_ context.Context, fileID string) (*drives.StreamLink, error) {
|
||||
d.streamCalls++
|
||||
d.streamFileID = fileID
|
||||
if d.streamErr != nil {
|
||||
return nil, d.streamErr
|
||||
|
||||
Reference in New Issue
Block a user