From 35d3aaa6496e75ddb0c87b539dee07f45a04ba47 Mon Sep 17 00:00:00 2001 From: Carl Braganza Date: Tue, 8 Oct 2024 02:37:11 +0000 Subject: [PATCH] Log stream responses with progress with a separate detailed trace level. --- .prow.sh | 0 Makefile | 5 +- pkg/internal/server/grpc/common_test.go | 27 ++++++++ .../server/grpc/get_metadata_allocated.go | 64 +++++++++++++++++-- .../grpc/get_metadata_allocated_test.go | 16 +++-- .../server/grpc/get_metadata_delta.go | 64 +++++++++++++++++-- .../server/grpc/get_metadata_delta_test.go | 16 +++-- pkg/internal/server/grpc/server.go | 3 +- pkg/internal/server/grpc/status.go | 2 +- 9 files changed, 177 insertions(+), 20 deletions(-) mode change 100644 => 100755 .prow.sh diff --git a/.prow.sh b/.prow.sh old mode 100644 new mode 100755 diff --git a/Makefile b/Makefile index 82d9b64f..447c10bd 100644 --- a/Makefile +++ b/Makefile @@ -39,5 +39,6 @@ CMDS=csi-snapshot-metadata include release-tools/build.make -# Extend the test target to include lint -test: lint +# Eventually extend the test target to include lint. +# Currently the linter is not available in the CI infrastructure. +#test: lint diff --git a/pkg/internal/server/grpc/common_test.go b/pkg/internal/server/grpc/common_test.go index e27d8244..9251d5f8 100644 --- a/pkg/internal/server/grpc/common_test.go +++ b/pkg/internal/server/grpc/common_test.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "flag" "net" + "strconv" "strings" "testing" @@ -37,6 +39,7 @@ import ( apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" clientgotesting "k8s.io/client-go/testing" + "k8s.io/klog/v2" smsv1alpha1 "github.com/kubernetes-csi/external-snapshot-metadata/client/apis/snapshotmetadataservice/v1alpha1" fakecbt "github.com/kubernetes-csi/external-snapshot-metadata/client/clientset/versioned/fake" @@ -407,3 +410,27 @@ func convStringByteMapToStringStringMap(inMap map[string][]byte) map[string]stri } return ret } + +type KlogRestoreVerbosityFunc func() + +// SetKlogVerbosity sets up the default logger with the specified verbosity level. +func (th *testHarness) SetKlogVerbosity(verboseLevel int, uniquePrefix string) KlogRestoreVerbosityFunc { + klog.ClearLogger() + // Set the verbosity level using a new flag set. + // It is not possible to set a verbose klog/v2/testlogger as the background logger + // because the klog.V() performs its own checks. + var level klog.Level + level.Set(strconv.Itoa(verboseLevel)) + fs := flag.NewFlagSet(uniquePrefix+"Fs1", flag.ContinueOnError) + fs.Var(&level, uniquePrefix+"V1", "test log verbosity level") + klog.InitFlags(fs) + + return func() { + // restore the verbosity level using a new flag set + klog.ClearLogger() + fs := flag.NewFlagSet(uniquePrefix+"Fs2", flag.ExitOnError) + level.Set("1") + fs.Var(&level, uniquePrefix+"V2", "test log verbosity level") + klog.InitFlags(fs) + } +} diff --git a/pkg/internal/server/grpc/get_metadata_allocated.go b/pkg/internal/server/grpc/get_metadata_allocated.go index 23bf8e39..7bd20974 100644 --- a/pkg/internal/server/grpc/get_metadata_allocated.go +++ b/pkg/internal/server/grpc/get_metadata_allocated.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "fmt" "io" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -114,22 +116,76 @@ func (s *Server) convertToCSIGetMetadataAllocatedRequest(ctx context.Context, re } func (s *Server) streamGetMetadataAllocatedResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataAllocatedServer, csiStream csi.SnapshotMetadata_GetMetadataAllocatedClient) error { + var ( + blockMetadataType api.BlockMetadataType + lastByteOffset int64 + lastSize int64 + logger = klog.FromContext(ctx) + numBlockMetadata int + responseNum int + volumeCapacityBytes int64 + ) + for { csiResp, err := csiStream.Recv() if err == io.EOF { - klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF") + logger.V(HandlerTraceLogLevel).WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Info("stream EOF") return nil } - //TODO: stream logging with progress - if err != nil { + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedCSIDriverResponse) return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err) } + responseNum++ + clientResp := s.convertToGetMetadataAllocatedResponse(csiResp) + blockMetadataType = clientResp.BlockMetadataType + volumeCapacityBytes = clientResp.VolumeCapacityBytes + numBlockMetadata = len(clientResp.BlockMetadata) - 1 + if numBlockMetadata >= 0 { + lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset + lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes + } + + if logger.V(HandlerDetailedTraceLogLevel).Enabled() { + var b strings.Builder + b.WriteString("[") + for _, bmd := range clientResp.BlockMetadata { + b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes)) + } + b.WriteString("]") + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + "blockMetadata", b.String(), + "numBlockMetadata", len(clientResp.BlockMetadata), + ).Info("stream response") + } + if err := clientStream.Send(clientResp); err != nil { - return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err) + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedToSendResponse) + return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err) } } } diff --git a/pkg/internal/server/grpc/get_metadata_allocated_test.go b/pkg/internal/server/grpc/get_metadata_allocated_test.go index 208920ea..3b136474 100644 --- a/pkg/internal/server/grpc/get_metadata_allocated_test.go +++ b/pkg/internal/server/grpc/get_metadata_allocated_test.go @@ -482,13 +482,16 @@ type mockCSIMetadataAllocatedResponse struct { } func TestStreamGetMetadataAllocatedResponse(t *testing.T) { - ctx := context.Background() th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs() defer th.TerminateMockCSIDriver() grpcServer := th.StartGRPCServer(t, th.Runtime()) defer th.StopGRPCServer(t) + // Test at the trace logging level. + restoreFn := th.SetKlogVerbosity(HandlerDetailedTraceLogLevel, "Alloc") + defer restoreFn() + for _, tc := range []struct { name string req *api.GetMetadataAllocatedRequest @@ -698,15 +701,16 @@ func TestStreamGetMetadataAllocatedResponse(t *testing.T) { mockCSIStream.EXPECT().Recv().Return(nil, io.EOF) } + sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError} + ctx := grpcServer.getMetadataAllocatedContextWithLogger(tc.req, sms) + csiReq, err := grpcServer.convertToCSIGetMetadataAllocatedRequest(ctx, tc.req) assert.NoError(t, err) csiStream, err := csiClient.GetMetadataAllocated(ctx, csiReq) assert.NoError(t, err) - sms := &fakeStreamServerSnapshotAllocated{err: tc.mockK8sStreamError} - - errStream := grpcServer.streamGetMetadataAllocatedResponse(sms, csiStream) + errStream := grpcServer.streamGetMetadataAllocatedResponse(ctx, sms, csiStream) if tc.expectStreamError { assert.NoError(t, err) st, ok := status.FromError(errStream) @@ -729,6 +733,10 @@ type fakeStreamServerSnapshotAllocated struct { err error } +func (f *fakeStreamServerSnapshotAllocated) Context() context.Context { + return context.Background() +} + func (f *fakeStreamServerSnapshotAllocated) Send(m *api.GetMetadataAllocatedResponse) error { if f.err != nil { return f.err diff --git a/pkg/internal/server/grpc/get_metadata_delta.go b/pkg/internal/server/grpc/get_metadata_delta.go index cab300cb..f0760d7a 100644 --- a/pkg/internal/server/grpc/get_metadata_delta.go +++ b/pkg/internal/server/grpc/get_metadata_delta.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "fmt" "io" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" @@ -135,22 +137,76 @@ func (s *Server) convertToCSIGetMetadataDeltaRequest(ctx context.Context, req *a } func (s *Server) streamGetMetadataDeltaResponse(ctx context.Context, clientStream api.SnapshotMetadata_GetMetadataDeltaServer, csiStream csi.SnapshotMetadata_GetMetadataDeltaClient) error { + var ( + blockMetadataType api.BlockMetadataType + lastByteOffset int64 + lastSize int64 + logger = klog.FromContext(ctx) + numBlockMetadata int + responseNum int + volumeCapacityBytes int64 + ) + for { csiResp, err := csiStream.Recv() if err == io.EOF { - klog.FromContext(ctx).V(HandlerTraceLogLevel).Info("stream EOF") + logger.V(HandlerTraceLogLevel).WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Info("stream EOF") return nil } - //TODO: stream logging with progress - if err != nil { + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "lastResponseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedCSIDriverResponse) return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedCSIDriverResponseFmt, err) } + responseNum++ + clientResp := s.convertToGetMetadataDeltaResponse(csiResp) + blockMetadataType = clientResp.BlockMetadataType + volumeCapacityBytes = clientResp.VolumeCapacityBytes + numBlockMetadata = len(clientResp.BlockMetadata) - 1 + if numBlockMetadata >= 0 { + lastByteOffset = clientResp.BlockMetadata[numBlockMetadata].ByteOffset + lastSize = clientResp.BlockMetadata[numBlockMetadata].SizeBytes + } + + if logger.V(HandlerDetailedTraceLogLevel).Enabled() { + var b strings.Builder + b.WriteString("[") + for _, bmd := range clientResp.BlockMetadata { + b.WriteString(fmt.Sprintf("{%d,%d}", bmd.ByteOffset, bmd.SizeBytes)) + } + b.WriteString("]") + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + "blockMetadata", b.String(), + "numBlockMetadata", len(clientResp.BlockMetadata), + ).Info("stream response") + } + if err := clientStream.Send(clientResp); err != nil { - return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedtoSendResponseFmt, err) + logger.WithValues( + "blockMetadataType", blockMetadataType.String(), + "lastByteOffset", lastByteOffset, + "lastSize", lastSize, + "responseNum", responseNum, + "volumeCapacityBytes", volumeCapacityBytes, + ).Error(err, msgInternalFailedToSendResponse) + return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err) } } } diff --git a/pkg/internal/server/grpc/get_metadata_delta_test.go b/pkg/internal/server/grpc/get_metadata_delta_test.go index d6d94529..8da29f7c 100644 --- a/pkg/internal/server/grpc/get_metadata_delta_test.go +++ b/pkg/internal/server/grpc/get_metadata_delta_test.go @@ -712,13 +712,16 @@ type mockCSIMetadataDeltaResponse struct { } func TestStreamGetMetadataDeltaResponse(t *testing.T) { - ctx := context.Background() th := newTestHarness().WithMockCSIDriver(t).WithFakeClientAPIs() defer th.TerminateMockCSIDriver() grpcServer := th.StartGRPCServer(t, th.Runtime()) defer th.StopGRPCServer(t) + // Test at the trace logging level. + restoreFn := th.SetKlogVerbosity(HandlerDetailedTraceLogLevel, "Delta") + defer restoreFn() + for _, tc := range []struct { name string req *api.GetMetadataDeltaRequest @@ -932,15 +935,16 @@ func TestStreamGetMetadataDeltaResponse(t *testing.T) { mockCSIStream.EXPECT().Recv().Return(nil, io.EOF) } + sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError} + ctx := grpcServer.getMetadataDeltaContextWithLogger(tc.req, sms) + csiReq, err := grpcServer.convertToCSIGetMetadataDeltaRequest(ctx, tc.req) assert.NoError(t, err) csiStream, err := csiClient.GetMetadataDelta(ctx, csiReq) assert.NoError(t, err) - sms := &fakeStreamServerSnapshotDelta{err: tc.mockK8sStreamError} - - errStream := grpcServer.streamGetMetadataDeltaResponse(sms, csiStream) + errStream := grpcServer.streamGetMetadataDeltaResponse(ctx, sms, csiStream) if tc.expectStreamError { assert.NoError(t, err) st, ok := status.FromError(errStream) @@ -963,6 +967,10 @@ type fakeStreamServerSnapshotDelta struct { err error } +func (f *fakeStreamServerSnapshotDelta) Context() context.Context { + return context.Background() +} + func (f *fakeStreamServerSnapshotDelta) Send(m *api.GetMetadataDeltaResponse) error { if f.err != nil { return f.err diff --git a/pkg/internal/server/grpc/server.go b/pkg/internal/server/grpc/server.go index b53b2ffb..10386ce4 100644 --- a/pkg/internal/server/grpc/server.go +++ b/pkg/internal/server/grpc/server.go @@ -38,7 +38,8 @@ import ( ) const ( - HandlerTraceLogLevel = 4 + HandlerTraceLogLevel = 4 + HandlerDetailedTraceLogLevel = 5 ) type ServerConfig struct { diff --git a/pkg/internal/server/grpc/status.go b/pkg/internal/server/grpc/status.go index 6b988d58..04a01fd1 100644 --- a/pkg/internal/server/grpc/status.go +++ b/pkg/internal/server/grpc/status.go @@ -31,7 +31,7 @@ const ( msgInternalFailedToFindCR = "failed to find the SnapshotMetadataService CR for driver" msgInternalFailedToFindCRFmt = msgInternalFailedToFindCR + " '%s': %v" msgInternalFailedToSendResponse = "failed to send response" - msgInternalFailedtoSendResponseFmt = msgInternalFailedToSendResponse + ": %v" + msgInternalFailedToSendResponseFmt = msgInternalFailedToSendResponse + ": %v" msgInvalidArgumentBaseSnapshotNameMissing = "baseSnapshotName cannot be empty" msgInvalidArgumentNamespaceMissing = "namespace parameter cannot be empty"