Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Stage Plugins] Implement ListStageCommands() in pipedapi #5534

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 57 additions & 38 deletions pkg/app/pipedv1/apistore/commandstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -41,25 +42,32 @@
type Lister interface {
ListApplicationCommands() []model.ReportableCommand
ListDeploymentCommands() []model.ReportableCommand
ListStageCommands(deploymentID, stageID string) []model.ReportableCommand
ListBuildPlanPreviewCommands() []model.ReportableCommand
ListPipedCommands() []model.ReportableCommand

// ListStageCommands returns all stage commands of the given deployment and stage.
// If the command type is not supported, it returns an error.
ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error)
}

// stageCommandMap is a map of stage commands. Keys are deploymentID and stageID.
type stageCommandMap map[string]map[string][]*model.Command

type store struct {
apiClient apiClient
syncInterval time.Duration
// TODO: Using atomic for storing a map of all commands
// instead of some separate lists + mutex as the current.
applicationCommands []model.ReportableCommand
deploymentCommands []model.ReportableCommand
stageCommands []model.ReportableCommand
planPreviewCommands []model.ReportableCommand
pipedCommands []model.ReportableCommand
handledCommands map[string]time.Time
mu sync.RWMutex
gracePeriod time.Duration
logger *zap.Logger
applicationCommands []model.ReportableCommand
deploymentCommands []model.ReportableCommand
planPreviewCommands []model.ReportableCommand
pipedCommands []model.ReportableCommand
stageApproveCommands stageCommandMap
stageSkipCommands stageCommandMap
handledCommands map[string]time.Time
mu sync.RWMutex
gracePeriod time.Duration
logger *zap.Logger
}

var (
Expand Down Expand Up @@ -117,33 +125,37 @@
}

var (
applicationCommands = make([]model.ReportableCommand, 0)
deploymentCommands = make([]model.ReportableCommand, 0)
stageCommands = make([]model.ReportableCommand, 0)
planPreviewCommands = make([]model.ReportableCommand, 0)
pipedCommands = make([]model.ReportableCommand, 0)
applicationCommands = make([]model.ReportableCommand, 0)
deploymentCommands = make([]model.ReportableCommand, 0)
planPreviewCommands = make([]model.ReportableCommand, 0)
pipedCommands = make([]model.ReportableCommand, 0)
stageApproveCommands stageCommandMap
stageSkipCommands stageCommandMap

Check warning on line 133 in pkg/app/pipedv1/apistore/commandstore/store.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/apistore/commandstore/store.go#L128-L133

Added lines #L128 - L133 were not covered by tests
)
for _, cmd := range resp.Commands {
switch cmd.Type {
case model.Command_SYNC_APPLICATION, model.Command_UPDATE_APPLICATION_CONFIG, model.Command_CHAIN_SYNC_APPLICATION:
applicationCommands = append(applicationCommands, s.makeReportableCommand(cmd))
case model.Command_CANCEL_DEPLOYMENT:
deploymentCommands = append(deploymentCommands, s.makeReportableCommand(cmd))
case model.Command_APPROVE_STAGE, model.Command_SKIP_STAGE:
stageCommands = append(stageCommands, s.makeReportableCommand(cmd))
case model.Command_BUILD_PLAN_PREVIEW:
planPreviewCommands = append(planPreviewCommands, s.makeReportableCommand(cmd))
case model.Command_RESTART_PIPED:
pipedCommands = append(pipedCommands, s.makeReportableCommand(cmd))
case model.Command_APPROVE_STAGE:
stageApproveCommands.append(cmd)
case model.Command_SKIP_STAGE:
stageSkipCommands.append(cmd)

Check warning on line 148 in pkg/app/pipedv1/apistore/commandstore/store.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/apistore/commandstore/store.go#L145-L148

Added lines #L145 - L148 were not covered by tests
}
}

s.mu.Lock()
s.applicationCommands = applicationCommands
s.deploymentCommands = deploymentCommands
s.stageCommands = stageCommands
s.planPreviewCommands = planPreviewCommands
s.pipedCommands = pipedCommands
s.stageApproveCommands = stageApproveCommands
s.stageSkipCommands = stageSkipCommands

Check warning on line 158 in pkg/app/pipedv1/apistore/commandstore/store.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/apistore/commandstore/store.go#L157-L158

Added lines #L157 - L158 were not covered by tests
s.mu.Unlock()

return nil
Expand Down Expand Up @@ -191,26 +203,6 @@
return commands
}

func (s *store) ListStageCommands(deploymentID, stageID string) []model.ReportableCommand {
s.mu.RLock()
defer s.mu.RUnlock()

commands := make([]model.ReportableCommand, 0, len(s.stageCommands))
for _, cmd := range s.stageCommands {
if _, ok := s.handledCommands[cmd.Id]; ok {
continue
}
if cmd.DeploymentId != deploymentID {
continue
}
if cmd.StageId != stageID {
continue
}
commands = append(commands, cmd)
}
return commands
}

func (s *store) ListBuildPlanPreviewCommands() []model.ReportableCommand {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down Expand Up @@ -264,3 +256,30 @@
})
return err
}

func (s *store) ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error) {
var commands stageCommandMap
switch commandType {
case model.Command_APPROVE_STAGE:
commands = s.stageApproveCommands
case model.Command_SKIP_STAGE:
commands = s.stageSkipCommands
default:
s.logger.Error("invalid command type", zap.String("commandType", commandType.String()))
return nil, fmt.Errorf("invalid command type: %v", commandType.String())
}

s.mu.RLock()
defer s.mu.RUnlock()

return commands[deploymentID][stageID], nil
}

func (m stageCommandMap) append(c *model.Command) {
deploymentID := c.DeploymentId
stageID := c.StageId
if _, ok := m[deploymentID]; !ok {
m[deploymentID] = make(map[string][]*model.Command)
}
m[deploymentID][stageID] = append(m[deploymentID][stageID], c)

Check warning on line 284 in pkg/app/pipedv1/apistore/commandstore/store.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/apistore/commandstore/store.go#L278-L284

Added lines #L278 - L284 were not covered by tests
}
138 changes: 138 additions & 0 deletions pkg/app/pipedv1/apistore/commandstore/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commandstore

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/model"
)

func TestListStageCommands(t *testing.T) {
t.Parallel()

store := store{
stageApproveCommands: stageCommandMap{
"deployment-1": {
"stage-1": []*model.Command{
{
Id: "command-1",
DeploymentId: "deployment-1",
StageId: "stage-1",
Type: model.Command_APPROVE_STAGE,
Commander: "commander-1",
},
{
Id: "command-2",
DeploymentId: "deployment-1",
StageId: "stage-1",
Type: model.Command_APPROVE_STAGE,
Commander: "commander-2",
},
},
},
},
stageSkipCommands: stageCommandMap{
"deployment-11": {
"stage-11": []*model.Command{
{
Id: "command-11",
DeploymentId: "deployment-11",
StageId: "stage-11",
Type: model.Command_SKIP_STAGE,
},
},
},
},
logger: zap.NewNop(),
}

testcases := []struct {
name string
deploymentID string
stageID string
commandType model.Command_Type
want []*model.Command
wantErr error
}{
{
name: "valid arguments of Approve",
deploymentID: "deployment-1",
stageID: "stage-1",
commandType: model.Command_APPROVE_STAGE,
want: []*model.Command{
{
Id: "command-1",
DeploymentId: "deployment-1",
StageId: "stage-1",
Type: model.Command_APPROVE_STAGE,
Commander: "commander-1",
},
{
Id: "command-2",
DeploymentId: "deployment-1",
StageId: "stage-1",
Type: model.Command_APPROVE_STAGE,
Commander: "commander-2",
},
},
wantErr: nil,
},
{
name: "valid arguments of Skip",
deploymentID: "deployment-11",
stageID: "stage-11",
commandType: model.Command_SKIP_STAGE,
want: []*model.Command{
{
Id: "command-11",
DeploymentId: "deployment-11",
StageId: "stage-11",
Type: model.Command_SKIP_STAGE,
},
},
wantErr: nil,
},
{
name: "stageID not exists",
deploymentID: "deployment-1",
stageID: "stage-999",
commandType: model.Command_APPROVE_STAGE,
want: nil,
wantErr: nil,
},
{
name: "invalid commandType",
deploymentID: "deployment-1",
stageID: "stage-1",
commandType: model.Command_CANCEL_DEPLOYMENT,
want: nil,
wantErr: fmt.Errorf("invalid command type: CANCEL_DEPLOYMENT"),
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
got, err := store.ListStageCommands(tc.deploymentID, tc.stageID, tc.commandType)
assert.Equal(t, tc.wantErr, err)
assert.Equal(t, tc.want, got)
})
}
}
26 changes: 25 additions & 1 deletion pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
service "github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"

"go.uber.org/zap"
Expand All @@ -36,19 +37,31 @@
toolRegistry *toolRegistry
Logger *zap.Logger
metadataStoreRegistry *metadatastore.MetadataStoreRegistry
stageCommandLister stageCommandLister
}

type apiClient interface {
ReportStageLogs(ctx context.Context, req *pipedservice.ReportStageLogsRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsResponse, error)
ReportStageLogsFromLastCheckpoint(ctx context.Context, in *pipedservice.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsFromLastCheckpointResponse, error)
}

type stageCommandLister interface {
ListStageCommands(deploymentID, stageID string, commandType model.Command_Type) ([]*model.Command, error)
}

// Register registers all handling of this service into the specified gRPC server.
func (a *PluginAPI) Register(server *grpc.Server) {
service.RegisterPluginServiceServer(server, a)
}

func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, toolsDir string, logger *zap.Logger, metadataStoreRegistry *metadatastore.MetadataStoreRegistry) (*PluginAPI, error) {
func NewPluginAPI(
cfg *config.PipedSpec,
apiClient apiClient,
toolsDir string,
logger *zap.Logger,
metadataStoreRegistry *metadatastore.MetadataStoreRegistry,
stageCommandLister stageCommandLister,
) (*PluginAPI, error) {

Check warning on line 64 in pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go#L64

Added line #L64 was not covered by tests
toolRegistry, err := newToolRegistry(toolsDir)
if err != nil {
return nil, fmt.Errorf("failed to create tool registry: %w", err)
Expand All @@ -60,6 +73,7 @@
toolRegistry: toolRegistry,
Logger: logger.Named("plugin-api"),
metadataStoreRegistry: metadataStoreRegistry,
stageCommandLister: stageCommandLister,

Check warning on line 76 in pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go#L76

Added line #L76 was not covered by tests
}, nil
}

Expand Down Expand Up @@ -143,3 +157,13 @@
func (a *PluginAPI) GetDeploymentSharedMetadata(ctx context.Context, req *service.GetDeploymentSharedMetadataRequest) (*service.GetDeploymentSharedMetadataResponse, error) {
return a.metadataStoreRegistry.GetDeploymentSharedMetadata(ctx, req)
}

func (a *PluginAPI) ListStageCommands(ctx context.Context, req *service.ListStageCommandsRequest) (*service.ListStageCommandsResponse, error) {
commands, err := a.stageCommandLister.ListStageCommands(req.DeploymentId, req.StageId, req.Type)
if err != nil {
return nil, err
}
return &service.ListStageCommandsResponse{
Commands: commands,
}, nil

Check warning on line 168 in pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go#L161-L168

Added lines #L161 - L168 were not covered by tests
}
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@
// Start running plugin service server.
{
var (
service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger, metadataStoreRegistry)
service, err = grpcapi.NewPluginAPI(cfg, apiClient, p.toolsDir, input.Logger, metadataStoreRegistry, commandLister)

Check warning on line 305 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L305

Added line #L305 was not covered by tests
opts = []rpc.Option{
rpc.WithPort(p.pluginServicePort),
rpc.WithGracePeriod(p.gracePeriod),
Expand Down
1 change: 0 additions & 1 deletion pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type deploymentLister interface {

type commandLister interface {
ListDeploymentCommands() []model.ReportableCommand
ListStageCommands(deploymentID, stageID string) []model.ReportableCommand
}

type notifier interface {
Expand Down