diff --git a/proxy/handler.go b/proxy/handler.go index 745a739..c096b19 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -23,7 +23,7 @@ var ( // RegisterService sets up a proxy handler for a particular gRPC service and method. // The behaviour is the same as if you were registering a handler method, e.g. from a generated pb.go file. func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} + streamer := &handler{director: director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -41,15 +41,24 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s } // TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. -// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the // backends. It should be used as a `grpc.UnknownServiceHandler`. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director: director} + return TransparentHandlerWithOpts(director) +} + +// TransparentHandlerWithOpts returns a handler that attempts to proxy all requests that are not registered in the server. +// The provided call-options will be set on the internal gRPC client stream created by this transport. +// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +func TransparentHandlerWithOpts(director StreamDirector, callOptions ...grpc.CallOption) grpc.StreamHandler { + streamer := &handler{director: director, callOpts: callOptions} return streamer.handler } type handler struct { director StreamDirector + callOpts []grpc.CallOption } // handler is where the real magic of proxying happens. @@ -70,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error clientCtx, clientCancel := context.WithCancel(outgoingCtx) defer clientCancel() // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName, s.callOpts...) if err != nil { return err } diff --git a/proxy/handler_test.go b/proxy/handler_test.go index 7d07a32..7610f9f 100644 --- a/proxy/handler_test.go +++ b/proxy/handler_test.go @@ -4,6 +4,7 @@ package proxy_test import ( + "bytes" "context" "fmt" "io" @@ -35,6 +36,7 @@ const ( rejectingMdKey = "test-reject-rpc-if-in-context" countListResponses = 20 + maxRecvMsgSize = 10 * 1024 * 1024 ) // asserting service is implemented on the server side and serves as a handler for stuff @@ -179,6 +181,33 @@ func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") } +func (s *ProxyHappySuite) TestPingStream_TransparentHandlerOpts() { + stream, err := s.testClient.PingStream(context.Background(), grpc.MaxCallRecvMsgSize(maxRecvMsgSize)) + require.NoError(s.T(), err, "PingStream request should be successful.") + + // Check that the grpc CallOption is used by the TransparentHandler's client by creating a 9MB payload. + // The default for gRPC is 4MB, so this should fail if the option is unused. + var buff bytes.Buffer + for i := 0; i < 1024*1024; i++ { + buff.WriteString("123456789") + } + payload := buff.String() + + ping := &pb.PingRequest{Value: payload} + require.NoError(s.T(), stream.Send(ping), "sending a large payload to PingStream must not fail") + + resp, err := stream.Recv() + require.NoError(s.T(), err, "no error on ping") + require.Equal(s.T(), payload, resp.Value, "pong should reply with the large payload") + + require.NoError(s.T(), stream.CloseSend(), "no error on close send") + _, err = stream.Recv() + require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") + // Check that the trailer headers are here. + trailerMd := stream.Trailer() + assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") +} + func (s *ProxyHappySuite) TestPingStream_StressTest() { for i := 0; i < 50; i++ { s.TestPingStream_FullDuplexWorks() @@ -193,7 +222,7 @@ func (s *ProxyHappySuite) SetupSuite() { s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for serverListener") - s.server = grpc.NewServer() + s.server = grpc.NewServer(grpc.MaxRecvMsgSize(maxRecvMsgSize)) pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) // Setup of the proxy's Director. @@ -211,10 +240,13 @@ func (s *ProxyHappySuite) SetupSuite() { outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) return outCtx, s.serverClientConn, nil } + + handler := proxy.TransparentHandlerWithOpts(director, grpc.MaxCallRecvMsgSize(maxRecvMsgSize)) s.proxy = grpc.NewServer( //lint:ignore SA1019 regression test grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.MaxRecvMsgSize(maxRecvMsgSize), + grpc.UnknownServiceHandler(handler), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, diff --git a/testservice/testping.go b/testservice/testping.go index de5f0a9..ef9cd8b 100644 --- a/testservice/testping.go +++ b/testservice/testping.go @@ -99,7 +99,7 @@ func TestTestServiceServerImpl(t *testing.T, client TestServiceClient) { } res, err := stream.Recv() if err != nil { - t.Errorf("receiving full duplex stream: %w", err) + t.Errorf("receiving full duplex stream: %v", err) return } t.Logf("got %v (%d)", res.Value, res.Counter)