Skip to content

Commit

Permalink
feat(router): add configurable connection limits for each subgraph (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
endigma authored Feb 6, 2025
1 parent 710f869 commit e9a8a3b
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 57 deletions.
29 changes: 6 additions & 23 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ type (

// newGraphServer creates a new server instance.
func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterConfig, proxy ProxyFunc) (*graphServer, error) {

/* Older versions of composition will not populate a compatibility version.
* Currently, all "old" router execution configurations are compatible as there have been no breaking
* changes.
Expand All @@ -119,7 +118,7 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
cancelFunc: cancel,
Config: &r.Config,
engineStats: r.EngineStats,
executionTransport: newHTTPTransport(r.subgraphTransportOptions.TransportTimeoutOptions, proxy),
executionTransport: newHTTPTransport(r.subgraphTransportOptions.TransportRequestOptions, proxy),
executionTransportProxy: proxy,
playgroundHandler: r.playgroundHandler,
baseRouterConfigVersion: routerConfig.GetVersion(),
Expand Down Expand Up @@ -231,7 +230,6 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
* A group where we can selectively apply middlewares to the graphql endpoint
*/
httpRouter.Group(func(cr chi.Router) {

// We are applying it conditionally because compressing 3MB playground is still slow even with stdlib gzip
cr.Use(func(h http.Handler) http.Handler {
return wrapper(h)
Expand Down Expand Up @@ -266,7 +264,6 @@ func newGraphServer(ctx context.Context, r *Router, routerConfig *nodev1.RouterC
}

func (s *graphServer) buildMultiGraphHandler(ctx context.Context, baseMux *chi.Mux, featureFlagConfigs map[string]*nodev1.FeatureFlagRouterExecutionConfig) (http.HandlerFunc, error) {

if len(featureFlagConfigs) == 0 {
return baseMux.ServeHTTP, nil
}
Expand All @@ -288,7 +285,6 @@ func (s *graphServer) buildMultiGraphHandler(ctx context.Context, baseMux *chi.M
}

return func(w http.ResponseWriter, r *http.Request) {

// Extract the feature flag and run the corresponding mux
// 1. From the request header
// 2. From the cookie
Expand Down Expand Up @@ -318,7 +314,8 @@ func (s *graphServer) setupEngineStatistics() (err error) {
// We only include the base router config version in the attributes for the engine statistics.
// The same approach is used for the runtime metrics.
baseAttributes := append([]attribute.KeyValue{
otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion)}, s.baseOtelAttributes...)
otel.WgRouterConfigVersion.String(s.baseRouterConfigVersion),
}, s.baseOtelAttributes...)

s.otlpEngineMetrics, err = rmetric.NewEngineMetrics(
s.logger,
Expand All @@ -327,7 +324,6 @@ func (s *graphServer) setupEngineStatistics() (err error) {
s.engineStats,
&s.metricConfig.OpenTelemetry.EngineStats,
)

if err != nil {
return err
}
Expand All @@ -339,7 +335,6 @@ func (s *graphServer) setupEngineStatistics() (err error) {
s.engineStats,
&s.metricConfig.Prometheus.EngineStats,
)

if err != nil {
return err
}
Expand All @@ -364,7 +359,6 @@ type graphMux struct {
// buildOperationCaches creates the caches for the graph mux.
// The caches are created based on the engine configuration.
func (s *graphMux) buildOperationCaches(srv *graphServer) (computeSha256 bool, err error) {

// We create a new execution plan cache for each operation planner which is coupled to
// the specific engine configuration. This is necessary because otherwise we would return invalid plans.
//
Expand Down Expand Up @@ -485,7 +479,6 @@ func (s *graphMux) configureCacheMetrics(srv *graphServer, baseOtelAttributes []
srv.logger,
baseOtelAttributes,
srv.otlpMeterProvider)

if err != nil {
return fmt.Errorf("failed to create cache metrics for OTLP: %w", err)
}
Expand All @@ -498,7 +491,6 @@ func (s *graphMux) configureCacheMetrics(srv *graphServer, baseOtelAttributes []
srv.logger,
baseOtelAttributes,
srv.promMeterProvider)

if err != nil {
return fmt.Errorf("failed to create cache metrics for Prometheus: %w", err)
}
Expand Down Expand Up @@ -544,7 +536,6 @@ func (s *graphMux) configureCacheMetrics(srv *graphServer, baseOtelAttributes []
}

func (s *graphMux) Shutdown(ctx context.Context) error {

var err error

if s.planCache != nil {
Expand Down Expand Up @@ -601,8 +592,8 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
featureFlagName string,
routerConfigVersion string,
engineConfig *nodev1.EngineConfiguration,
configSubgraphs []*nodev1.Subgraph) (*graphMux, error) {

configSubgraphs []*nodev1.Subgraph,
) (*graphMux, error) {
gm := &graphMux{
metricStore: rmetric.NewNoopMetrics(),
}
Expand Down Expand Up @@ -745,7 +736,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context,

httpRouter.Use(func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

reqContext := getRequestContext(r.Context())

reqContext.telemetry.addCommonTraceAttribute(baseOtelAttributes...)
Expand Down Expand Up @@ -1036,7 +1026,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
Condition: s.securityConfiguration.BlockNonPersistedOperations.Condition,
},
})

if err != nil {
return nil, fmt.Errorf("failed to create operation blocker: %w", err)
}
Expand Down Expand Up @@ -1106,7 +1095,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
// Must be mounted after the websocket middleware to ensure that we only count non-hijacked requests like WebSockets
func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

requestContext := getRequestContext(r.Context())

// We don't want to count any type of subscriptions e.g. SSE as in-flight requests because they are long-lived
Expand Down Expand Up @@ -1141,7 +1129,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
}

func (s *graphServer) buildPubSubConfiguration(ctx context.Context, engineConfig *nodev1.EngineConfiguration, routerEngineCfg *RouterEngineConfiguration) error {

datasourceConfigurations := engineConfig.GetDatasourceConfigurations()
for _, datasourceConfiguration := range datasourceConfigurations {
if datasourceConfiguration.CustomEvents == nil {
Expand Down Expand Up @@ -1194,7 +1181,6 @@ func (s *graphServer) buildPubSubConfiguration(ctx context.Context, engineConfig
}

for _, eventSource := range routerEngineCfg.Events.Providers.Kafka {

if eventSource.ID == providerID {
options, err := buildKafkaOptions(eventSource)
if err != nil {
Expand Down Expand Up @@ -1248,7 +1234,6 @@ func (s *graphServer) wait(ctx context.Context) error {
// After all requests are done, it will shut down the metric store and runtime metrics.
// Shutdown does cancel the context after all non-hijacked requests such as WebSockets has been handled.
func (s *graphServer) Shutdown(ctx context.Context) error {

// Cancel the context after the graceful shutdown is done
// to clean up resources like websocket connections, pools, etc.
defer s.cancelFunc()
Expand Down Expand Up @@ -1342,9 +1327,7 @@ func configureSubgraphOverwrites(
overrideRoutingURLConfig config.OverrideRoutingURLConfiguration,
overrides config.OverridesConfiguration,
) ([]Subgraph, error) {
var (
err error
)
var err error
subgraphs := make([]Subgraph, 0, len(configSubgraphs))
for _, sg := range configSubgraphs {

Expand Down
43 changes: 28 additions & 15 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"crypto/x509"
"errors"
"fmt"
rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"

rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis"

"connectrpc.com/connect"
"github.com/mitchellh/mapstructure"
"github.com/nats-io/nuid"
Expand Down Expand Up @@ -85,19 +86,23 @@ type (
proxy ProxyFunc
}

TransportTimeoutOptions struct {
TransportRequestOptions struct {
RequestTimeout time.Duration
ResponseHeaderTimeout time.Duration
ExpectContinueTimeout time.Duration
KeepAliveIdleTimeout time.Duration
DialTimeout time.Duration
TLSHandshakeTimeout time.Duration
KeepAliveProbeInterval time.Duration

MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
}

SubgraphTransportOptions struct {
TransportTimeoutOptions
SubgraphMap map[string]*TransportTimeoutOptions
TransportRequestOptions
SubgraphMap map[string]*TransportRequestOptions
}

GraphQLMetricsConfig struct {
Expand Down Expand Up @@ -1659,26 +1664,30 @@ func DefaultFileUploadConfig() *config.FileUpload {
}
}

func NewTransportTimeoutOptions(cfg config.GlobalSubgraphRequestRule) TransportTimeoutOptions {
return TransportTimeoutOptions{
func NewTransportRequestOptions(cfg config.GlobalSubgraphRequestRule) TransportRequestOptions {
return TransportRequestOptions{
RequestTimeout: cfg.RequestTimeout,
ResponseHeaderTimeout: cfg.ResponseHeaderTimeout,
ExpectContinueTimeout: cfg.ExpectContinueTimeout,
KeepAliveIdleTimeout: cfg.KeepAliveIdleTimeout,
DialTimeout: cfg.DialTimeout,
TLSHandshakeTimeout: cfg.TLSHandshakeTimeout,
KeepAliveProbeInterval: cfg.KeepAliveProbeInterval,

MaxConnsPerHost: cfg.MaxConnsPerHost,
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
}
}

func NewSubgraphTransportOptions(cfg config.TrafficShapingRules) *SubgraphTransportOptions {
base := &SubgraphTransportOptions{
TransportTimeoutOptions: NewTransportTimeoutOptions(cfg.All),
SubgraphMap: map[string]*TransportTimeoutOptions{},
TransportRequestOptions: NewTransportRequestOptions(cfg.All),
SubgraphMap: map[string]*TransportRequestOptions{},
}

for k, v := range cfg.Subgraphs {
opts := NewTransportTimeoutOptions(*v)
opts := NewTransportRequestOptions(*v)
base.SubgraphMap[k] = &opts
}

Expand All @@ -1687,16 +1696,20 @@ func NewSubgraphTransportOptions(cfg config.TrafficShapingRules) *SubgraphTransp

func DefaultSubgraphTransportOptions() *SubgraphTransportOptions {
return &SubgraphTransportOptions{
TransportTimeoutOptions: TransportTimeoutOptions{
TransportRequestOptions: TransportRequestOptions{
RequestTimeout: 60 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 0 * time.Second,
ExpectContinueTimeout: 0 * time.Second,
KeepAliveProbeInterval: 30 * time.Second,
KeepAliveIdleTimeout: 0 * time.Second,
DialTimeout: 30 * time.Second,

MaxConnsPerHost: 100,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 20,
},
SubgraphMap: map[string]*TransportTimeoutOptions{},
SubgraphMap: map[string]*TransportRequestOptions{},
}
}

Expand Down Expand Up @@ -1826,7 +1839,7 @@ func WithCacheWarmupConfig(cfg *config.CacheWarmupConfiguration) Option {

type ProxyFunc func(req *http.Request) (*url.URL, error)

func newHTTPTransport(opts TransportTimeoutOptions, proxy ProxyFunc) *http.Transport {
func newHTTPTransport(opts TransportRequestOptions, proxy ProxyFunc) *http.Transport {
dialer := &net.Dialer{
Timeout: opts.DialTimeout,
KeepAlive: opts.KeepAliveProbeInterval,
Expand All @@ -1839,13 +1852,13 @@ func newHTTPTransport(opts TransportTimeoutOptions, proxy ProxyFunc) *http.Trans
},
// The default value 0 = unbounded.
// We set this to some value to prevent resource exhaustion, e.g. max requests and ports.
MaxConnsPerHost: 100,
MaxConnsPerHost: opts.MaxConnsPerHost,
// The default value 0 = unbounded. 100 is used by the default Go transport.
// This value should be significantly higher than MaxIdleConnsPerHost.
MaxIdleConns: 1024,
MaxIdleConns: opts.MaxIdleConns,
// The default value is 2. Such a low limit will open and close connections too often.
// Details: https://gitlab.com/gitlab-org/gitlab-pages/-/merge_requests/274
MaxIdleConnsPerHost: 20,
MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost,
ForceAttemptHTTP2: true,
IdleConnTimeout: opts.KeepAliveIdleTimeout,
// Set more timeouts https://gitlab.com/gitlab-org/gitlab-pages/-/issues/495
Expand Down
6 changes: 3 additions & 3 deletions router/core/subgraph_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestTimeoutTransport(t *testing.T) {
t.Parallel()

transportOpts := &SubgraphTransportOptions{
SubgraphMap: map[string]*TransportTimeoutOptions{
SubgraphMap: map[string]*TransportRequestOptions{
testSubgraphKey: {
ResponseHeaderTimeout: 100 * time.Millisecond,
},
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestTimeoutTransport(t *testing.T) {
t.Parallel()

transportOpts := &SubgraphTransportOptions{
SubgraphMap: map[string]*TransportTimeoutOptions{
SubgraphMap: map[string]*TransportRequestOptions{
testSubgraphKey: {
TLSHandshakeTimeout: 2 * time.Millisecond,
},
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestTimeoutTransport(t *testing.T) {
t.Parallel()

transportOpts := &SubgraphTransportOptions{
SubgraphMap: map[string]*TransportTimeoutOptions{
SubgraphMap: map[string]*TransportRequestOptions{
testSubgraphKey: {
DialTimeout: 1 * time.Millisecond,
},
Expand Down
5 changes: 5 additions & 0 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ type GlobalSubgraphRequestRule struct {
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout,omitempty" envDefault:"10s"`
KeepAliveIdleTimeout time.Duration `yaml:"keep_alive_idle_timeout,omitempty" envDefault:"0s"`
KeepAliveProbeInterval time.Duration `yaml:"keep_alive_probe_interval,omitempty" envDefault:"30s"`

// Connection configuration
MaxConnsPerHost int `yaml:"max_conns_per_host,omitempty" envDefault:"100"`
MaxIdleConns int `yaml:"max_idle_conns,omitempty" envDefault:"1024"`
MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host,omitempty" envDefault:"20"`
}

type SubgraphTrafficRequestRule struct {
Expand Down
Loading

0 comments on commit e9a8a3b

Please sign in to comment.