diff --git a/checkup.sh b/checkup.sh index 37e9aac..fec37f6 100755 --- a/checkup.sh +++ b/checkup.sh @@ -1,21 +1,4 @@ #!/bin/bash # Script that checks up code (govet). -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd -P)" - -function print_real_go_files { - grep --files-without-match 'DO NOT EDIT!' $(find . -iname '*.go') -} - -function govet_all { - ret=0 - for i in $(print_real_go_files); do - output=$(go tool vet -all=true -tests=false ${i}) - ret=$(($ret | $?)) - echo -n ${output} - done; - return ${ret} -} - -govet_all -echo "returning $?" \ No newline at end of file +go vet proxy/*.go diff --git a/fixup.sh b/fixup.sh index 5b4a66a..95e4792 100755 --- a/fixup.sh +++ b/fixup.sh @@ -14,7 +14,7 @@ function generate_markdown { dir=${i%/*} echo "$dir" cd ${dir} - ${GOPATH}/bin/godocdown -heading=Title -o DOC.md + godocdown -heading=Title -o DOC.md #`go get github.com/robertkrimen/godocdown/godocdown` to install godocdown ln -s DOC.md README.md 2> /dev/null # can fail cd ${oldpwd} done; @@ -28,4 +28,4 @@ function goimports_all { generate_markdown goimports_all -echo "returning $?" \ No newline at end of file +echo "returning $?" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7a2ddd8 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module github.com/mwitkow/grpc-proxy + +go 1.13 + +require ( + github.com/golang/protobuf v1.4.2 + github.com/stretchr/testify v1.6.1 + golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect + golang.org/x/net v0.0.0-20200707034311-ab3426394381 + golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect + google.golang.org/grpc v1.31.0 + honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..27807f3 --- /dev/null +++ b/go.sum @@ -0,0 +1,83 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= +golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.31.0 h1:T7P4R73V3SSDPhH7WW7ATbfViLtmamH0DKrP3f9AuDI= +google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/proxy/DOC.md b/proxy/DOC.md index 85c411a..a7aa1bd 100644 --- a/proxy/DOC.md +++ b/proxy/DOC.md @@ -19,18 +19,19 @@ See examples on documented functions. #### func Codec ```go -func Codec() grpc.Codec +func Codec() encoding.Codec ``` -Codec returns a proxying grpc.Codec with the default protobuf codec as parent. +Codec returns a proxying encoding.Codec with the default protobuf codec as +parent. See CodecWithParent. #### func CodecWithParent ```go -func CodecWithParent(fallback grpc.Codec) grpc.Codec +func CodecWithParent(fallback encoding.Codec) encoding.Codec ``` -CodecWithParent returns a proxying grpc.Codec with a user provided codec as +CodecWithParent returns a proxying encoding.Codec with a user provided codec as parent. This codec is *crucial* to the functioning of the proxy. It allows the proxy @@ -64,10 +65,53 @@ backends. It should be used as a `grpc.UnknownServiceHandler`. This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. +#### type Pool + +```go +type Pool struct { + sync.Mutex +} +``` + + +#### func NewPool + +```go +func NewPool(size int, ttl time.Duration, idle int, ms int) *Pool +``` + +#### func (*Pool) GetConn + +```go +func (p *Pool) GetConn(addr string, opts ...grpc.DialOption) (*PoolConn, error) +``` + +#### func (*Pool) Release + +```go +func (p *Pool) Release(addr string, conn *PoolConn, err error) +``` + +#### type PoolConn + +```go +type PoolConn struct { + // grpc conn + *grpc.ClientConn +} +``` + + +#### func (*PoolConn) Close + +```go +func (conn *PoolConn) Close() +``` + #### type StreamDirector ```go -type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *PoolConn, error) ``` StreamDirector returns a gRPC ClientConn to be used to forward the call to. @@ -76,6 +120,12 @@ The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned. +The context returned from this function should be the context for the *outgoing* +(to backend) call. In case you want to forward any Metadata between the inbound +request and outbound requests, you should do it manually. However, you *must* +propagate the cancel function (`context.WithCancel`) of the inbound context to +the one returned. + It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors are invoked. So decisions around authorization, monitoring etc. are better to be handled there. diff --git a/proxy/codec.go b/proxy/codec.go index 846b9c4..d4e8711 100644 --- a/proxy/codec.go +++ b/proxy/codec.go @@ -3,29 +3,30 @@ package proxy import ( "fmt" + "google.golang.org/grpc/encoding" + "github.com/golang/protobuf/proto" - "google.golang.org/grpc" ) -// Codec returns a proxying grpc.Codec with the default protobuf codec as parent. +// Codec returns a proxying encoding.Codec with the default protobuf codec as parent. // // See CodecWithParent. -func Codec() grpc.Codec { +func Codec() encoding.Codec { return CodecWithParent(&protoCodec{}) } -// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent. +// CodecWithParent returns a proxying encoding.Codec with a user provided codec as parent. // // This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious // to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes. // However, if the server handler, or the client caller are not proxy-internal functions it will fall back // to trying to decode the message using a fallback codec. -func CodecWithParent(fallback grpc.Codec) grpc.Codec { +func CodecWithParent(fallback encoding.Codec) encoding.Codec { return &rawCodec{fallback} } type rawCodec struct { - parentCodec grpc.Codec + parentCodec encoding.Codec } type frame struct { @@ -51,7 +52,11 @@ func (c *rawCodec) Unmarshal(data []byte, v interface{}) error { } func (c *rawCodec) String() string { - return fmt.Sprintf("proxy>%s", c.parentCodec.String()) + return fmt.Sprintf("proxy>%s", c.parentCodec.Name()) +} + +func (c *rawCodec) Name() string { + return "proto" } // protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC. @@ -65,6 +70,6 @@ func (protoCodec) Unmarshal(data []byte, v interface{}) error { return proto.Unmarshal(data, v.(proto.Message)) } -func (protoCodec) String() string { +func (protoCodec) Name() string { return "proto" } diff --git a/proxy/conn_pool.go b/proxy/conn_pool.go new file mode 100644 index 0000000..278ae64 --- /dev/null +++ b/proxy/conn_pool.go @@ -0,0 +1,222 @@ +package proxy + +import ( + "context" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +type Pool struct { + size int + ttl int64 + + // max streams on a *PoolConn + maxStreams int + // max idle conns + maxIdle int + + sync.Mutex + conns map[string]*streamsPool +} + +type streamsPool struct { + // head of list + head *PoolConn + // busy conns list + busy *PoolConn + // the siza of list + count int + // idle conn + idle int +} + +type PoolConn struct { + // grpc conn + *grpc.ClientConn + err error + addr string + + // pool and streams pool + pool *Pool + sp *streamsPool + streams int + created int64 + + // list + pre *PoolConn + next *PoolConn + in bool +} + +func NewPool(size int, ttl time.Duration, idle int, ms int) *Pool { + if ms <= 0 { + ms = 1 + } + if idle < 0 { + idle = 0 + } + return &Pool{ + size: size, + ttl: int64(ttl.Seconds()), + maxStreams: ms, + maxIdle: idle, + conns: make(map[string]*streamsPool), + } +} + +func (p *Pool) GetConn(addr string, opts ...grpc.DialOption) (*PoolConn, error) { + now := time.Now().Unix() + p.Lock() + sp, ok := p.conns[addr] + if !ok { + sp = &streamsPool{head: &PoolConn{}, busy: &PoolConn{}, count: 0, idle: 0} + p.conns[addr] = sp + } + // while we have conns check streams and then return one + // otherwise we'll create a new conn + conn := sp.head.next + for conn != nil { + // check conn state + // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md + switch conn.GetState() { + case connectivity.Connecting: + conn = conn.next + continue + case connectivity.Shutdown: + next := conn.next + if conn.streams == 0 { + removeConn(conn) + sp.idle-- + } + conn = next + continue + case connectivity.TransientFailure: + next := conn.next + if conn.streams == 0 { + removeConn(conn) + conn.ClientConn.Close() + sp.idle-- + } + conn = next + continue + case connectivity.Ready: + case connectivity.Idle: + } + // a old conn + if now-conn.created > p.ttl { + next := conn.next + if conn.streams == 0 { + removeConn(conn) + conn.ClientConn.Close() + sp.idle-- + } + conn = next + continue + } + // a busy conn + if conn.streams >= p.maxStreams { + next := conn.next + removeConn(conn) + addConnAfter(conn, sp.busy) + conn = next + continue + } + // a idle conn + if conn.streams == 0 { + sp.idle-- + } + // a good conn + conn.streams++ + p.Unlock() + return conn, nil + } + p.Unlock() + + // create new conn + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + cc, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + return nil, err + } + + conn = &PoolConn{cc, nil, addr, p, sp, 1, time.Now().Unix(), nil, nil, false} + + // add conn to streams pool + p.Lock() + if sp.count < p.size { + addConnAfter(conn, sp.head) + } + p.Unlock() + + return conn, nil +} + +func (p *Pool) Release(addr string, conn *PoolConn, err error) { + p.Lock() + p, sp, created := conn.pool, conn.sp, conn.created + // try to add conn + if !conn.in && sp.count < p.size { + addConnAfter(conn, sp.head) + } + if !conn.in { + p.Unlock() + conn.ClientConn.Close() + return + } + // a busy conn + if conn.streams >= p.maxStreams { + removeConn(conn) + addConnAfter(conn, sp.head) + } + conn.streams-- + // if streams == 0, we can do something + if conn.streams == 0 { + // 1. it has errored + // 2. too many idle conn or + // 3. conn is too old + now := time.Now().Unix() + if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { + removeConn(conn) + p.Unlock() + conn.ClientConn.Close() + return + } + sp.idle++ + } + p.Unlock() + return +} + +func (conn *PoolConn) Close() { + conn.pool.Release(conn.addr, conn, conn.err) +} + +func removeConn(conn *PoolConn) { + if conn.pre != nil { + conn.pre.next = conn.next + } + if conn.next != nil { + conn.next.pre = conn.pre + } + conn.pre = nil + conn.next = nil + conn.in = false + conn.sp.count-- + return +} + +func addConnAfter(conn *PoolConn, after *PoolConn) { + conn.next = after.next + conn.pre = after + if after.next != nil { + after.next.pre = conn + } + after.next = conn + conn.in = true + conn.sp.count++ + return +} diff --git a/proxy/director.go b/proxy/director.go index 371ca60..df8b398 100644 --- a/proxy/director.go +++ b/proxy/director.go @@ -5,7 +5,6 @@ package proxy import ( "golang.org/x/net/context" - "google.golang.org/grpc" ) // StreamDirector returns a gRPC ClientConn to be used to forward the call to. @@ -21,4 +20,4 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *PoolConn, error) diff --git a/proxy/examples_test.go b/proxy/examples_test.go index bef4ce3..e4f26d6 100644 --- a/proxy/examples_test.go +++ b/proxy/examples_test.go @@ -5,8 +5,12 @@ package proxy_test import ( "strings" + "time" "github.com/mwitkow/grpc-proxy/proxy" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/status" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -14,31 +18,35 @@ import ( ) var ( - director proxy.StreamDirector + exampleDirector proxy.StreamDirector ) func ExampleRegisterService() { + // init grpc conn pool + examplePool = proxy.NewPool(300, time.Duration(60000)*time.Millisecond, 500, 500) // A gRPC server with the proxying codec enabled. - server := grpc.NewServer(grpc.CustomCodec(proxy.Codec())) + encoding.RegisterCodec(proxy.Codec()) + server := grpc.NewServer() // Register a TestService with 4 of its methods explicitly. - proxy.RegisterService(server, director, + proxy.RegisterService(server, exampleDirector, "mwitkow.testproto.TestService", "PingEmpty", "Ping", "PingError", "PingList") } func ExampleTransparentHandler() { + examplePool = proxy.NewPool(300, time.Duration(60000)*time.Millisecond, 500, 500) + encoding.RegisterCodec(proxy.Codec()) grpc.NewServer( - grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) + grpc.UnknownServiceHandler(proxy.TransparentHandler(exampleDirector))) } // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + exampleDirector = func(ctx context.Context, fullMethodName string) (context.Context, *proxy.PoolConn, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) // Copy the inbound metadata explicitly. @@ -48,13 +56,15 @@ func ExampleStreamDirector() { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. - conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + backendConn, err := examplePool.GetConn("api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())), + grpc.WithInsecure()) + return outCtx, backendConn, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { - conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + backendConn, err := examplePool.GetConn("api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())), + grpc.WithInsecure()) + return outCtx, backendConn, err } } - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/proxy/handler.go b/proxy/handler.go index 752f892..a6f2394 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -6,6 +6,8 @@ package proxy import ( "io" + "google.golang.org/grpc/status" + "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -61,7 +63,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // little bit of gRPC internals never hurt anyone fullMethodName, ok := grpc.MethodFromServerStream(serverStream) if !ok { - return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } // We require that the director's returned context inherits from the serverStream.Context(). outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) @@ -69,9 +71,16 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return err } + defer func() { + // defer execution of release + if backendConn != nil { + backendConn.Close() + } + }() + clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn.ClientConn, fullMethodName) if err != nil { return err } @@ -94,7 +103,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and // exit with an error to the stack clientCancel() - return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) } case c2sErr := <-c2sErrChan: // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two @@ -108,7 +117,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return nil } } - return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") + return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { diff --git a/proxy/handler_test.go b/proxy/handler_test.go index c811408..a9a2c24 100644 --- a/proxy/handler_test.go +++ b/proxy/handler_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "google.golang.org/grpc/encoding" + "github.com/mwitkow/grpc-proxy/proxy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,6 +40,10 @@ const ( countListResponses = 20 ) +var ( + examplePool *proxy.Pool +) + // asserting service is implemented on the server side and serves as a handler for stuff type assertingService struct { logger *grpclog.Logger @@ -103,7 +109,7 @@ type ProxyHappySuite struct { server *grpc.Server proxyListener net.Listener proxy *grpc.Server - serverClientConn *grpc.ClientConn + serverClientConn *proxy.PoolConn client *grpc.ClientConn testClient pb.TestServiceClient @@ -201,10 +207,13 @@ func (s *ProxyHappySuite) SetupSuite() { s.server = grpc.NewServer() pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) + // init grpc conn pool + examplePool = proxy.NewPool(300, time.Duration(60000)*time.Millisecond, 500, 500) // Setup of the proxy's Director. - s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) + s.serverClientConn, err = examplePool.GetConn("api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.Codec())), + grpc.WithInsecure()) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string) (context.Context, *proxy.PoolConn, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { @@ -216,8 +225,8 @@ func (s *ProxyHappySuite) SetupSuite() { outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) return outCtx, s.serverClientConn, nil } + encoding.RegisterCodec(proxy.Codec()) s.proxy = grpc.NewServer( - grpc.CustomCodec(proxy.Codec()), grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler.