Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blocking problem caused by client reading header header #47

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ not know about registered handlers or their data types. Please consult the docs,

Defining a `StreamDirector` that decides where (if at all) to send the request
```go
director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) {
director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromContext(ctx)
md, ok := metadata.FromIncomingContext(ctx)
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
c, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure())
return ctx, c, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
c, err := grpc.DialContext(ctx, "localhost:50055", grpc.WithCodec(proxy.Codec()),grpc.WithInsecure())
return ctx, c, err
}
}
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return ctx, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
```
Then you need to register it with a `grpc.Server`. The server may have other handlers that will be served
Expand Down
3 changes: 3 additions & 0 deletions proxy/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
// Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
//
// See CodecWithParent.
// 构建输出函数
func Codec() grpc.Codec {
return CodecWithParent(&protoCodec{})
}
Expand All @@ -32,6 +33,7 @@ type frame struct {
payload []byte
}

//构建原始字节解码器
func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
if !ok {
Expand All @@ -55,6 +57,7 @@ func (c *rawCodec) String() string {
}

// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
// 构建proto解码器
type protoCodec struct{}

func (protoCodec) Marshal(v interface{}) ([]byte, error) {
Expand Down
19 changes: 12 additions & 7 deletions proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
package proxy

import (
"io"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"io"
)

var (
Expand Down Expand Up @@ -68,7 +67,8 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
if err != nil {
return err
}

defer backendConn.Close()

clientCtx, clientCancel := context.WithCancel(outgoingCtx)
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
Expand All @@ -80,6 +80,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
//s.forwardClientHeaderToServer(clientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
Expand Down Expand Up @@ -116,11 +117,11 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// Because sometimes the client will read the header first
// it is necessary to advance the header data exchange to recv.
// https://github.com/grpc/grpc-go/blob/master/examples/features/metadata/client/main.go#L212

// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
Expand All @@ -134,6 +135,10 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
break
}
}
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
Expand Down