Skip to content

Commit

Permalink
fix(fc2): fix double cancellation on quality upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkness4 committed Aug 23, 2024
1 parent 9cbf9fc commit d62bac0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 12 deletions.
9 changes: 8 additions & 1 deletion cmd/download/download_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,14 @@ Available format options:
if loop {
for {
_, err := downloader.Watch(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, fc2.ErrWebSocketStreamEnded) {
if errors.Is(err, context.Canceled) {
select {
case <-ctx.Done():
default:
log.Panic().
Err(err).
Msg("a channel was cancelled, but the parent context was not")
}
log.Info().Str("channelID", channelID).Msg("abort watching channel")
break
}
Expand Down
29 changes: 19 additions & 10 deletions fc2/fc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,35 +677,33 @@ playlistLoop:
doneChan = make(chan struct{}, 1)

// Download thread.
go func() {
go func(ctx context.Context) {
log.Info().Msg("downloader thread started")
defer func() {
close(doneChan)
}()
span.AddEvent("downloading")
end := metrics.TimeStartRecording(currentCtx, metrics.Downloads.CompletionTime, time.Second, metric.WithAttributes(
end := metrics.TimeStartRecording(ctx, metrics.Downloads.CompletionTime, time.Second, metric.WithAttributes(
attribute.String("channel_id", f.channelID),
),
)
defer end()
metrics.Downloads.Runs.Add(currentCtx, 1, metric.WithAttributes(
metrics.Downloads.Runs.Add(ctx, 1, metric.WithAttributes(
attribute.String("channel_id", f.channelID),
))

// Actually download. It will block until the download is finished.
checkpointMu.Lock()
checkpoint, err = downloader.Read(currentCtx, file, checkpoint)
checkpoint, err = downloader.Read(ctx, file, checkpoint)
checkpointMu.Unlock()

if err != nil {
errChan <- err
}
}()
f.log.Info().Msg("downloader thread finished")
}(currentCtx)

case err := <-errChan:
if currentCancel != nil {
currentCancel()
}

if err == nil {
f.log.Panic().Msg(
"undefined behavior, downloader finished with nil, the download MUST finish with io.EOF",
Expand All @@ -714,13 +712,24 @@ playlistLoop:
if err == io.EOF {
f.log.Info().Msg("downloader finished reading")
} else if errors.Is(err, context.Canceled) {
f.log.Info().Msg("downloader canceled")
select {
case <-ctx.Done():
f.log.Info().Msg("downloader cancelled by parent context")
// Parent context was cancelled, we should return.
default:
// Parent context was not cancelled, we should continue.
f.log.Info().Msg("downloader cancelled")
continue
}
} else {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
f.log.Error().Err(err).Msg("downloader failed with error")
}

if currentCancel != nil {
currentCancel()
}
return err
}
}
Expand Down
5 changes: 5 additions & 0 deletions fc2/fc2_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,21 @@ func (w *WebSocket) Listen(
}
switch arguments.Code {
case 4101:
w.log.Info().Msg("ws paid program")
return ErrWebSocketPaidProgram
case 4507:
w.log.Info().Msg("ws login required")
return ErrWebSocketLoginRequired
case 4512:
w.log.Info().Msg("ws multiple connection")
return ErrWebSocketMultipleConnection
default:
w.log.Error().Msg("ws server disconnection")
return ErrWebSocketServerDisconnection
}

case "publish_stop":
w.log.Info().Msg("ws stream ended")
return ErrWebSocketStreamEnded
case "comment":
var arguments CommentArguments
Expand Down
2 changes: 1 addition & 1 deletion hls/hls_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (hls *Downloader) Read(
writer io.Writer,
checkpoint Checkpoint,
) (newCheckpoint Checkpoint, err error) {
hls.log.Debug().Msg("started to read stream")
hls.log.Info().Msg("hls downloader started")
ctx, span := otel.Tracer(tracerName).Start(ctx, "hls.Read", trace.WithAttributes(
attribute.String("last_fragment_name", checkpoint.LastFragmentName),
attribute.String("last_fragment_time", checkpoint.LastFragmentTime.String()),
Expand Down

0 comments on commit d62bac0

Please sign in to comment.