Skip to content

Commit

Permalink
feature: apply filters on response that want to be processed (#2239)
Browse files Browse the repository at this point in the history
* feature: apply filters on response that want to be processed
fix: admissionControl 499 cases should be counted

Signed-off-by: Sandor Szücs <[email protected]>

* add test that 499 is counted by admission control and make routing close filters on quit

Signed-off-by: Sandor Szücs <[email protected]>

* test: add loopback with admissioncontrol test to cover edge cases
feature: close filters if routing.Routing is closed, right now it is only relevant for tests

Signed-off-by: Sandor Szücs <[email protected]>

* fix: staticcheck found unused code in test

Signed-off-by: Sandor Szücs <[email protected]>

* add PostProcessor and PreProcessor as intended

Signed-off-by: Sandor Szücs <[email protected]>

* fix: cleanup all filters that are FilterCloser on datasource receiver shutdown

Signed-off-by: Sandor Szücs <[email protected]>

* close schedulers to cleanup resources

Signed-off-by: Sandor Szücs <[email protected]>

* move sync.Mutex in auth filters

Signed-off-by: Sandor Szücs <[email protected]>

* fix condition when not to close the filter

Signed-off-by: Sandor Szücs <[email protected]>

* close routeTable on close Routing to cleanup filters that wants to cleanup resources

Signed-off-by: Sandor Szücs <[email protected]>

* fix: store actually the routes for later cleanup
refactor: public -> private

Signed-off-by: Sandor Szücs <[email protected]>

* remove mutexes that are not needed

Signed-off-by: Sandor Szücs <[email protected]>

* add docs

Signed-off-by: Sandor Szücs <[email protected]>

* move closing as suggested

Signed-off-by: Sandor Szücs <[email protected]>

* use io.Closer compliant interface

Signed-off-by: Sandor Szücs <[email protected]>

* doc: fix comment

Signed-off-by: Sandor Szücs <[email protected]>

* refactor: use io.Closer as interface composite
fix: dev tutorial for FilterCloser filters

Signed-off-by: Sandor Szücs <[email protected]>

---------

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Mar 22, 2023
1 parent 693a042 commit 254fac9
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 8 deletions.
80 changes: 80 additions & 0 deletions docs/tutorials/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,86 @@ func (f *myFilter) Response(ctx filters.FilterContext) {

Find a detailed example at [how to develop a filter](../reference/development.md#how-to-develop-a-filter).

### Filters with cleanup

Sometimes your filter needs to cleanup resources on shutdown. In Go
functions that do this have often the name `Close()`.
There is the `filters.FilterCloser` interface that if you comply with
it, the routing.Route will make sure your filters are closed in case
of `routing.Routing` was closed.

```go
type myFilter struct{}

func NewMyFilter() filters.Spec {
return &myFilter{}
}

func (spec *myFilter) Name() string { return "myFilter" }

func (spec *myFilter) CreateFilter(config []interface{}) (filters.Filter, error) {
return NewMyFilter(), nil
}

func (f *myFilter) Request(ctx filters.FilterContext) {
// change data in ctx.Request() for example
}

func (f *myFilter) Response(ctx filters.FilterContext) {
// change data in ctx.Response() for example
}

func (f *myFilter) Close() error {
// cleanup your filter
}
```

### Filters with error handling

Sometimes you want to have a filter that wants to get called
`Response()` even if the proxy will not send a response from the
backend, for example you want to count error status codes, like
the [admissionControl](../reference/filters.md#admissioncontrol)
filter.
In this case you need to comply with the following proxy interface:

```go
// errorHandlerFilter is an opt-in for filters to get called
// Response(ctx) in case of errors.
type errorHandlerFilter interface {
// HandleErrorResponse returns true in case a filter wants to get called
HandleErrorResponse() bool
}
```

Example:
```go
type myFilter struct{}

func NewMyFilter() filters.Spec {
return &myFilter{}
}

func (spec *myFilter) Name() string { return "myFilter" }

func (spec *myFilter) CreateFilter(config []interface{}) (filters.Filter, error) {
return NewMyFilter(), nil
}

func (f *myFilter) Request(ctx filters.FilterContext) {
// change data in ctx.Request() for example
}

func (f *myFilter) Response(ctx filters.FilterContext) {
// change data in ctx.Response() for example
}

func (f *myFilter) HandleErrorResponse() bool() {
return true
}
```


## Predicates

Predicates allow to match a condition, that can be based on arbitrary
Expand Down
10 changes: 10 additions & 0 deletions filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filters

import (
"errors"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -151,6 +152,15 @@ type Filter interface {
Response(FilterContext)
}

// FilterCloser are Filters that need to cleanup resources after
// filter termination. For example Filters, that create a goroutine
// for some reason need to cleanup their goroutine or they would leak
// goroutines.
type FilterCloser interface {
Filter
io.Closer
}

// Spec objects are specifications for filters. When initializing the routes,
// the Filter instances are created using the Spec objects found in the
// registry.
Expand Down
12 changes: 12 additions & 0 deletions filters/scheduler/lifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ func (l *lifoFilter) GetQueue() *scheduler.Queue {
return l.queue
}

// Close will cleanup underlying queues
func (l *lifoFilter) Close() error {
l.queue.Close()
return nil
}

// Request is the filter.Filter interface implementation. Request will
// increase the number of inflight requests and respond to the caller,
// if the bounded queue returns an error. Status code by Error:
Expand Down Expand Up @@ -273,6 +279,12 @@ func (l *lifoGroupFilter) GetQueue() *scheduler.Queue {
return l.queue
}

// Close will cleanup underlying queues
func (l *lifoGroupFilter) Close() error {
l.queue.Close()
return nil
}

// Request is the filter.Filter interface implementation. Request will
// increase the number of inflight requests and respond to the caller,
// if the bounded queue returns an error. Status code by Error:
Expand Down
11 changes: 8 additions & 3 deletions filters/shedder/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,12 @@ func (spec *AdmissionControlSpec) CreateFilter(args []interface{}) (filters.Filt
}

// Close stops the background goroutine. The filter keeps working on stale data.
func (ac *admissionControl) Close() {
func (ac *admissionControl) Close() error {
ac.once.Do(func() {
ac.closed = true
close(ac.quit)
})
return nil
}

func (ac *admissionControl) tickWindows(d time.Duration) {
Expand Down Expand Up @@ -433,8 +434,7 @@ func (ac *admissionControl) Response(ctx filters.FilterContext) {
return
}

code := ctx.Response().StatusCode
if code < 499 {
if ctx.Response().StatusCode < 499 {
ac.successCounter.Add(1)
}
ac.counter.Add(1)
Expand All @@ -450,3 +450,8 @@ func (ac *admissionControl) startSpan(ctx context.Context) (span opentracing.Spa
}
return
}

// HandleErrorResponse is to opt-in for filters to get called
// Response(ctx) in case of errors via proxy. It has to return true to
// opt-in.
func (ac *admissionControl) HandleErrorResponse() bool { return true }
Loading

0 comments on commit 254fac9

Please sign in to comment.