From a815976b9b6cb3c15f9148218b860b100d5a73ea Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 18:50:11 +0800 Subject: [PATCH 01/10] fix --- proxy/handler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/proxy/handler.go b/proxy/handler.go index 752f892..1e9cbaf 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -116,6 +116,20 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt go func() { f := &frame{} for i := 0; ; i++ { + // Sometimes we will read the header first, so we need to set it first + // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go + // line 224 + if i == 0 { + md, err := src.Header() + if err != nil { + ret <- err + break + } + if err := dst.SendHeader(md); err != nil { + ret <- err + break + } + } if err := src.RecvMsg(f); err != nil { ret <- err // this can be io.EOF which is happy case break From b9a41a6b9548964d7f6c61e6e206003179b550d7 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 18:54:19 +0800 Subject: [PATCH 02/10] fix --- proxy/handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 1e9cbaf..1c01a3d 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -120,15 +120,15 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go // line 224 if i == 0 { - md, err := src.Header() + _, err := src.Header() if err != nil { ret <- err break } - if err := dst.SendHeader(md); err != nil { - ret <- err - break - } + //if err := dst.SendHeader(md); err != nil { + // ret <- err + // break + //} } if err := src.RecvMsg(f); err != nil { ret <- err // this can be io.EOF which is happy case From 6d732ba72d960704ff7a190f8184f90b835dcc7d Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 19:39:58 +0800 Subject: [PATCH 03/10] fix --- proxy/handler.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 1c01a3d..2869ca2 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -120,15 +120,15 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go // line 224 if i == 0 { - _, err := src.Header() + md, err := src.Header() if err != nil { ret <- err break } - //if err := dst.SendHeader(md); err != nil { - // ret <- err - // break - //} + if err := dst.SendHeader(md); err != nil { + ret <- err + break + } } if err := src.RecvMsg(f); err != nil { ret <- err // this can be io.EOF which is happy case @@ -138,15 +138,15 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt // This is a bit of a hack, but client to server headers are only readable after first client msg is // received but must be written to server stream before the first msg is flushed. // This is the only place to do it nicely. - md, err := src.Header() - if err != nil { - ret <- err - break - } - if err := dst.SendHeader(md); err != nil { - ret <- err - break - } + //md, err := src.Header() + //if err != nil { + // ret <- err + // break + //} + //if err := dst.SendHeader(md); err != nil { + // ret <- err + // break + //} } if err := dst.SendMsg(f); err != nil { ret <- err From 3414ba9b1a81b9cd1545d22a4108e872a89912bf Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:04:28 +0800 Subject: [PATCH 04/10] fix --- proxy/handler.go | 60 +++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 2869ca2..3139a6a 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -5,6 +5,7 @@ package proxy import ( "io" + "sync" "golang.org/x/net/context" "google.golang.org/grpc" @@ -46,12 +47,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} + streamer := &handler{director: director} return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + sendHeader bool + sync.Locker } // handler is where the real magic of proxying happens. @@ -80,6 +83,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ s2cErrChan := s.forwardServerToClient(serverStream, clientStream) c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + s.forwardClientHeaderToServer(clientStream, serverStream) // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { @@ -111,15 +115,39 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } +func (s *handler) forwardClientHeaderToServer(src grpc.ClientStream, dst grpc.ServerStream, ) chan error { + ret := make(chan error, 1) + go func() { + s.Lock() + if !s.sendHeader { + md, err := src.Header() + if err != nil { + ret <- err + } + if err := dst.SendHeader(md); err != nil { + ret <- err + } + s.sendHeader = true + } + s.Unlock() + }() + return ret +} + func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { ret := make(chan error, 1) go func() { f := &frame{} for i := 0; ; i++ { - // Sometimes we will read the header first, so we need to set it first - // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go - // line 224 - if i == 0 { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + s.Lock() + if i == 0 && !s.sendHeader { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. md, err := src.Header() if err != nil { ret <- err @@ -129,25 +157,9 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt ret <- err break } + s.sendHeader = true } - if err := src.RecvMsg(f); err != nil { - ret <- err // this can be io.EOF which is happy case - break - } - if i == 0 { - // This is a bit of a hack, but client to server headers are only readable after first client msg is - // received but must be written to server stream before the first msg is flushed. - // This is the only place to do it nicely. - //md, err := src.Header() - //if err != nil { - // ret <- err - // break - //} - //if err := dst.SendHeader(md); err != nil { - // ret <- err - // break - //} - } + s.Unlock() if err := dst.SendMsg(f); err != nil { ret <- err break From 458b85ad54d63d679fc34d1e17882a24a9a76ecb Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:06:18 +0800 Subject: [PATCH 05/10] fix --- proxy/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/handler.go b/proxy/handler.go index 3139a6a..187aec5 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -24,7 +24,7 @@ var ( // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} + streamer := &handler{director: director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), From 3b8df91fedce21945a159be383a0cdc59d72ba45 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:11:51 +0800 Subject: [PATCH 06/10] fix --- proxy/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 187aec5..79dce20 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -52,9 +52,9 @@ func TransparentHandler(director StreamDirector) grpc.StreamHandler { } type handler struct { - director StreamDirector - sendHeader bool - sync.Locker + director StreamDirector + sendHeader bool + sendHeaderLock sync.Mutex } // handler is where the real magic of proxying happens. From d002dd6774477c55e38369e14b0dd2776c23a905 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:12:56 +0800 Subject: [PATCH 07/10] fix --- proxy/handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index 79dce20..eea68e5 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -47,14 +47,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director: director} + streamer := &handler{director: director, sendHeader: false,} return streamer.handler } type handler struct { - director StreamDirector - sendHeader bool - sendHeaderLock sync.Mutex + director StreamDirector + sendHeader bool + sync.Mutex } // handler is where the real magic of proxying happens. From f040d95a8ff32136c4a573aa3f013391d81c0445 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:35:25 +0800 Subject: [PATCH 08/10] optimized code and fixed markdown --- proxy/handler.go | 50 ++++++++++++++---------------------------------- 1 file changed, 14 insertions(+), 36 deletions(-) diff --git a/proxy/handler.go b/proxy/handler.go index eea68e5..18b71d2 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -4,12 +4,10 @@ package proxy import ( - "io" - "sync" - "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "io" ) var ( @@ -24,7 +22,7 @@ var ( // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director: director} + streamer := &handler{director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -47,14 +45,12 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director: director, sendHeader: false,} + streamer := &handler{director} return streamer.handler } type handler struct { - director StreamDirector - sendHeader bool - sync.Mutex + director StreamDirector } // handler is where the real magic of proxying happens. @@ -83,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ s2cErrChan := s.forwardServerToClient(serverStream, clientStream) c2sErrChan := s.forwardClientToServer(clientStream, serverStream) - s.forwardClientHeaderToServer(clientStream, serverStream) + //s.forwardClientHeaderToServer(clientStream, serverStream) // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { @@ -115,36 +111,16 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } -func (s *handler) forwardClientHeaderToServer(src grpc.ClientStream, dst grpc.ServerStream, ) chan error { - ret := make(chan error, 1) - go func() { - s.Lock() - if !s.sendHeader { - md, err := src.Header() - if err != nil { - ret <- err - } - if err := dst.SendHeader(md); err != nil { - ret <- err - } - s.sendHeader = true - } - s.Unlock() - }() - return ret -} - func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { ret := make(chan error, 1) go func() { f := &frame{} for i := 0; ; i++ { - if err := src.RecvMsg(f); err != nil { - ret <- err // this can be io.EOF which is happy case - break - } - s.Lock() - if i == 0 && !s.sendHeader { + if i == 0 { + // Because sometimes the client will read the header first + // it is necessary to advance the header data exchange to recv. + // https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go#L212 + // This is a bit of a hack, but client to server headers are only readable after first client msg is // received but must be written to server stream before the first msg is flushed. // This is the only place to do it nicely. @@ -157,9 +133,11 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt ret <- err break } - s.sendHeader = true } - s.Unlock() + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } if err := dst.SendMsg(f); err != nil { ret <- err break From 7410a977f11de9d9d6add77297d600abe0b06f87 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 22 Mar 2020 20:42:11 +0800 Subject: [PATCH 09/10] optimized code and fixed markdown --- README.md | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 06195e4..9d53120 100644 --- a/README.md +++ b/README.md @@ -25,22 +25,24 @@ not know about registered handlers or their data types. Please consult the docs, Defining a `StreamDirector` that decides where (if at all) to send the request ```go -director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) { +director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromIncomingContext(ctx) if ok { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. - return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) + c, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure()) + return ctx, c, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { - return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) + c, err := grpc.DialContext(ctx, "localhost:50055", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure()) + return ctx, c, err } } - return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } ``` Then you need to register it with a `grpc.Server`. The server may have other handlers that will be served From a2269fff9d7f1746fb3163198cbae4301b9e3d00 Mon Sep 17 00:00:00 2001 From: niuyufu Date: Sun, 17 May 2020 13:18:33 +0800 Subject: [PATCH 10/10] fix bug --- proxy/codec.go | 3 +++ proxy/handler.go | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/proxy/codec.go b/proxy/codec.go index 846b9c4..0ae9942 100644 --- a/proxy/codec.go +++ b/proxy/codec.go @@ -10,6 +10,7 @@ import ( // Codec returns a proxying grpc.Codec with the default protobuf codec as parent. // // See CodecWithParent. +// 构建输出函数 func Codec() grpc.Codec { return CodecWithParent(&protoCodec{}) } @@ -32,6 +33,7 @@ type frame struct { payload []byte } +//构建原始字节解码器 func (c *rawCodec) Marshal(v interface{}) ([]byte, error) { out, ok := v.(*frame) if !ok { @@ -55,6 +57,7 @@ func (c *rawCodec) String() string { } // protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC. +// 构建proto解码器 type protoCodec struct{} func (protoCodec) Marshal(v interface{}) ([]byte, error) { diff --git a/proxy/handler.go b/proxy/handler.go index 18b71d2..7c77c5f 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -67,7 +67,8 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if err != nil { return err } - + defer backendConn.Close() + clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)