Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TransparentHandlerWithOptions function #73

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
// RegisterService sets up a proxy handler for a particular gRPC service and method.
// The behaviour is the same as if you were registering a handler method, e.g. from a generated pb.go file.
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
streamer := &handler{director}
streamer := &handler{director: director}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
Expand All @@ -41,15 +41,24 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s
}

// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// backends. It should be used as a `grpc.UnknownServiceHandler`.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director: director}
return TransparentHandlerWithOpts(director)
}

// TransparentHandlerWithOpts returns a handler that attempts to proxy all requests that are not registered in the server.
// The provided call-options will be set on the internal gRPC client stream created by this transport.
// The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// backends. It should be used as a `grpc.UnknownServiceHandler`.
func TransparentHandlerWithOpts(director StreamDirector, callOptions ...grpc.CallOption) grpc.StreamHandler {
streamer := &handler{director: director, callOpts: callOptions}
return streamer.handler
}

type handler struct {
director StreamDirector
callOpts []grpc.CallOption
}

// handler is where the real magic of proxying happens.
Expand All @@ -70,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
defer clientCancel()
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName, s.callOpts...)
if err != nil {
return err
}
Expand Down
36 changes: 34 additions & 2 deletions proxy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package proxy_test

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -35,6 +36,7 @@ const (
rejectingMdKey = "test-reject-rpc-if-in-context"

countListResponses = 20
maxRecvMsgSize = 10 * 1024 * 1024
)

// asserting service is implemented on the server side and serves as a handler for stuff
Expand Down Expand Up @@ -179,6 +181,33 @@ func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() {
assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
}

func (s *ProxyHappySuite) TestPingStream_TransparentHandlerOpts() {
stream, err := s.testClient.PingStream(context.Background(), grpc.MaxCallRecvMsgSize(maxRecvMsgSize))
require.NoError(s.T(), err, "PingStream request should be successful.")

// Check that the grpc CallOption is used by the TransparentHandler's client by creating a 9MB payload.
// The default for gRPC is 4MB, so this should fail if the option is unused.
var buff bytes.Buffer
for i := 0; i < 1024*1024; i++ {
buff.WriteString("123456789")
}
payload := buff.String()

ping := &pb.PingRequest{Value: payload}
require.NoError(s.T(), stream.Send(ping), "sending a large payload to PingStream must not fail")

resp, err := stream.Recv()
require.NoError(s.T(), err, "no error on ping")
require.Equal(s.T(), payload, resp.Value, "pong should reply with the large payload")

require.NoError(s.T(), stream.CloseSend(), "no error on close send")
_, err = stream.Recv()
require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK")
// Check that the trailer headers are here.
trailerMd := stream.Trailer()
assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata")
}

func (s *ProxyHappySuite) TestPingStream_StressTest() {
for i := 0; i < 50; i++ {
s.TestPingStream_FullDuplexWorks()
Expand All @@ -193,7 +222,7 @@ func (s *ProxyHappySuite) SetupSuite() {
s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")

s.server = grpc.NewServer()
s.server = grpc.NewServer(grpc.MaxRecvMsgSize(maxRecvMsgSize))
pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})

// Setup of the proxy's Director.
Expand All @@ -211,10 +240,13 @@ func (s *ProxyHappySuite) SetupSuite() {
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
return outCtx, s.serverClientConn, nil
}

handler := proxy.TransparentHandlerWithOpts(director, grpc.MaxCallRecvMsgSize(maxRecvMsgSize))
s.proxy = grpc.NewServer(
//lint:ignore SA1019 regression test
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.UnknownServiceHandler(handler),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
proxy.RegisterService(s.proxy, director,
Expand Down
2 changes: 1 addition & 1 deletion testservice/testping.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestTestServiceServerImpl(t *testing.T, client TestServiceClient) {
}
res, err := stream.Recv()
if err != nil {
t.Errorf("receiving full duplex stream: %w", err)
t.Errorf("receiving full duplex stream: %v", err)
return
}
t.Logf("got %v (%d)", res.Value, res.Counter)
Expand Down