From cbd95fd45d8b27bf8f6cd8a07acb5bed7fc58196 Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Wed, 6 Dec 2023 10:48:10 -0600 Subject: [PATCH 1/2] Fix linter error on test runs. --- testservice/testping.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testservice/testping.go b/testservice/testping.go index de5f0a9..ef9cd8b 100644 --- a/testservice/testping.go +++ b/testservice/testping.go @@ -99,7 +99,7 @@ func TestTestServiceServerImpl(t *testing.T, client TestServiceClient) { } res, err := stream.Recv() if err != nil { - t.Errorf("receiving full duplex stream: %w", err) + t.Errorf("receiving full duplex stream: %v", err) return } t.Logf("got %v (%d)", res.Value, res.Counter) From 5510011c7d2bb8de3b8fa54836de3e6d2cdbbb26 Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Wed, 6 Dec 2023 10:52:05 -0600 Subject: [PATCH 2/2] Add TransparentHandlerWithOptions function. Prior to this function being added, it was not possible to set gRPC CallOptions on the call to `grpc.NewClientStream(...)` performed by the TransparentHandler. Notably, the `grpc.MaxRecvMsgSize()` would not be configured and would result in closed streams if the payload size was exceeded, despite the fact that the proxy's server had the value correctly configured. The introduction of this `TransparentHandlerWithOptions(...)` function allows for various settings to be configured on the handler and passed through to the `grpc.NewClientStream(...)` call. --- proxy/handler.go | 17 +++++++++++++---- proxy/handler_test.go | 36 ++++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 745a739..c096b19 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -23,7 +23,7 @@ var ( // RegisterService sets up a proxy handler for a particular gRPC service and method. // The behaviour is the same as if you were registering a handler method, e.g. from a generated pb.go file. func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} + streamer := &handler{director: director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -41,15 +41,24 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s } // TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. -// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the // backends. It should be used as a `grpc.UnknownServiceHandler`. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director: director} + return TransparentHandlerWithOpts(director) +} + +// TransparentHandlerWithOpts returns a handler that attempts to proxy all requests that are not registered in the server. +// The provided call-options will be set on the internal gRPC client stream created by this transport. +// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +func TransparentHandlerWithOpts(director StreamDirector, callOptions ...grpc.CallOption) grpc.StreamHandler { + streamer := &handler{director: director, callOpts: callOptions} return streamer.handler } type handler struct { director StreamDirector + callOpts []grpc.CallOption } // handler is where the real magic of proxying happens. @@ -70,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error clientCtx, clientCancel := context.WithCancel(outgoingCtx) defer clientCancel() // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName, s.callOpts...) if err != nil { return err } diff --git a/proxy/handler_test.go b/proxy/handler_test.go index 7d07a32..7610f9f 100644 --- a/proxy/handler_test.go +++ b/proxy/handler_test.go @@ -4,6 +4,7 @@ package proxy_test import ( + "bytes" "context" "fmt" "io" @@ -35,6 +36,7 @@ const ( rejectingMdKey = "test-reject-rpc-if-in-context" countListResponses = 20 + maxRecvMsgSize = 10 * 1024 * 1024 ) // asserting service is implemented on the server side and serves as a handler for stuff @@ -179,6 +181,33 @@ func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } +func (s *ProxyHappySuite) TestPingStream_TransparentHandlerOpts() { + stream, err := s.testClient.PingStream(context.Background(), grpc.MaxCallRecvMsgSize(maxRecvMsgSize)) + require.NoError(s.T(), err, "PingStream request should be successful.") + + // Check that the grpc CallOption is used by the TransparentHandler's client by creating a 9MB payload. + // The default for gRPC is 4MB, so this should fail if the option is unused. + var buff bytes.Buffer + for i := 0; i < 1024*1024; i++ { + buff.WriteString("123456789") + } + payload := buff.String() + + ping := &pb.PingRequest{Value: payload} + require.NoError(s.T(), stream.Send(ping), "sending a large payload to PingStream must not fail") + + resp, err := stream.Recv() + require.NoError(s.T(), err, "no error on ping") + require.Equal(s.T(), payload, resp.Value, "pong should reply with the large payload") + + require.NoError(s.T(), stream.CloseSend(), "no error on close send") + _, err = stream.Recv() + require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") + // Check that the trailer headers are here. + trailerMd := stream.Trailer() + assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") +} + func (s *ProxyHappySuite) TestPingStream_StressTest() { for i := 0; i < 50; i++ { s.TestPingStream_FullDuplexWorks() @@ -193,7 +222,7 @@ func (s *ProxyHappySuite) SetupSuite() { s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for serverListener") - s.server = grpc.NewServer() + s.server = grpc.NewServer(grpc.MaxRecvMsgSize(maxRecvMsgSize)) pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) // Setup of the proxy's Director. @@ -211,10 +240,13 @@ func (s *ProxyHappySuite) SetupSuite() { outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) return outCtx, s.serverClientConn, nil } + + handler := proxy.TransparentHandlerWithOpts(director, grpc.MaxCallRecvMsgSize(maxRecvMsgSize)) s.proxy = grpc.NewServer( //lint:ignore SA1019 regression test grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.MaxRecvMsgSize(maxRecvMsgSize), + grpc.UnknownServiceHandler(handler), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director,