Fix cancel handling in pipedv1 scheduler #5597

Merged
merged 4 commits into from
Mar 4, 2025
Changes from 3 commits
5 changes: 4 additions & 1 deletion pkg/app/pipedv1/controller/scheduler.go
@@ -553,7 +553,10 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final
TargetDeploymentSource: tds.ToPluginDeploySource(),
},
})
-if err != nil {
+// do not return error if the context is already canceled.
+// this occurs when the stage is canceled.
+// otherwise, return the error.
+if err != nil && ctx.Err() == nil {
s.logger.Error("failed to execute stage", zap.String("stage-name", ps.Name), zap.Error(err))
s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires)
return model.StageStatus_STAGE_FAILURE
5 changes: 4 additions & 1 deletion pkg/app/pipedv1/plugin/wait/wait.go
@@ -78,7 +78,10 @@ func wait(ctx context.Context, duration time.Duration, initialStart time.Time, s

case <-ctx.Done(): // on cancelled
slp.Info("Wait cancelled")
-return sdk.StageStatusCancelled
Member

[IMO] I think StageStatusCancelled should remain, even though piped does not use it.
Without it, plugin developers will be confused about which status to return.

If we want to remove StageStatusCancelled, we should remove the case <-ctx.Done(): section too. (If that is possible, it would be ideal.)

Contributor Author

I see what you are concerned about.
On the other hand, my concern is that plugin developers may think they have to report context cancellation as StageStatusCancelled. That is incorrect: a plugin should stop its operation when the context is canceled, without worrying about which status it returns.
The WAIT plugin is a special case because it must watch for context cancellation explicitly in order to stop. Almost all other plugins can achieve this simply by passing the context to their internal functions, because deployment operations can treat context cancellation as a failure.

Member

> plugin developers may think they have to handle context cancellation as StageStatusCancelled.

I agree!

What about WaitApproval and ScriptRun stages?

e.LogPersister.Infof("Waiting for approval from at least %d user(s)...", num)
for {
	select {
	case <-ticker.C:
		if e.checkApproval(ctx, num) {
			return model.StageStatus_STAGE_SUCCESS
		}
	case s := <-sig.Ch():
		switch s {
		case executor.StopSignalCancel:
			return model.StageStatus_STAGE_CANCELLED
		case executor.StopSignalTerminate:
			return originalStatus
		default:
			return model.StageStatus_STAGE_FAILURE
		}
	case <-timer.C:
		e.LogPersister.Errorf("Timed out %v", timeout)
		return model.StageStatus_STAGE_FAILURE
	}
}

for {
	select {
	case result := <-c:
		return result
	case <-timer.C:
		e.LogPersister.Errorf("Canceled because of timeout")
		return model.StageStatus_STAGE_FAILURE
	case s := <-sig.Ch():
		switch s {
		case executor.StopSignalCancel:
			e.LogPersister.Info("Canceled by user")
			return model.StageStatus_STAGE_CANCELLED
		case executor.StopSignalTerminate:
			e.LogPersister.Info("Terminated by system")
			return originalStatus
		default:
			e.LogPersister.Error("Unexpected")
			return model.StageStatus_STAGE_FAILURE
		}
	}
}

Contributor Author

First, the timeout should be handled on the piped side so that the plugin doesn't have to handle it.

The SCRIPT_RUN stage should use os/exec.CommandContext, which kills the running command when the context is canceled. So we can implement it without watching ctx.Done().

The WAIT_APPROVAL stage is difficult to implement without watching ctx.Done() because, apart from polling the approval state, it performs no operation that takes a context.

Member

OK, I got it.

Let's add a note to the plugin dev guide about when to handle ctx.Done().
Even if StageStatusCancelled is removed, plugin developers should be aware of cancellation so that the stage reliably exits.

+// The piped handles this case as cancelled by the user without using the plugin's result.
+// So we don't need to consider which status should be returned.
+// We return the failure here.
+return sdk.StageStatusFailure
}
}
}
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/plugin/wait/wait_test.go
@@ -69,7 +69,7 @@ func TestWait_Cancel(t *testing.T) {

select {
case result := <-resultCh:
-assert.Equal(t, sdk.StageStatusCancelled, result)
+assert.Equal(t, sdk.StageStatusFailure, result)
case <-time.After(1 * time.Second):
t.Error("wait() did not ended even after the context was canceled")
}
18 changes: 9 additions & 9 deletions pkg/plugin/sdk/deployment.go
@@ -455,25 +455,25 @@ type ExecuteStageResponse struct {
type StageStatus int

const (
-StageStatusSuccess StageStatus = 2
-StageStatusFailure StageStatus = 3
-StageStatusCancelled StageStatus = 4
-
-// StageStatusSkipped StageStatus = 5 // TODO: If SDK can handle whole skipping, this is unnecessary.
-
+_ StageStatus = iota
+// StageStatusSuccess indicates that the stage succeeded.
+StageStatusSuccess
+// StageStatusFailure indicates that the stage failed.
+StageStatusFailure
// StageStatusExited can be used when the stage succeeded and exit the pipeline without executing the following stages.
-StageStatusExited StageStatus = 6
+StageStatusExited
+
+// StageStatusSkipped // TODO: If SDK can handle whole skipping, this is unnecessary.
)

// toModelEnum converts the StageStatus to the model.StageStatus.
+// It returns model.StageStatus_STAGE_FAILURE if the given value is invalid.
func (o StageStatus) toModelEnum() model.StageStatus {
switch o {
case StageStatusSuccess:
return model.StageStatus_STAGE_SUCCESS
case StageStatusFailure:
return model.StageStatus_STAGE_FAILURE
-case StageStatusCancelled:
-return model.StageStatus_STAGE_CANCELLED
case StageStatusExited:
return model.StageStatus_STAGE_EXITED
default: