diff --git a/README.md b/README.md index 06195e4..9d53120 100644 --- a/README.md +++ b/README.md @@ -25,22 +25,24 @@ not know about registered handlers or their data types. Please consult the docs, Defining a `StreamDirector` that decides where (if at all) to send the request ```go -director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) { +director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromIncomingContext(ctx) if ok { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. - return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) + c, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure()) + return ctx, c, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { - return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) + c, err := grpc.DialContext(ctx, "localhost:50055", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure()) + return ctx, c, err } } - return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } ``` Then you need to register it with a `grpc.Server`. The server may have other handlers that will be served diff --git a/proxy/codec.go b/proxy/codec.go index 846b9c4..0ae9942 100644 --- a/proxy/codec.go +++ b/proxy/codec.go @@ -10,6 +10,7 @@ import ( // Codec returns a proxying grpc.Codec with the default protobuf codec as parent. // // See CodecWithParent. +// 构建输出函数 func Codec() grpc.Codec { return CodecWithParent(&protoCodec{}) } @@ -32,6 +33,7 @@ type frame struct { payload []byte } +//构建原始字节解码器 func (c *rawCodec) Marshal(v interface{}) ([]byte, error) { out, ok := v.(*frame) if !ok { @@ -55,6 +57,7 @@ func (c *rawCodec) String() string { } // protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC. +// 构建proto解码器 type protoCodec struct{} func (protoCodec) Marshal(v interface{}) ([]byte, error) { diff --git a/proxy/handler.go b/proxy/handler.go index 752f892..7c77c5f 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -4,11 +4,10 @@ package proxy import ( - "io" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "io" ) var ( @@ -68,7 +67,8 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if err != nil { return err } - + defer backendConn.Close() + clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) @@ -80,6 +80,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ s2cErrChan := s.forwardServerToClient(serverStream, clientStream) c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + //s.forwardClientHeaderToServer(clientStream, serverStream) // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { @@ -116,11 +117,11 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt go func() { f := &frame{} for i := 0; ; i++ { - if err := src.RecvMsg(f); err != nil { - ret <- err // this can be io.EOF which is happy case - break - } if i == 0 { + // Because sometimes the client will read the header first + // it is necessary to advance the header data exchange to recv. + // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go#L212 + // This is a bit of a hack, but client to server headers are only readable after first client msg is // received but must be written to server stream before the first msg is flushed. // This is the only place to do it nicely. @@ -134,6 +135,10 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt break } } + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } if err := dst.SendMsg(f); err != nil { ret <- err break