fix(spider91): cool down PikPak captcha migration failures

This commit is contained in:
nianzhibai
2026-05-27 10:59:12 +08:00
parent f05df174ac
commit 95bf67667a
4 changed files with 440 additions and 13 deletions
+28 -7
View File
@@ -8,6 +8,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/go-resty/resty/v2"
@@ -43,6 +44,14 @@ type Driver struct {
client *resty.Client
onTokenUpdate func(access, refresh, captcha, deviceID string)
// captchaMu serializes captcha-token refreshes triggered by 4002 / 9
// recovery in requestOnce. Without it, N concurrent callers all hitting
// 4002 at once would each post to /v1/shield/captcha/init, racing to
// overwrite d.captchaToken — wasteful and likely to be flagged by
// PikPak as abuse. With it, only one refresh is in flight; later
// callers observe d.captchaToken has changed and skip the refresh.
captchaMu sync.Mutex
}
type Config struct {
@@ -279,14 +288,26 @@ func (d *Driver) requestOnce(ctx context.Context, url, method string, configure
}
case 9, 4002:
if retry {
// 4002 = captcha_token expired;先清掉缓存让 refresh 走空 token 路径,
// 否则 refresh 仍会带着同一个过期 token 再次拿到 4002。
// 9 = 通用 captcha_invalid,刷新后通常能恢复。
if e.ErrorCode == 4002 {
d.captchaToken = ""
// Snapshot the token we *just used* (which the server rejected).
// Then take captchaMu so concurrent recovery attempts are
// serialized. Once we hold the lock, if d.captchaToken has
// already moved past staleToken, another goroutine has refreshed
// it for us — we skip the refresh and just retry. Otherwise we
// clear the cached token (4002 means "the value in the body is
// expired"; sending it again will keep returning 4002) and ask
// /v1/shield/captcha/init for a fresh one.
staleToken := d.captchaToken
d.captchaMu.Lock()
var refreshErr error
if d.captchaToken == staleToken {
if e.ErrorCode == 4002 {
d.captchaToken = ""
}
refreshErr = d.refreshCaptchaTokenAtLogin(ctx, getAction(method, url), d.userID)
}
if err := d.refreshCaptchaTokenAtLogin(ctx, getAction(method, url), d.userID); err != nil {
return err
d.captchaMu.Unlock()
if refreshErr != nil {
return refreshErr
}
return d.requestOnce(ctx, url, method, configure, out, false)
}
+23
View File
@@ -1,6 +1,7 @@
package pikpak
import (
"errors"
"fmt"
"strconv"
"strings"
@@ -58,6 +59,28 @@ func (e *errResp) Error() string {
return fmt.Sprintf("pikpak error_code=%d error=%s description=%s", e.ErrorCode, e.ErrorMsg, e.ErrorDescription)
}
// APIError is the public alias for the PikPak API error response. Callers
// outside this package (e.g. the spider91→PikPak migrator, tests) can either
// construct it for fakes or unwrap it via errors.As. Prefer IsCaptchaError
// over hard-coding the numeric error codes.
type APIError = errResp
// IsCaptchaError reports whether err originates from a PikPak captcha-related
// API error (error_code 4002 = captcha_token expired; 9 = captcha_invalid).
//
// It walks the error chain via errors.As, so the caller can wrap the original
// error with fmt.Errorf("...: %w", err) without breaking detection.
func IsCaptchaError(err error) bool {
if err == nil {
return false
}
var e *errResp
if errors.As(err, &e) {
return e != nil && (e.ErrorCode == 4002 || e.ErrorCode == 9)
}
return false
}
type captchaTokenRequest struct {
Action string `json:"action"`
CaptchaToken string `json:"captcha_token"`
+135 -3
View File
@@ -57,7 +57,11 @@ type Config struct {
// KeepLatestN 是每个 spider91 drive 在本地保留的最新视频数。
// 超过的部分中"已迁移"的会被清理;未迁移的不动。0 时默认 15;< 0 关闭清理。
KeepLatestN int
OnMigrated func(videoID string)
// CaptchaCooldown 是迁移 worker 在遇到 PikPak captcha 错误(error_code
// 4002 / 9)后整体进入冷却的时长。冷却期间 runOnce 直接返回,不再发起任何
// PikPak API 请求,避免被进一步风控。0 时默认 5 分钟;< 0 关闭冷却(仅用于测试)。
CaptchaCooldown time.Duration
OnMigrated func(videoID string)
}
type Migrator struct {
@@ -65,6 +69,15 @@ type Migrator struct {
trigger chan struct{}
mu sync.Mutex
running bool
// cooldownMu 保护 cooldownUntil。captcha 冷却的语义:
// - migrateDrive 遇到上传失败且 pikpak.IsCaptchaError(err) == true 时
// 调 setCooldown,未来 cfg.CaptchaCooldown 内 runOnce 直接 noop
// - 一次冷却期内只打印一行进入日志和一行恢复日志,避免之前那种
// "每秒一条 4002" 的刷屏
cooldownMu sync.Mutex
cooldownUntil time.Time
cooldownLogged bool
}
func New(cfg Config) *Migrator {
@@ -77,12 +90,66 @@ func New(cfg Config) *Migrator {
if cfg.KeepLatestN == 0 {
cfg.KeepLatestN = 15
}
if cfg.CaptchaCooldown == 0 {
cfg.CaptchaCooldown = 5 * time.Minute
}
return &Migrator{
cfg: cfg,
trigger: make(chan struct{}, 1),
}
}
// inCooldown 返回当前是否处于 captcha 冷却期,以及冷却结束时间。
// 冷却期间应该跳过整个 runOnce —— 不要列盘、不要尝试上传,
// 让 PikPak 喘口气。
func (m *Migrator) inCooldown() (bool, time.Time) {
m.cooldownMu.Lock()
defer m.cooldownMu.Unlock()
return time.Now().Before(m.cooldownUntil), m.cooldownUntil
}
// cooldownState 返回当前冷却状态。若发现冷却已经过期,会清掉状态并让
// 调用方打印一次恢复日志。
func (m *Migrator) cooldownState() (active bool, until time.Time, resumed bool) {
m.cooldownMu.Lock()
defer m.cooldownMu.Unlock()
if m.cooldownUntil.IsZero() {
return false, time.Time{}, false
}
until = m.cooldownUntil
if time.Now().Before(until) {
return true, until, false
}
m.cooldownUntil = time.Time{}
m.cooldownLogged = false
return false, until, true
}
// setCooldown 把冷却结束时间往后推 cfg.CaptchaCooldown,并返回结束时间。
// 当 cfg.CaptchaCooldown < 0(仅测试用)时不改任何状态、返回零值。
func (m *Migrator) setCooldown() time.Time {
if m.cfg.CaptchaCooldown < 0 {
return time.Time{}
}
m.cooldownMu.Lock()
defer m.cooldownMu.Unlock()
m.cooldownUntil = time.Now().Add(m.cfg.CaptchaCooldown)
m.cooldownLogged = false
return m.cooldownUntil
}
// markCooldownLogged 是 runOnce 用来只打一次"在冷却中"日志的小工具。
// 第一次返回 false(应该打),第二次起返回 true(不再打),冷却到期 / 重新设置时复位。
func (m *Migrator) markCooldownLogged() bool {
m.cooldownMu.Lock()
defer m.cooldownMu.Unlock()
if m.cooldownLogged {
return true
}
m.cooldownLogged = true
return false
}
// Trigger 安排一次"立即跑"。多次调用会被合并成一次(channel buffer=1)。
func (m *Migrator) Trigger() {
select {
@@ -95,16 +162,54 @@ func (m *Migrator) Trigger() {
func (m *Migrator) Run(ctx context.Context) {
t := time.NewTicker(m.cfg.Interval)
defer t.Stop()
var cooldownTimer *time.Timer
var cooldownC <-chan time.Time
stopCooldownTimer := func() {
if cooldownTimer == nil {
return
}
if !cooldownTimer.Stop() {
select {
case <-cooldownTimer.C:
default:
}
}
cooldownTimer = nil
cooldownC = nil
}
resetCooldownTimer := func() {
stopCooldownTimer()
active, until := m.inCooldown()
if !active {
return
}
delay := time.Until(until)
if delay < 0 {
delay = 0
}
cooldownTimer = time.NewTimer(delay)
cooldownC = cooldownTimer.C
}
defer stopCooldownTimer()
// 启动后立刻跑一次(不等第一个 tick)
m.runOnce(ctx)
resetCooldownTimer()
for {
select {
case <-ctx.Done():
return
case <-t.C:
m.runOnce(ctx)
resetCooldownTimer()
case <-m.trigger:
m.runOnce(ctx)
resetCooldownTimer()
case <-cooldownC:
cooldownTimer = nil
cooldownC = nil
m.runOnce(ctx)
resetCooldownTimer()
}
}
}
@@ -126,6 +231,18 @@ func (m *Migrator) runOnce(ctx context.Context) {
m.mu.Unlock()
}()
// captcha 冷却期间整轮跳过 —— 不做任何 PikPak API 调用、不做本地清理,
// 等冷却结束。这样从用户视角看:进入冷却 → 一行日志 → 完全静默 → 冷却
// 结束自然恢复。避免之前每秒一条 4002 的日志雪崩。
if active, until, resumed := m.cooldownState(); active {
if !m.markCooldownLogged() {
log.Printf("[spider91migrate] captcha cooldown active until %s, skipping run", until.Format(time.RFC3339))
}
return
} else if resumed {
log.Printf("[spider91migrate] captcha cooldown ended at %s, resuming migration", until.Format(time.RFC3339))
}
target, pp, err := m.resolveTarget()
if err != nil {
// 没目标就静默 —— 用户可能还没配 PikPak drive
@@ -142,6 +259,12 @@ func (m *Migrator) runOnce(ctx context.Context) {
log.Printf("[spider91migrate] drive=%s migrate batch error: %v", src.ID(), err)
}
migrated += n
if active, _ := m.inCooldown(); active {
if migrated > 0 {
log.Printf("[spider91migrate] migrated %d video(s) to drive=%s", migrated, target)
}
return
}
}
if migrated > 0 {
log.Printf("[spider91migrate] migrated %d video(s) to drive=%s", migrated, target)
@@ -215,8 +338,8 @@ func (m *Migrator) spider91Drives() []*spider91.Driver {
// - 列出 spider91 drive 本地 videos/ 目录所有 mp4 文件,按 mtime 降序排
// - 跳过最新 KeepLatestN 个:这些是用户希望保留在本地的最新爬取
// - 对剩下的(更旧)逐个处理:
// * 还没迁移(drive_id 仍是 src.ID())→ 上传到 PikPak + 改 catalog + 删本地
// * 已经迁移过但本地还有残留 → 仅删本地(兜底)
// - 还没迁移(drive_id 仍是 src.ID())→ 上传到 PikPak + 改 catalog + 删本地
// - 已经迁移过但本地还有残留 → 仅删本地(兜底)
//
// KeepLatestN < 0 时不保护任何本地文件,全部尝试迁移(旧行为,主要给测试用)。
func (m *Migrator) migrateDrive(ctx context.Context, src *spider91.Driver, targetDriveID string, pp pikpakUploader) (int, error) {
@@ -296,6 +419,15 @@ func (m *Migrator) migrateDrive(ctx context.Context, src *spider91.Driver, targe
ok, err := m.migrateOne(ctx, v, src, targetDriveID, pp)
if err != nil {
log.Printf("[spider91migrate] %s: %v", v.ID, err)
// captcha 错误(4002 / 9)说明 PikPak 当前正拒绝我们;继续在
// 同一轮里尝试其它文件大概率会拿到同样的 4002,并且每多一次
// 失败就多一份"被风控加深"的风险。立即中止当前 batch 并
// 打开冷却窗口,等 cfg.CaptchaCooldown 之后再重试。
if pikpak.IsCaptchaError(err) {
until := m.setCooldown()
log.Printf("[spider91migrate] drive=%s captcha-blocked, cooling down until %s", src.ID(), until.Format(time.RFC3339))
return migrated, nil
}
continue
}
if ok {
@@ -70,9 +70,9 @@ func (d *fakePikPak) ID() string { return d.id }
func (d *fakePikPak) RootID() string {
return d.rootID
}
func (d *fakePikPak) Init(context.Context) error { return nil }
func (d *fakePikPak) List(context.Context, string) ([]drives.Entry, error) { return nil, nil }
func (d *fakePikPak) Stat(context.Context, string) (*drives.Entry, error) { return nil, nil }
func (d *fakePikPak) Init(context.Context) error { return nil }
func (d *fakePikPak) List(context.Context, string) ([]drives.Entry, error) { return nil, nil }
func (d *fakePikPak) Stat(context.Context, string) (*drives.Entry, error) { return nil, nil }
func (d *fakePikPak) StreamURL(context.Context, string) (*drives.StreamLink, error) {
return nil, drives.ErrNotSupported
}
@@ -113,7 +113,9 @@ var _ pikpakUploader = (*fakePikPak)(nil)
// TestBackfillFileNamesRenamesOnlyMismatchedSpider91Videos 验证回填逻辑:
//
// - 已经是期望格式的不会再调 Rename(幂等)
//
// - 名字仍是旧格式的 spider91-* 视频会被改名 + catalog 同步
//
// - 不是 spider91-* 的 PikPak 视频不动(避免误伤手工导入的)
//
// - 反复跑 runOnce 不会再重复改名
@@ -642,3 +644,252 @@ func TestRunOnceMigratesOnlyOlderFilesBeyondKeepWindow(t *testing.T) {
}
}
// TestRunOnceCoolsDownOnCaptchaErrorAndAbortsBatch 验证当 PikPak 返回
// captcha 错误(4002 / 9)时:
//
// 1. migrateDrive 立即放弃当前 batch,不继续遍历后续候选;
// 2. migrator 进入 cooldown,下一次 runOnce 直接 noop,不再发起任何上传;
// 3. cooldown 到期后 runOnce 自然恢复,不需要外部干预。
//
// 这个测试覆盖之前观察到的 "每秒一条 4002 日志雪崩" bug:当时 batch 里 50 个
// 文件每个都会触发同样的 captcha 失败,本测试断言其中只有 1 个会被尝试。
func TestRunOnceCoolsDownOnCaptchaErrorAndAbortsBatch(t *testing.T) {
cat := setupCatalog(t)
src, _ := setupSpider91(t)
pp := newFakePikPak("pikpak-target", "pikpak-root-id")
pp.uploadFunc = func(ctx context.Context, parentID, name string, r io.Reader, size int64) (pikpak.UploadResult, error) {
_, _ = io.Copy(io.Discard, r)
// 模拟真实 PikPak 4002 错误:通过包装 *pikpak.APIError
// pikpak.IsCaptchaError 应该能识别出来。
captcha := &pikpak.APIError{ErrorCode: 4002, ErrorMsg: "captcha_invalid", ErrorDescription: "Code(4002) - captcha_token expired"}
return pikpak.UploadResult{}, fmt.Errorf("pikpak upload: request session: %w", captcha)
}
reg := newFakeRegistry()
reg.Add(src)
reg.Add(pp)
now := time.Now()
// 写 5 个本地文件,全都"够老"应该被迁。KeepLatestN=-1 关闭保留窗口,
// 让所有候选都进 batch 循环。
for i := 0; i < 5; i++ {
viewkey := fmt.Sprintf("vk-cd-%02d", i)
mtime := now.Add(time.Duration(-i) * time.Hour)
_ = writeSpider91Video(t, cat, src, viewkey, ".mp4", []byte("payload"), mtime)
path, _ := src.VideoPath(viewkey + ".mp4")
_ = os.Chtimes(path, mtime, mtime)
}
m := New(Config{
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
KeepLatestN: -1,
CaptchaCooldown: 10 * time.Minute,
})
// 第一次 runOnce:应该在第 1 个文件失败时就退出 batch,且进入冷却。
m.runOnce(context.Background())
if pp.uploadCalls != 1 {
t.Fatalf("after first runOnce upload calls = %d, want 1 (batch should abort on captcha error)", pp.uploadCalls)
}
if active, _ := m.inCooldown(); !active {
t.Fatalf("expected migrator to be in cooldown after captcha error")
}
// 第二次 runOnce:应该完全 noop,因为还在冷却期。
m.runOnce(context.Background())
if pp.uploadCalls != 1 {
t.Fatalf("after second runOnce upload calls = %d, want 1 (cooldown should skip the run)", pp.uploadCalls)
}
// catalog 行不能被改 —— 上传失败的文件保持在 spider91 drive
for i := 0; i < 5; i++ {
viewkey := fmt.Sprintf("vk-cd-%02d", i)
id := "spider91-" + src.ID() + "-" + viewkey
v, _ := cat.GetVideo(context.Background(), id)
if v.DriveID != src.ID() {
t.Errorf("%s drive_id = %q, want spider91 (upload failed, catalog should stay)", viewkey, v.DriveID)
}
// 本地文件也不能被删
path, _ := src.VideoPath(viewkey + ".mp4")
if _, err := os.Stat(path); err != nil {
t.Errorf("%s local file removed despite failed upload: %v", viewkey, err)
}
}
}
// TestRunOnceResumesAfterCooldownExpires 验证冷却到期后 runOnce 可以继续工作。
//
// 用 cfg.CaptchaCooldown = 50msset 完冷却立即等 60ms,第二次 runOnce 应该重新
// 进入正常路径。这里把 uploadFunc 换成成功版本,验证整条链路通畅。
func TestRunOnceResumesAfterCooldownExpires(t *testing.T) {
cat := setupCatalog(t)
src, _ := setupSpider91(t)
pp := newFakePikPak("pikpak-target", "pikpak-root-id")
// 第一次:失败;第二次:成功。
var failOnce sync.Once
pp.uploadFunc = func(ctx context.Context, parentID, name string, r io.Reader, size int64) (pikpak.UploadResult, error) {
body, _ := io.ReadAll(r)
var failed bool
failOnce.Do(func() { failed = true })
if failed {
captcha := &pikpak.APIError{ErrorCode: 4002, ErrorMsg: "captcha_invalid"}
return pikpak.UploadResult{}, fmt.Errorf("pikpak upload: request session: %w", captcha)
}
pp.mu.Lock()
pp.gotBodies[name] = body
pp.mu.Unlock()
return pikpak.UploadResult{
FileID: "remote-" + name,
Hash: "FAKEHASH40CHARSXXXXXXXXXXXXXXXXXXXXXXXXX",
Size: int64(len(body)),
}, nil
}
reg := newFakeRegistry()
reg.Add(src)
reg.Add(pp)
now := time.Now()
_ = writeSpider91Video(t, cat, src, "vk-resume", ".mp4", []byte("payload"), now)
m := New(Config{
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
KeepLatestN: -1,
CaptchaCooldown: 30 * time.Millisecond,
})
// 第一次:失败 + 进入冷却
m.runOnce(context.Background())
if pp.uploadCalls != 1 {
t.Fatalf("first run upload calls = %d, want 1", pp.uploadCalls)
}
if active, _ := m.inCooldown(); !active {
t.Fatalf("expected cooldown after first failure")
}
// 等冷却到期
time.Sleep(80 * time.Millisecond)
if active, _ := m.inCooldown(); active {
t.Fatalf("cooldown should have expired by now")
}
// 第二次:成功
m.runOnce(context.Background())
if pp.uploadCalls != 2 {
t.Fatalf("second run upload calls = %d, want 2 (resume after cooldown)", pp.uploadCalls)
}
id := "spider91-" + src.ID() + "-vk-resume"
v, _ := cat.GetVideo(context.Background(), id)
if v.DriveID != pp.ID() {
t.Fatalf("after resume, drive_id = %q, want PikPak", v.DriveID)
}
}
// TestRunWakesWhenCooldownExpires 验证 Run 循环会在 cooldown 到点后主动唤醒
// 一次迁移,而不是等下一个普通 interval tick。
func TestRunWakesWhenCooldownExpires(t *testing.T) {
cat := setupCatalog(t)
src, _ := setupSpider91(t)
pp := newFakePikPak("pikpak-target", "pikpak-root-id")
migrated := make(chan struct{}, 1)
var failOnce sync.Once
pp.uploadFunc = func(ctx context.Context, parentID, name string, r io.Reader, size int64) (pikpak.UploadResult, error) {
body, _ := io.ReadAll(r)
var failed bool
failOnce.Do(func() { failed = true })
if failed {
captcha := &pikpak.APIError{ErrorCode: 4002, ErrorMsg: "captcha_invalid"}
return pikpak.UploadResult{}, fmt.Errorf("pikpak upload: request session: %w", captcha)
}
pp.mu.Lock()
pp.gotBodies[name] = body
pp.mu.Unlock()
return pikpak.UploadResult{
FileID: "remote-" + name,
Hash: "FAKEHASH40CHARSXXXXXXXXXXXXXXXXXXXXXXXXX",
Size: int64(len(body)),
}, nil
}
reg := newFakeRegistry()
reg.Add(src)
reg.Add(pp)
now := time.Now()
id := writeSpider91Video(t, cat, src, "vk-auto-resume", ".mp4", []byte("payload"), now)
m := New(Config{
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
Interval: time.Hour,
KeepLatestN: -1,
CaptchaCooldown: 30 * time.Millisecond,
OnMigrated: func(videoID string) {
if videoID == id {
select {
case migrated <- struct{}{}:
default:
}
}
},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go m.Run(ctx)
select {
case <-migrated:
case <-time.After(500 * time.Millisecond):
t.Fatalf("Run did not resume migration after cooldown expired")
}
got, err := cat.GetVideo(context.Background(), id)
if err != nil {
t.Fatalf("get video: %v", err)
}
if got.DriveID != pp.ID() {
t.Fatalf("after auto resume, drive_id = %q, want PikPak", got.DriveID)
}
}
// TestNonCaptchaErrorDoesNotTriggerCooldown 验证非 captcha 类的上传错误(如
// 网络抖动)不会让整个 worker 进冷却 —— 只跳过这一条,继续尝试 batch 里其它的。
func TestNonCaptchaErrorDoesNotTriggerCooldown(t *testing.T) {
cat := setupCatalog(t)
src, _ := setupSpider91(t)
pp := newFakePikPak("pikpak-target", "pikpak-root-id")
pp.uploadFunc = func(ctx context.Context, parentID, name string, r io.Reader, size int64) (pikpak.UploadResult, error) {
_, _ = io.Copy(io.Discard, r)
return pikpak.UploadResult{}, errors.New("simulated network failure")
}
reg := newFakeRegistry()
reg.Add(src)
reg.Add(pp)
now := time.Now()
for i := 0; i < 3; i++ {
viewkey := fmt.Sprintf("vk-net-%02d", i)
_ = writeSpider91Video(t, cat, src, viewkey, ".mp4", []byte("payload"), now.Add(time.Duration(-i)*time.Hour))
}
m := New(Config{
Catalog: cat,
Registry: reg,
GetTargetDriveID: func() string { return pp.ID() },
KeepLatestN: -1,
})
m.runOnce(context.Background())
// 所有 3 个都被尝试(每个都失败,但不应触发冷却中止 batch)
if pp.uploadCalls != 3 {
t.Fatalf("upload calls = %d, want 3 (non-captcha errors should not abort batch)", pp.uploadCalls)
}
if active, _ := m.inCooldown(); active {
t.Fatalf("non-captcha error should not trigger cooldown")
}
}