mirror of
https://github.com/nianzhibai/91.git
synced 2026-06-15 08:45:41 +08:00
Use web download API for 115 media
This commit is contained in:
@@ -383,6 +383,9 @@ func (a *App) attachDrive(ctx context.Context, d *catalog.Drive) error {
|
||||
})
|
||||
worker := preview.NewWorker(gen, a.cat, drv, "")
|
||||
thumbWorker := preview.NewThumbWorker(gen, a.cat, drv)
|
||||
if drv.Kind() == "p115" {
|
||||
preview.ShareGenerationState(worker, thumbWorker)
|
||||
}
|
||||
|
||||
workerCtx, cancel := context.WithCancel(ctx)
|
||||
go worker.Run(workerCtx)
|
||||
|
||||
@@ -202,19 +202,6 @@ func (d *Driver) StreamURL(ctx context.Context, fileID string) (*drives.StreamLi
|
||||
}
|
||||
|
||||
func (d *Driver) downloadInfo(pickCode string) (*sdk.DownloadInfo, string, error) {
|
||||
mobileUA := sdk.UAIosApp
|
||||
if info, err := d.client.DownloadWithUAByAndroidAPI(pickCode, mobileUA); err == nil {
|
||||
if info != nil && info.Url.Url != "" {
|
||||
return info, mobileUA, nil
|
||||
}
|
||||
} else {
|
||||
webInfo, webErr := d.client.DownloadWithUA(pickCode, d.ua)
|
||||
if webErr != nil {
|
||||
return nil, "", fmt.Errorf("android api: %v; chrome api: %w", err, webErr)
|
||||
}
|
||||
return webInfo, d.ua, nil
|
||||
}
|
||||
|
||||
info, err := d.client.DownloadWithUA(pickCode, d.ua)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
|
||||
@@ -847,7 +847,7 @@ type Worker struct {
|
||||
ch chan *catalog.Video
|
||||
|
||||
RateLimitCooldown time.Duration
|
||||
rateLimit rateLimitState
|
||||
shared *sharedGenerationState
|
||||
activity taskActivity
|
||||
}
|
||||
|
||||
@@ -857,6 +857,7 @@ func NewWorker(gen TeaserGenerator, cat *catalog.Catalog, drv drives.Drive, _ st
|
||||
Catalog: cat,
|
||||
Drive: drv,
|
||||
ch: make(chan *catalog.Video, 4096),
|
||||
shared: newSharedGenerationState(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -891,7 +892,7 @@ type ThumbWorker struct {
|
||||
ch chan *catalog.Video
|
||||
|
||||
RateLimitCooldown time.Duration
|
||||
rateLimit rateLimitState
|
||||
shared *sharedGenerationState
|
||||
activity taskActivity
|
||||
}
|
||||
|
||||
@@ -907,6 +908,11 @@ type rateLimitState struct {
|
||||
lastSkipLog time.Time
|
||||
}
|
||||
|
||||
type sharedGenerationState struct {
|
||||
rateLimit *rateLimitState
|
||||
gate sync.Mutex
|
||||
}
|
||||
|
||||
type TaskStatus struct {
|
||||
State string
|
||||
CurrentTitle string
|
||||
@@ -989,6 +995,31 @@ func NewThumbWorker(gen ThumbnailGenerator, cat *catalog.Catalog, drv drives.Dri
|
||||
Catalog: cat,
|
||||
Drive: drv,
|
||||
ch: make(chan *catalog.Video, 4096),
|
||||
shared: newSharedGenerationState(),
|
||||
}
|
||||
}
|
||||
|
||||
func newSharedGenerationState() *sharedGenerationState {
|
||||
return &sharedGenerationState{rateLimit: &rateLimitState{}}
|
||||
}
|
||||
|
||||
// ShareGenerationState makes preview and thumbnail generation for one drive
|
||||
// share cooldowns and avoid concurrent reads from the same remote media source.
|
||||
func ShareGenerationState(worker *Worker, thumbWorker *ThumbWorker) {
|
||||
state := newSharedGenerationState()
|
||||
if worker != nil && worker.shared != nil {
|
||||
state = worker.shared
|
||||
} else if thumbWorker != nil && thumbWorker.shared != nil {
|
||||
state = thumbWorker.shared
|
||||
}
|
||||
if state.rateLimit == nil {
|
||||
state.rateLimit = &rateLimitState{}
|
||||
}
|
||||
if worker != nil {
|
||||
worker.shared = state
|
||||
}
|
||||
if thumbWorker != nil {
|
||||
thumbWorker.shared = state
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1020,14 +1051,14 @@ func (w *Worker) Status() TaskStatus {
|
||||
if w == nil {
|
||||
return TaskStatus{State: "idle"}
|
||||
}
|
||||
return taskStatus(&w.activity, &w.rateLimit, len(w.ch))
|
||||
return taskStatus(&w.activity, w.generationState().rateLimit, len(w.ch))
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) Status() TaskStatus {
|
||||
if w == nil {
|
||||
return TaskStatus{State: "idle"}
|
||||
}
|
||||
return taskStatus(&w.activity, &w.rateLimit, len(w.ch))
|
||||
return taskStatus(&w.activity, w.generationState().rateLimit, len(w.ch))
|
||||
}
|
||||
|
||||
func taskStatus(activity *taskActivity, rateLimit *rateLimitState, queueLength int) TaskStatus {
|
||||
@@ -1090,23 +1121,71 @@ func (w *ThumbWorker) Run(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (w *Worker) processQueued(ctx context.Context, v *catalog.Video) {
|
||||
w.activity.start(v)
|
||||
defer w.activity.done()
|
||||
if !waitForRateLimitCooldown(ctx, &w.rateLimit, "preview", w.Drive) {
|
||||
state := w.generationState()
|
||||
unlock, ok := waitForGenerationTurn(ctx, state, "preview", w.Drive)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
w.activity.start(v)
|
||||
defer w.activity.done()
|
||||
w.process(ctx, v)
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) processQueued(ctx context.Context, v *catalog.Video) {
|
||||
w.activity.start(v)
|
||||
defer w.activity.done()
|
||||
if !waitForRateLimitCooldown(ctx, &w.rateLimit, "thumb", w.Drive) {
|
||||
state := w.generationState()
|
||||
unlock, ok := waitForGenerationTurn(ctx, state, "thumb", w.Drive)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
w.activity.start(v)
|
||||
defer w.activity.done()
|
||||
w.process(ctx, v)
|
||||
}
|
||||
|
||||
func (w *Worker) generationState() *sharedGenerationState {
|
||||
if w.shared == nil {
|
||||
w.shared = newSharedGenerationState()
|
||||
}
|
||||
if w.shared.rateLimit == nil {
|
||||
w.shared.rateLimit = &rateLimitState{}
|
||||
}
|
||||
return w.shared
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) generationState() *sharedGenerationState {
|
||||
if w.shared == nil {
|
||||
w.shared = newSharedGenerationState()
|
||||
}
|
||||
if w.shared.rateLimit == nil {
|
||||
w.shared.rateLimit = &rateLimitState{}
|
||||
}
|
||||
return w.shared
|
||||
}
|
||||
|
||||
func waitForGenerationTurn(ctx context.Context, state *sharedGenerationState, label string, drive drives.Drive) (func(), bool) {
|
||||
if state == nil {
|
||||
state = newSharedGenerationState()
|
||||
}
|
||||
if state.rateLimit == nil {
|
||||
state.rateLimit = &rateLimitState{}
|
||||
}
|
||||
for {
|
||||
if !waitForRateLimitCooldown(ctx, state.rateLimit, label, drive) {
|
||||
return nil, false
|
||||
}
|
||||
state.gate.Lock()
|
||||
if _, ok := state.rateLimit.coolingUntil(time.Now()); ok {
|
||||
state.gate.Unlock()
|
||||
continue
|
||||
}
|
||||
return state.gate.Unlock, true
|
||||
}
|
||||
}
|
||||
|
||||
func waitForRateLimitCooldown(ctx context.Context, state *rateLimitState, label string, drive drives.Drive) bool {
|
||||
driveID := ""
|
||||
if drive != nil {
|
||||
@@ -1133,7 +1212,7 @@ func waitForRateLimitCooldown(ctx context.Context, state *rateLimitState, label
|
||||
}
|
||||
|
||||
func (w *Worker) skipIfRateLimited(v *catalog.Video) bool {
|
||||
until, ok, shouldLog := w.rateLimit.active(time.Now())
|
||||
until, ok, shouldLog := w.generationState().rateLimit.active(time.Now())
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
@@ -1151,7 +1230,7 @@ func (w *Worker) pauseForRateLimit(err error, step, title string) bool {
|
||||
if retryAfter <= 0 {
|
||||
retryAfter = w.RateLimitCooldown
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), retryAfter)
|
||||
until := w.generationState().rateLimit.pause(time.Now(), retryAfter)
|
||||
log.Printf("[preview] drive=%s rate-limited until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
@@ -1163,13 +1242,13 @@ func (w *Worker) pauseForRecoverableError(err error, step, title string) bool {
|
||||
if !driveErrorShouldCooldown(w.Drive, err) {
|
||||
return false
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
until := w.generationState().rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
log.Printf("[preview] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
|
||||
func (w *ThumbWorker) skipIfRateLimited(v *catalog.Video) bool {
|
||||
until, ok, shouldLog := w.rateLimit.active(time.Now())
|
||||
until, ok, shouldLog := w.generationState().rateLimit.active(time.Now())
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
@@ -1187,7 +1266,7 @@ func (w *ThumbWorker) pauseForRateLimit(err error, step, title string) bool {
|
||||
if retryAfter <= 0 {
|
||||
retryAfter = w.RateLimitCooldown
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), retryAfter)
|
||||
until := w.generationState().rateLimit.pause(time.Now(), retryAfter)
|
||||
log.Printf("[thumb] drive=%s rate-limited until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
@@ -1199,7 +1278,7 @@ func (w *ThumbWorker) pauseForRecoverableError(err error, step, title string) bo
|
||||
if !driveErrorShouldCooldown(w.Drive, err) {
|
||||
return false
|
||||
}
|
||||
until := w.rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
until := w.generationState().rateLimit.pause(time.Now(), w.RateLimitCooldown)
|
||||
log.Printf("[thumb] drive=%s transient media source error until=%s step=%s video=%s: %v", w.Drive.ID(), until.Format(time.RFC3339), step, title, err)
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user