Skip to content

Commit

Permalink
Introduce plugin logpersister package (#5342)
Browse files Browse the repository at this point in the history
* Introduce plugin logpersister package

Signed-off-by: khanhtc1202 <[email protected]>

* Add interface comments

Signed-off-by: khanhtc1202 <[email protected]>

---------

Signed-off-by: khanhtc1202 <[email protected]>
  • Loading branch information
khanhtc1202 authored Nov 15, 2024
1 parent 519d3fc commit 23eddac
Show file tree
Hide file tree
Showing 12 changed files with 1,103 additions and 127 deletions.
54 changes: 50 additions & 4 deletions pkg/app/pipedv1/cmd/piped/grpcapi/plugin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/cmd/piped/service"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/crypto"
"github.com/pipe-cd/pipecd/pkg/model"
Expand All @@ -30,19 +31,27 @@ import (
type PluginAPI struct {
service.PluginServiceServer

cfg *config.PipedSpec
cfg *config.PipedSpec
apiClient apiClient

Logger *zap.Logger
}

type apiClient interface {
ReportStageLogs(ctx context.Context, req *pipedservice.ReportStageLogsRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsResponse, error)
ReportStageLogsFromLastCheckpoint(ctx context.Context, in *pipedservice.ReportStageLogsFromLastCheckpointRequest, opts ...grpc.CallOption) (*pipedservice.ReportStageLogsFromLastCheckpointResponse, error)
}

// Register registers all handling of this service into the specified gRPC server.
func (a *PluginAPI) Register(server *grpc.Server) {
service.RegisterPluginServiceServer(server, a)
}

func NewPluginAPI(cfg *config.PipedSpec, logger *zap.Logger) *PluginAPI {
func NewPluginAPI(cfg *config.PipedSpec, apiClient apiClient, logger *zap.Logger) *PluginAPI {
return &PluginAPI{
cfg: cfg,
Logger: logger.Named("plugin-api"),
cfg: cfg,
apiClient: apiClient,
Logger: logger.Named("plugin-api"),
}
}

Expand Down Expand Up @@ -71,6 +80,43 @@ func (a *PluginAPI) DecryptSecret(ctx context.Context, req *service.DecryptSecre
}, nil
}

func (a *PluginAPI) ReportStageLogs(ctx context.Context, req *service.ReportStageLogsRequest) (*service.ReportStageLogsResponse, error) {
_, err := a.apiClient.ReportStageLogs(ctx, &pipedservice.ReportStageLogsRequest{
DeploymentId: req.DeploymentId,
StageId: req.StageId,
RetriedCount: req.RetriedCount,
Blocks: req.Blocks,
})
if err != nil {
a.Logger.Error("failed to report stage logs",
zap.String("deploymentID", req.DeploymentId),
zap.String("stageID", req.StageId),
zap.Error(err))
return nil, err
}

return &service.ReportStageLogsResponse{}, nil
}

func (a *PluginAPI) ReportStageLogsFromLastCheckpoint(ctx context.Context, req *service.ReportStageLogsFromLastCheckpointRequest) (*service.ReportStageLogsFromLastCheckpointResponse, error) {
_, err := a.apiClient.ReportStageLogsFromLastCheckpoint(ctx, &pipedservice.ReportStageLogsFromLastCheckpointRequest{
DeploymentId: req.DeploymentId,
StageId: req.StageId,
RetriedCount: req.RetriedCount,
Blocks: req.Blocks,
Completed: req.Completed,
})
if err != nil {
a.Logger.Error("failed to report stage logs from last checkpoint",
zap.String("deploymentID", req.DeploymentId),
zap.String("stageID", req.StageId),
zap.Error(err))
return nil, err
}

return &service.ReportStageLogsFromLastCheckpointResponse{}, nil
}

func initializeSecretDecrypter(sm *config.SecretManagement) (crypto.Decrypter, error) {
if sm == nil {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
// Start running plugin service server.
{
var (
service = grpcapi.NewPluginAPI(cfg, input.Logger)
service = grpcapi.NewPluginAPI(cfg, apiClient, input.Logger)
opts = []rpc.Option{
rpc.WithPort(p.pluginServicePort),
rpc.WithGracePeriod(p.gracePeriod),
Expand Down
Loading

0 comments on commit 23eddac

Please sign in to comment.