mirror of
https://github.com/nianzhibai/91.git
synced 2026-06-15 08:45:41 +08:00
feat(spider91): 流式爬取 + 完成后统一入队 teaser + 封面失败标 failed
三件相关改动,主题都是 spider91 爬虫流程。
1. 流式爬取协议(取代旧的 "Python 凑齐 15 个再交 Go" 模型)
Python 端 (spider_91porn.py):
- 新增 --stream-output flag。开启后每解析出一个 video 直链就把
entry 作为一行 JSON 写到 stdout 并 flush。
- log() 在 stream 模式下走 stderr,避免污染 stdout JSONL 协议。
- --output FILE 仍生效,作离线归档用。
Go 端 (crawler.go):
- 新 startSpiderTargetNew() 异步启动 cmd,返回 stdout pipe。
- RunOnce 用 bufio.Scanner 按行读 stdout,每行解析后立即 processOne
(下载视频 + 封面 + UpsertVideo)。删掉旧 readSpiderOutput / 全 JSON
文件解析路径。
- Python stderr 转发到 backend log,前缀 [spider91:py]。
收益:Python 翻页找下一个 viewkey 与 Go 下载当前视频在时间上重叠,
最大化每条签名链接 e= 时间窗。今天观察到 Python 77 秒就找完 15 个
viewkey 全部 emit;如果还像旧模型那样要等 Go 串行下完才开始下一个,
后面几个的签名很容易过期(之前 8/15 全 EOF 的根因之一)。
2. teaser 在 crawler 完成后统一入队(取代每条入库立即 enqueue)
- main.go attachSpider91Crawler 不再注入 OnNewVideo callback。
- main.go runSpider91Crawl 在 Crawler.RunOnce 完成后调一次
enqueueDriveGeneration(driveID),让所有新视频统一进 teaser worker。
- 与 nightly Phase 2 的 "等 teaser 队列 idle" 语义自然对齐。
- 下载阶段不和 ffmpeg 抢 CPU/IO。
3. 网站封面下载失败时显式标 thumbnail_status='failed'
spider91 drive 的 thumb worker 按设计不处理 spider91 视频(封面应是
网站原图直接保存)。当网站封面下载失败时,url='' + status='pending'
会让 enqueueDriveGeneration 的 waitForThumbnailsBeforePreview 因为
CountVideosNeedingThumbnail > 0 把 teaser 卡死等待循环。
修复:crawler.go processOne 中 thumb 失败分支显式标 status='failed'
(CountVideosNeedingThumbnail 条件 status != 'failed' 会排除)。
今天观察到的现象:187 MB 视频 c2c04fc8602c5396d469 卡在
'[preview] waiting for 1 thumbnails before teaser generation'
循环 35 分钟。
测试:
- crawler_test.go 重构为 buildFakeSpiderScript helper,
生成支持 --stream-output 的伪 python(其实是 sh),逐行 echo JSON。
- TestCrawlerRunOnceFullFlow / TestCrawlerThumbDownloadFailureMarksStatusFailed
通过新 helper 验证流式协议 + thumb fail 闸门。
go test ./... 全绿;线上手动触发 spider91 抓取验证流式行为正确。
This commit is contained in:
@@ -29,3 +29,6 @@ tsconfig.tsbuildinfo
|
||||
91porn_videos.json
|
||||
91VideoSpider/91porn_videos.json
|
||||
91VideoSpider/data/
|
||||
91VideoSpider/__pycache__/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
|
||||
@@ -138,11 +138,18 @@ class Porn91Spider:
|
||||
quiet: bool = False,
|
||||
target_new: int = None,
|
||||
seen_viewkeys: list = None,
|
||||
stream_output: bool = False,
|
||||
):
|
||||
"""
|
||||
构造函数。所有参数都有默认值,等同于使用脚本顶部的全局配置。
|
||||
backend 调用时会传 output_file/seen_viewkeys/target_new,等价于:
|
||||
"从第 1 页开始爬,跳过 seen_viewkeys 里的视频,凑够 target_new 个新视频后停止"
|
||||
|
||||
stream_output=True 时(backend 流水线用):
|
||||
- 每凑齐一个 video 直链就把该 entry 作为一行 JSON 写到 stdout 并 flush,
|
||||
便于上层(Go crawler)边读边下载,不再等所有详情页处理完。
|
||||
- 所有日志改走 stderr,避免与 stdout JSONL 流混合。
|
||||
- --output 仍生效,作为离线归档用(脚本退出时一次性写完整 JSON)。
|
||||
"""
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update(HEADERS)
|
||||
@@ -160,6 +167,10 @@ class Porn91Spider:
|
||||
# target_new 是 backend 触发时的核心模式:累计处理这么多新 viewkey 后退出。
|
||||
self.target_new = target_new if target_new and target_new > 0 else None
|
||||
self.quiet = bool(quiet)
|
||||
# stream_output:每解析出一个 video 直链立即输出一行 JSON 到 stdout
|
||||
# (配合 backend Go 端 bufio.Scanner 实时消费,下载一个就开始下一个)。
|
||||
# 开启后所有 log 都走 stderr。
|
||||
self.stream_output = bool(stream_output)
|
||||
|
||||
# 添加重试适配器
|
||||
try:
|
||||
@@ -212,9 +223,25 @@ class Porn91Spider:
|
||||
pass
|
||||
|
||||
def log(self, message: str):
|
||||
"""带时间戳的日志输出"""
|
||||
"""带时间戳的日志输出。stream_output 模式下走 stderr,避免污染 stdout JSONL。"""
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{timestamp}] {message}")
|
||||
line = f"[{timestamp}] {message}"
|
||||
if self.stream_output:
|
||||
print(line, file=sys.stderr, flush=True)
|
||||
else:
|
||||
print(line)
|
||||
|
||||
def emit_stream_video(self, video: dict):
|
||||
"""stream_output 模式下把单条 video entry 作为一行 JSON 写到 stdout 并立即刷盘。
|
||||
Go 端 bufio.Scanner 按行读取,每收到一行就立即下载视频和封面。"""
|
||||
if not self.stream_output:
|
||||
return
|
||||
try:
|
||||
print(json.dumps(video, ensure_ascii=False), flush=True)
|
||||
except Exception as e:
|
||||
# stdout 异常基本只在管道断开时发生(消费方进程死了);
|
||||
# 写到 stderr 让 backend 看到,然后让 crawl 循环自己 break。
|
||||
print(f"[stream] emit failed: {e}", file=sys.stderr, flush=True)
|
||||
|
||||
def random_sleep(self, min_sec: float, max_sec: float):
|
||||
"""随机延时,模拟人类行为"""
|
||||
@@ -522,6 +549,8 @@ class Porn91Spider:
|
||||
self.skip_viewkeys.add(video['viewkey'])
|
||||
self.processed_videos += 1
|
||||
self.log(f" [OK] 成功提取视频直链")
|
||||
# 流式:立刻把这条 entry 交给 Go 端开始下载,不等本批余下视频
|
||||
self.emit_stream_video(video)
|
||||
else:
|
||||
self.log(f" [FAIL] 未找到视频直链: {video['viewkey']}")
|
||||
video["video_url"] = ""
|
||||
@@ -636,6 +665,9 @@ def main():
|
||||
help="目标新增模式:从 page 1 起翻页直到累计处理这么多新 viewkey 后停止(backend 凌晨任务用)")
|
||||
parser.add_argument("--seen-viewkeys-file", type=str, default=None,
|
||||
help="文件路径,每行一个已处理过的 viewkey;脚本会跳过这些视频不再请求详情页")
|
||||
parser.add_argument("--stream-output", action="store_true",
|
||||
help="流式模式:每解析一条视频直链就立即把它作为一行 JSON 写到 stdout 并 flush;"
|
||||
"日志改走 stderr。配合 backend 边读边下载使用。")
|
||||
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
@@ -671,6 +703,7 @@ def main():
|
||||
quiet=args.quiet,
|
||||
target_new=args.target_new,
|
||||
seen_viewkeys=seen_viewkeys,
|
||||
stream_output=args.stream_output,
|
||||
)
|
||||
elif args.page is not None:
|
||||
# 单页模式(保留作手动调试用):start_page=N, max_pages=1
|
||||
@@ -683,6 +716,7 @@ def main():
|
||||
resume=False,
|
||||
quiet=args.quiet,
|
||||
seen_viewkeys=seen_viewkeys,
|
||||
stream_output=args.stream_output,
|
||||
)
|
||||
else:
|
||||
# 全量模式(向后兼容):从 page 1 起爬到末尾
|
||||
@@ -691,6 +725,7 @@ def main():
|
||||
resume=False if args.no_resume else None,
|
||||
quiet=args.quiet,
|
||||
seen_viewkeys=seen_viewkeys,
|
||||
stream_output=args.stream_output,
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
+16
-11
@@ -640,17 +640,11 @@ func (a *App) attachSpider91Crawler(d *catalog.Drive, drv *spider91.Driver) {
|
||||
WorkDir: filepath.Dir(scriptPath),
|
||||
CommonThumbDir: a.commonThumbsDir(),
|
||||
ProxyURL: proxyURL,
|
||||
OnNewVideo: func(v *catalog.Video) {
|
||||
// 新视频入库后,触发 teaser worker(不需要 thumb worker,封面已就绪)
|
||||
a.mu.Lock()
|
||||
worker := a.workers[driveID]
|
||||
a.mu.Unlock()
|
||||
// 这里没有外层 ctx —— Crawler 是后台 worker,OnNewVideo 是 fire-and-forget 回调。
|
||||
// 用 Background ctx 查 catalog 即可(teaserEnabledForDrive 内部仅做一次 GetDrive)。
|
||||
if worker != nil && a.teaserEnabledForDrive(context.Background(), driveID) {
|
||||
worker.Enqueue(v)
|
||||
}
|
||||
},
|
||||
// 新流程:teaser 不在每条视频入库时立即入队,而是 RunOnce 全部下完后由
|
||||
// runSpider91Crawl 统一调 enqueueDriveGeneration 一次性入队。这样:
|
||||
// - 下载阶段不和 ffmpeg 抢 CPU/IO
|
||||
// - "等待 teaser 队列 idle" 在 nightly Phase 2 的语义上更直观
|
||||
// 不再传 OnNewVideo(crawler 内部的回调字段保留,仅为单测计数器之用)。
|
||||
})
|
||||
|
||||
a.mu.Lock()
|
||||
@@ -1201,6 +1195,17 @@ func (a *App) runSpider91Crawl(ctx context.Context, driveID string) {
|
||||
if err := a.cat.UpsertDrive(ctx, d); err != nil {
|
||||
log.Printf("[spider91] drive=%s update last_crawl_at: %v", driveID, err)
|
||||
}
|
||||
|
||||
// 爬取全部完成后,统一把所有还 pending 的 teaser 入队。
|
||||
// 这是新流水线设计:crawler 自身不再每条入库就立即触发 teaser 生成,
|
||||
// 让"下载阶段"和"teaser 阶段"在时间上分清楚(也跟 nightly Phase 2
|
||||
// 的"等 teaser 队列 idle"语义对齐)。enqueueDriveGeneration 内部会读
|
||||
// 该 drive 当前的 teaser_enabled,关闭时是 noop。
|
||||
a.mu.Lock()
|
||||
worker := a.workers[driveID]
|
||||
thumbWorker := a.thumbWorkers[driveID]
|
||||
a.mu.Unlock()
|
||||
a.enqueueDriveGeneration(ctx, driveID, worker, thumbWorker)
|
||||
}
|
||||
|
||||
// spider91IntCred 解析 credentials 中的整数字段,缺省时返回 def。
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package spider91
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@@ -133,21 +134,15 @@ type spiderVideoEntry struct {
|
||||
DetailURL string `json:"detail_url"`
|
||||
}
|
||||
|
||||
type spiderOutput struct {
|
||||
CrawlTime string `json:"crawl_time"`
|
||||
PagesCrawled int `json:"pages_crawled"`
|
||||
TotalVideos int `json:"total_videos"`
|
||||
Successful int `json:"successful"`
|
||||
Failed int `json:"failed"`
|
||||
Videos []spiderVideoEntry `json:"videos"`
|
||||
}
|
||||
|
||||
// RunOnce 执行一次"跑爬虫 → 下载 → 入库"流程:
|
||||
// 1. 从 catalog 拉取本 drive 已存在的 viewkey 列表,写到临时文件
|
||||
// 2. 启动 python,传 --target-new=targetNew --seen-viewkeys-file=<tmp>,
|
||||
// 让 Python 从 page 1 起翻页,跳过已知 viewkey,凑够 targetNew 个新视频后退出
|
||||
// 3. 解析 JSON,按顺序下载视频和封面(视频文件后缀按 URL 真实后缀决定)
|
||||
// 4. 入库 + 触发 OnNewVideo 回调(让 backend 把新视频塞进 teaser worker)
|
||||
// 1. 从 catalog 拉取本 drive 已存在的 viewkey 列表,写到临时文件
|
||||
// 2. 启动 Python 爬虫(--target-new + --seen-viewkeys-file + --stream-output),
|
||||
// Python 每解析出一个 video 直链就把 entry 当作一行 JSON 写到 stdout。
|
||||
// 3. Go 端 bufio.Scanner 按行读:每行立即下载视频和封面、入库。
|
||||
// 这样 "Python 翻页找下一个" 与 "Go 下载当前一个" 在时间上重叠,缩短整轮耗时;
|
||||
// 更重要的是不会让前几个下载耽误后面签名链接 e= 过期。
|
||||
// 4. 全部消费完 + 子进程退出 → 返回 CrawlResult。teaser 不在此处入队,
|
||||
// 由调用方 (App.runSpider91Crawl) 在 RunOnce 后统一调 enqueueDriveGeneration。
|
||||
//
|
||||
// targetNew <= 0 会被规范化成 spider91DefaultTargetNew(15)。
|
||||
func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, error) {
|
||||
@@ -202,46 +197,66 @@ func (c *Crawler) RunOnce(ctx context.Context, targetNew int) (*CrawlResult, err
|
||||
}
|
||||
result.SeenSnapshot = seenCount
|
||||
|
||||
// 2. 跑 Python 爬虫(target_new 模式)
|
||||
if err := c.runSpiderTargetNew(ctx, targetNew, seenPath, outputPath); err != nil {
|
||||
return result, fmt.Errorf("spider91 crawler: spider run: %w", err)
|
||||
}
|
||||
|
||||
// 3. 解析 JSON
|
||||
spec, err := readSpiderOutput(outputPath)
|
||||
// 2-3. 启动 Python 爬虫(流式 stdout 协议),并边读边处理。
|
||||
//
|
||||
// 协议:Python 每解析出一个 video 的直链就把 entry JSON 写到 stdout 一行,
|
||||
// 立即 flush;本端 bufio.Scanner 收到一行就立即 processOne 下载视频和封面。
|
||||
// 这样把 "Python 等所有视频解析完 + Go 顺序下载 N 个" 重叠成 "Python 翻页找下一个的同时
|
||||
// Go 在下载当前一个",缩短总耗时;更重要的是把每条直链 e= 过期时间窗用满 ——
|
||||
// 不会因为 Go 在下前面 7 个时让后面 8 个的签名超时。
|
||||
cmd, stdout, err := c.startSpiderTargetNew(ctx, targetNew, seenPath, outputPath)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("spider91 crawler: parse output: %w", err)
|
||||
return result, fmt.Errorf("spider91 crawler: spider start: %w", err)
|
||||
}
|
||||
result.TotalEntries = len(spec.Videos)
|
||||
|
||||
// 4. 顺序处理每条;保留二次去重作防御性兜底
|
||||
for _, item := range spec.Videos {
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) // 单条 entry 远小于 4 MB;保险加大上限
|
||||
for scanner.Scan() {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return result, err
|
||||
}
|
||||
if result.NewVideos >= targetNew {
|
||||
// Python 侧已经按 target_new 控制了输出,这里再兜底一次防止脚本表现异常
|
||||
_ = cmd.Process.Kill()
|
||||
break
|
||||
}
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var item spiderVideoEntry
|
||||
if jerr := json.Unmarshal([]byte(line), &item); jerr != nil {
|
||||
log.Printf("[spider91] drive=%s stdout parse: %v line=%q", c.cfg.Driver.ID(), jerr, line)
|
||||
continue
|
||||
}
|
||||
result.TotalEntries++
|
||||
viewkey := strings.TrimSpace(item.Viewkey)
|
||||
if viewkey == "" || strings.TrimSpace(item.VideoURL) == "" {
|
||||
result.Failed++
|
||||
continue
|
||||
}
|
||||
|
||||
if result.NewVideos >= targetNew {
|
||||
// Python 侧已用 target_new 控制;这里再兜底防止脚本异常多输出
|
||||
break
|
||||
}
|
||||
videoID := buildVideoID(c.cfg.Driver.ID(), viewkey)
|
||||
// viewkey 已经入库 → 跳过(防御性,正常 Python 端已经过滤)
|
||||
if existing, _ := c.cfg.Catalog.GetVideo(ctx, videoID); existing != nil {
|
||||
result.Skipped++
|
||||
continue
|
||||
}
|
||||
if err := c.processOne(ctx, videoID, item); err != nil {
|
||||
log.Printf("[spider91] drive=%s viewkey=%s failed: %v", c.cfg.Driver.ID(), viewkey, err)
|
||||
if perr := c.processOne(ctx, videoID, item); perr != nil {
|
||||
log.Printf("[spider91] drive=%s viewkey=%s failed: %v", c.cfg.Driver.ID(), viewkey, perr)
|
||||
result.Failed++
|
||||
continue
|
||||
}
|
||||
result.NewVideos++
|
||||
}
|
||||
if scerr := scanner.Err(); scerr != nil {
|
||||
log.Printf("[spider91] drive=%s stdout scan: %v", c.cfg.Driver.ID(), scerr)
|
||||
}
|
||||
if werr := cmd.Wait(); werr != nil {
|
||||
// 子进程被我们 Kill 是预期;其它错误(exit code != 0)记录日志但不当致命错误,
|
||||
// 因为流式模式下 stdout 已读完,能拿到的视频已经处理。
|
||||
if ctx.Err() == nil {
|
||||
log.Printf("[spider91] drive=%s spider exit: %v", c.cfg.Driver.ID(), werr)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -288,11 +303,13 @@ func (c *Crawler) writeSeenViewkeys(ctx context.Context, path string) (int, erro
|
||||
return len(seen), nil
|
||||
}
|
||||
|
||||
// runSpiderTargetNew 用 --target-new + --seen-viewkeys-file 模式调起 python 子进程。
|
||||
func (c *Crawler) runSpiderTargetNew(ctx context.Context, targetNew int, seenPath, outputPath string) error {
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, c.cfg.SpiderTimeout)
|
||||
defer cancel()
|
||||
|
||||
// runSpiderTargetNew 启动 Python 子进程(--target-new + --seen-viewkeys-file
|
||||
// + --stream-output)。返回 cmd 和 stdout 的 reader;调用方按行 JSON 消费 stdout,
|
||||
// 每读到一行就立即 processOne,下完再读下一行。Python 的日志被引到 stderr,
|
||||
// 由本函数转发到 backend log,不影响 stdout 的 JSONL 协议。
|
||||
//
|
||||
// 使用方负责调 cmd.Wait(),并 close stdout reader。
|
||||
func (c *Crawler) startSpiderTargetNew(ctx context.Context, targetNew int, seenPath, outputPath string) (*exec.Cmd, io.ReadCloser, error) {
|
||||
args := []string{
|
||||
c.cfg.ScriptPath,
|
||||
"--target-new", fmt.Sprintf("%d", targetNew),
|
||||
@@ -300,22 +317,47 @@ func (c *Crawler) runSpiderTargetNew(ctx context.Context, targetNew int, seenPat
|
||||
"--output", outputPath,
|
||||
"--no-resume",
|
||||
"--quiet",
|
||||
"--stream-output",
|
||||
}
|
||||
cmd := exec.CommandContext(cmdCtx, c.cfg.PythonPath, args...)
|
||||
// 子进程的 ctx 走外层 ctx 即可,不再额外加 SpiderTimeout —— 流式模式下
|
||||
// 单个视频的下载在 Go 端做超时控制(DownloadTimeout);爬虫脚本主要时间在
|
||||
// 列表/详情页 + 网络等待,整轮上限通过外层 ctx 控制更准确。
|
||||
cmd := exec.CommandContext(ctx, c.cfg.PythonPath, args...)
|
||||
if c.cfg.WorkDir != "" {
|
||||
cmd.Dir = c.cfg.WorkDir
|
||||
}
|
||||
cmd.Stdout = os.Stdout
|
||||
cmd.Stderr = os.Stderr
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
_ = stdout.Close()
|
||||
return nil, nil, fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
log.Printf("[spider91] drive=%s exec %s --target-new=%d --seen=%s --output=%s",
|
||||
c.cfg.Driver.ID(), c.cfg.ScriptPath, targetNew, seenPath, outputPath)
|
||||
if err := cmd.Run(); err != nil {
|
||||
return err
|
||||
if err := cmd.Start(); err != nil {
|
||||
_ = stdout.Close()
|
||||
_ = stderr.Close()
|
||||
return nil, nil, fmt.Errorf("start: %w", err)
|
||||
}
|
||||
if _, err := os.Stat(outputPath); err != nil {
|
||||
return fmt.Errorf("output file not produced: %w", err)
|
||||
// stderr 转发到 backend log。子进程退出时 reader 自动 EOF,goroutine 自然结束。
|
||||
go forwardSpiderLog(c.cfg.Driver.ID(), stderr)
|
||||
return cmd, stdout, nil
|
||||
}
|
||||
|
||||
// forwardSpiderLog 把 Python stderr 逐行转发到 backend log,便于调试。
|
||||
func forwardSpiderLog(driveID string, r io.Reader) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
log.Printf("[spider91:py] drive=%s %s", driveID, line)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// processOne 处理单个 viewkey:下载视频 + 封面 + 复制封面 + 入库。
|
||||
@@ -346,7 +388,9 @@ func (c *Crawler) processOne(ctx context.Context, videoID string, item spiderVid
|
||||
return fmt.Errorf("download video: %w", err)
|
||||
}
|
||||
|
||||
// 封面下载失败不致命,记录后继续,让 thumbnail worker 兜底。
|
||||
// 封面下载失败不致命,视频本身仍入库;下方在 UpsertVideo 后会把
|
||||
// thumbnail_status 显式标 'failed'(spider91 drive 的 thumb worker 按设计
|
||||
// 不处理 spider91 视频,没人能"兜底")。
|
||||
thumbReady := false
|
||||
if strings.TrimSpace(item.ThumbURL) != "" {
|
||||
if _, err := c.downloadAtomic(dlCtx, item.ThumbURL, thumbPath, item.DetailURL); err != nil {
|
||||
@@ -404,12 +448,14 @@ func (c *Crawler) processOne(ctx context.Context, videoID string, item spiderVid
|
||||
_ = os.Remove(thumbPath)
|
||||
return fmt.Errorf("upsert video: %w", err)
|
||||
}
|
||||
if thumbReady {
|
||||
// UpsertVideo 路径上 thumbnail_status 默认 'pending',
|
||||
// 这里再补一次确保为 'ready'。
|
||||
if !thumbReady {
|
||||
// 网站封面下载失败的视频:spider91 drive 的 thumb worker 按设计不
|
||||
// 处理 spider91 视频(封面应是网站原图直接保存),所以没人接手。
|
||||
// 显式标 'failed' 让 CountVideosNeedingThumbnail 排除(条件 status
|
||||
// != 'failed'),否则 enqueueDriveGeneration → waitForThumbnailsBeforePreview
|
||||
// 会因为 count > 0 把 teaser 入队永远卡在等待循环里。
|
||||
_ = c.cfg.Catalog.UpdateVideoMeta(ctx, v.ID, catalog.VideoMetaPatch{
|
||||
ThumbnailURL: v.ThumbnailURL,
|
||||
ThumbnailStatus: "ready",
|
||||
ThumbnailStatus: "failed",
|
||||
})
|
||||
}
|
||||
if c.cfg.OnNewVideo != nil {
|
||||
@@ -476,19 +522,6 @@ func (c *Crawler) downloadAtomic(ctx context.Context, src, dst, referer string)
|
||||
return written, nil
|
||||
}
|
||||
|
||||
// readSpiderOutput 读取 Python 写出的 JSON。
|
||||
func readSpiderOutput(path string) (*spiderOutput, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out spiderOutput
|
||||
if err := json.Unmarshal(data, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
// copyFileAtomic 把 src 复制到 dst,先写 .part 再 rename。
|
||||
func copyFileAtomic(src, dst string) error {
|
||||
in, err := os.Open(src)
|
||||
|
||||
@@ -2,7 +2,7 @@ package spider91
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
@@ -46,45 +46,27 @@ func TestCrawlerRunOnceFullFlow(t *testing.T) {
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
// 2. 假 python 脚本:写一个 shell 脚本伪装成 python,解析 --output 参数后输出 JSON
|
||||
jsonPayload := fmt.Sprintf(`{
|
||||
"crawl_time": "2026-05-22T00:00:00",
|
||||
"pages_crawled": 1,
|
||||
"total_videos": 2,
|
||||
"successful": 2,
|
||||
"failed": 0,
|
||||
"videos": [
|
||||
{
|
||||
"title": "Video One",
|
||||
"thumb_url": "%s/thumbs/thumb1.jpg",
|
||||
"video_url": "%s/videos/video1.mp4",
|
||||
"viewkey": "vk-001",
|
||||
"detail_url": "%s/v.php?viewkey=vk-001"
|
||||
},
|
||||
{
|
||||
"title": "Video Two",
|
||||
"thumb_url": "%s/thumbs/thumb2.jpg",
|
||||
"video_url": "%s/videos/video2.mp4",
|
||||
"viewkey": "vk-002",
|
||||
"detail_url": "%s/v.php?viewkey=vk-002"
|
||||
}
|
||||
]
|
||||
}`, srv.URL, srv.URL, srv.URL, srv.URL, srv.URL, srv.URL)
|
||||
|
||||
// 2. 假 python 脚本:解析 --output / --stream-output 参数,
|
||||
// 在 stream 模式下逐行 echo 每条视频的 JSON 到 stdout(模拟 Python 端 stream),
|
||||
// 同时仍写 --output 文件作归档。
|
||||
videoEntries := []map[string]string{
|
||||
{
|
||||
"title": "Video One",
|
||||
"thumb_url": srv.URL + "/thumbs/thumb1.jpg",
|
||||
"video_url": srv.URL + "/videos/video1.mp4",
|
||||
"viewkey": "vk-001",
|
||||
"detail_url": srv.URL + "/v.php?viewkey=vk-001",
|
||||
},
|
||||
{
|
||||
"title": "Video Two",
|
||||
"thumb_url": srv.URL + "/thumbs/thumb2.jpg",
|
||||
"video_url": srv.URL + "/videos/video2.mp4",
|
||||
"viewkey": "vk-002",
|
||||
"detail_url": srv.URL + "/v.php?viewkey=vk-002",
|
||||
},
|
||||
}
|
||||
scriptPath := filepath.Join(tmp, "fake_spider.sh")
|
||||
scriptBody := "#!/bin/sh\n" +
|
||||
"# 解析 --output FILE 写入预设 JSON\n" +
|
||||
"out=\"\"\n" +
|
||||
"while [ $# -gt 0 ]; do\n" +
|
||||
" case \"$1\" in\n" +
|
||||
" --output) out=\"$2\"; shift 2;;\n" +
|
||||
" *) shift;;\n" +
|
||||
" esac\n" +
|
||||
"done\n" +
|
||||
"if [ -z \"$out\" ]; then echo no output >&2; exit 1; fi\n" +
|
||||
"cat > \"$out\" <<'PAYLOAD'\n" +
|
||||
jsonPayload + "\n" +
|
||||
"PAYLOAD\n"
|
||||
scriptBody := buildFakeSpiderScript(videoEntries)
|
||||
if err := os.WriteFile(scriptPath, []byte(scriptBody), 0o755); err != nil {
|
||||
t.Fatalf("write script: %v", err)
|
||||
}
|
||||
@@ -249,3 +231,141 @@ func TestCrawlerRunOnceMissingScript(t *testing.T) {
|
||||
t.Fatalf("expected error for missing script")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// TestCrawlerThumbDownloadFailureMarksStatusFailed 验证:网站封面下载失败时
|
||||
// crawler 把 thumbnail_status 显式标 'failed',避免 enqueueDriveGeneration 的
|
||||
// waitForThumbnailsBeforePreview 因为 count > 0 把 teaser 卡死等待。
|
||||
//
|
||||
// 历史 bug:之前 thumb 下载失败仅打 log,url='', status 走 schema DEFAULT 'pending'。
|
||||
// CountVideosNeedingThumbnail 条件是 url='' AND status != 'failed' → count=1。
|
||||
// spider91 drive 的 thumb worker 按设计不处理 spider91 视频 → 没人会改 status。
|
||||
// 结果 teaser 永远卡在 [preview] waiting for 1 thumbnails before teaser generation。
|
||||
func TestCrawlerThumbDownloadFailureMarksStatusFailed(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("shell-based fake script only on unix")
|
||||
}
|
||||
tmp := t.TempDir()
|
||||
|
||||
// 假 HTTP 服务器:thumb 路径返回 500,video 正常返回字节。
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case strings.Contains(r.URL.Path, "video.mp4"):
|
||||
w.Header().Set("Content-Type", "video/mp4")
|
||||
_, _ = w.Write([]byte("FAKEVIDEO"))
|
||||
case strings.Contains(r.URL.Path, "thumb.jpg"):
|
||||
http.Error(w, "broken", http.StatusInternalServerError)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
videoEntries := []map[string]string{
|
||||
{
|
||||
"title": "Thumb Failure Video",
|
||||
"thumb_url": srv.URL + "/thumbs/thumb.jpg",
|
||||
"video_url": srv.URL + "/videos/video.mp4",
|
||||
"viewkey": "vk-thumb-fail",
|
||||
"detail_url": srv.URL + "/v.php?viewkey=vk-thumb-fail",
|
||||
},
|
||||
}
|
||||
scriptPath := filepath.Join(tmp, "fake.sh")
|
||||
if err := os.WriteFile(scriptPath, []byte(buildFakeSpiderScript(videoEntries)), 0o755); err != nil {
|
||||
t.Fatalf("write script: %v", err)
|
||||
}
|
||||
|
||||
cat, err := catalog.Open(filepath.Join(tmp, "test.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("catalog: %v", err)
|
||||
}
|
||||
defer cat.Close()
|
||||
|
||||
driveID := "thumbfail-drive"
|
||||
drv := New(Config{ID: driveID, RootDir: filepath.Join(tmp, "spider91", driveID)})
|
||||
if err := cat.UpsertDrive(context.Background(), &catalog.Drive{
|
||||
ID: driveID, Kind: Kind, Name: "thumbfail",
|
||||
}); err != nil {
|
||||
t.Fatalf("upsert drive: %v", err)
|
||||
}
|
||||
|
||||
c := NewCrawler(CrawlerConfig{
|
||||
Driver: drv,
|
||||
Catalog: cat,
|
||||
PythonPath: "sh",
|
||||
ScriptPath: scriptPath,
|
||||
CommonThumbDir: filepath.Join(tmp, "previews", "thumbs"),
|
||||
SpiderTimeout: 10 * time.Second,
|
||||
DownloadTimeout: 10 * time.Second,
|
||||
})
|
||||
|
||||
res, err := c.RunOnce(context.Background(), 5)
|
||||
if err != nil {
|
||||
t.Fatalf("RunOnce: %v", err)
|
||||
}
|
||||
if res.NewVideos != 1 {
|
||||
t.Fatalf("expected 1 new video, got %d (failed=%d)", res.NewVideos, res.Failed)
|
||||
}
|
||||
|
||||
got, err := cat.GetVideo(context.Background(), "spider91-"+driveID+"-vk-thumb-fail")
|
||||
if err != nil {
|
||||
t.Fatalf("get video: %v", err)
|
||||
}
|
||||
if got.ThumbnailURL != "" {
|
||||
t.Errorf("ThumbnailURL = %q, want empty (download failed)", got.ThumbnailURL)
|
||||
}
|
||||
|
||||
// 关键断言:CountVideosNeedingThumbnail 应该返回 0。
|
||||
// 该函数的 SQL 条件是 `url = '' AND status != 'failed'`;如果 crawler 没把
|
||||
// status 标 'failed'(schema DEFAULT 'pending'),count 就会是 1,外层
|
||||
// waitForThumbnailsBeforePreview 会因为 count > 0 把 teaser 卡死等待。
|
||||
count, err := cat.CountVideosNeedingThumbnail(context.Background(), driveID)
|
||||
if err != nil {
|
||||
t.Fatalf("count: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Fatalf("CountVideosNeedingThumbnail = %d, want 0 (status should be 'failed' to unblock teaser worker)", count)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// buildFakeSpiderScript 生成一个伪 python 脚本(其实是 sh)。
|
||||
//
|
||||
// 行为:
|
||||
// - 解析 --output FILE / --stream-output 两个 flag
|
||||
// - --stream-output 时:逐行输出每个 entry 的 JSON 到 stdout 并 flush
|
||||
// - --output 时:把完整 JSON 数据写到 FILE(向后兼容,且作归档)
|
||||
//
|
||||
// 用 sh 来写是为了避免 Python 依赖。每条 entry 的 JSON 用 Go marshal 出来后嵌入。
|
||||
func buildFakeSpiderScript(entries []map[string]string) string {
|
||||
var sb strings.Builder
|
||||
sb.WriteString("#!/bin/sh\n")
|
||||
sb.WriteString("out=\"\"; stream=0\n")
|
||||
sb.WriteString("while [ $# -gt 0 ]; do case \"$1\" in --output) out=\"$2\"; shift 2;; --stream-output) stream=1; shift;; *) shift;; esac; done\n")
|
||||
|
||||
// stream 模式:逐行 echo
|
||||
sb.WriteString("if [ \"$stream\" = \"1\" ]; then\n")
|
||||
for _, e := range entries {
|
||||
raw, _ := json.Marshal(e)
|
||||
// 用单引号 here-string 形式确保 JSON 中的双引号原样出来
|
||||
sb.WriteString(" cat <<'STREAM_EOF'\n")
|
||||
sb.Write(raw)
|
||||
sb.WriteString("\nSTREAM_EOF\n")
|
||||
}
|
||||
sb.WriteString("fi\n")
|
||||
|
||||
// 写 --output 文件(带完整 wrapper)
|
||||
sb.WriteString("if [ -n \"$out\" ]; then\n")
|
||||
sb.WriteString(" mkdir -p \"$(dirname \"$out\")\" 2>/dev/null\n")
|
||||
sb.WriteString(" cat > \"$out\" <<'OUT_EOF'\n")
|
||||
wrapper := map[string]any{
|
||||
"crawl_time": "2026-01-01T00:00:00",
|
||||
"total_videos": len(entries),
|
||||
"videos": entries,
|
||||
}
|
||||
wrapped, _ := json.MarshalIndent(wrapper, "", " ")
|
||||
sb.Write(wrapped)
|
||||
sb.WriteString("\nOUT_EOF\n")
|
||||
sb.WriteString("fi\n")
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user