Skip to content

Commit

Permalink
Log traceId in proxy logger (#2347)
Browse files Browse the repository at this point in the history
Updates #2301

Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Jun 7, 2023
1 parent 87a23b9 commit a63676c
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 30 deletions.
5 changes: 4 additions & 1 deletion proxy/backendtimeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func TestFastService(t *testing.T) {
}

func TestBackendTimeoutInTheMiddleOfServiceResponse(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Write([]byte("Wish You"))
Expand Down Expand Up @@ -113,7 +116,7 @@ func TestBackendTimeoutInTheMiddleOfServiceResponse(t *testing.T) {
}

const msg = "error while copying the response stream: context deadline exceeded"
if err = tp.log.WaitFor(msg, 100*time.Millisecond); err != nil {
if err = testLog.WaitFor(msg, 100*time.Millisecond); err != nil {
t.Errorf("expected '%s' in logs", msg)
}
}
Expand Down
19 changes: 14 additions & 5 deletions proxy/clienttimeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
)

func TestClientTimeout(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

d := 200 * time.Millisecond

payload := []byte("backend reply")
Expand Down Expand Up @@ -49,16 +52,19 @@ func TestClientTimeout(t *testing.T) {
}

const msgErrClientTimeout = "context canceled"
if err = tp.log.WaitFor(msgErrClientTimeout, 3*d); err != nil {
if err = testLog.WaitFor(msgErrClientTimeout, 3*d); err != nil {
t.Errorf("log should contain '%s'", msgErrClientTimeout)
}
const msgErrClientCanceledAfter = "client canceled after"
if err = tp.log.WaitFor(msgErrClientCanceledAfter, 3*d); err != nil {
if err = testLog.WaitFor(msgErrClientCanceledAfter, 3*d); err != nil {
t.Errorf("log should contain '%s'", msgErrClientCanceledAfter)
}
}

func TestClientCancellation(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

service := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
}))
Expand All @@ -84,14 +90,14 @@ func TestClientCancellation(t *testing.T) {
}

const msg = "POST / HTTP/1.1"
if err = tp.log.WaitForN(msg, N, 500*time.Millisecond); err != nil {
if err = testLog.WaitForN(msg, N, 500*time.Millisecond); err != nil {
t.Fatalf("expected %d requests, got %d", N, tp.log.Count(msg))
}

// Look for N messages like
// 2020/12/05 17:24:00 client canceled after 1.090638ms, route with backend network http://127.0.0.1:39687, status code 499: dialing failed false: context canceled, remote host: 127.0.0.1, request: "POST / HTTP/1.1", user agent: ""
for _, m := range []string{"client canceled after", "status code 499", "context canceled"} {
count := tp.log.Count(m)
count := testLog.Count(m)
if count != N {
t.Errorf("expected '%s' %d times, got %d", m, N, tp.log.Count(m))
}
Expand All @@ -111,6 +117,9 @@ func postTruncated(addr string) (err error) {
}

func TestClientTimeoutBeforeStreaming(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

backend := startTestServer([]byte("backend reply"), 0, func(*http.Request) {})
defer backend.Close()

Expand All @@ -137,7 +146,7 @@ func TestClientTimeoutBeforeStreaming(t *testing.T) {
}

const msg = "Client request: context canceled"
if err = tp.log.WaitFor(msg, 200*time.Millisecond); err != nil {
if err = testLog.WaitFor(msg, 200*time.Millisecond); err != nil {
t.Errorf("log should contain '%s'", msg)
}
}
8 changes: 4 additions & 4 deletions proxy/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (c *context) Split() (filters.FilterContext, error) {
u.Host = originalRequest.Host
cr, body, err := cloneRequestForSplit(u, originalRequest)
if err != nil {
c.proxy.log.Errorf("context: failed to clone request: %v", err)
c.Logger().Errorf("context: failed to clone request: %v", err)
return nil, err
}
serverSpan := opentracing.SpanFromContext(originalRequest.Context())
Expand All @@ -299,11 +299,11 @@ func (c *context) Loopback() {
err := c.proxy.do(c)
if c.response != nil && c.response.Body != nil {
if _, err := io.Copy(io.Discard, c.response.Body); err != nil {
c.proxy.log.Errorf("context: error while discarding remainder response body: %v.", err)
c.Logger().Errorf("context: error while discarding remainder response body: %v.", err)
}
err := c.response.Body.Close()
if err != nil {
c.proxy.log.Errorf("context: error during closing the response body: %v", err)
c.Logger().Errorf("context: error during closing the response body: %v", err)
}
}
if c.proxySpan != nil {
Expand All @@ -317,7 +317,7 @@ func (c *context) Loopback() {
}

if err != nil {
c.proxy.log.Errorf("context: failed to execute loopback request: %v", err)
c.Logger().Errorf("context: failed to execute loopback request: %v", err)
}
}

Expand Down
30 changes: 15 additions & 15 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) error {

upgradeProxy.serveHTTP(ctx.responseWriter, req)
ctx.successfulUpgrade = true
p.log.Debugf("finished upgraded protocol %s session", getUpgradeRequest(ctx.request))
ctx.Logger().Debugf("finished upgraded protocol %s session", getUpgradeRequest(ctx.request))
return nil
}

Expand Down Expand Up @@ -1011,7 +1011,7 @@ func (p *Proxy) do(ctx *context) (err error) {
defer func() {
if r := recover(); r != nil {
p.onPanicSometimes.Do(func() {
p.log.Errorf("stacktrace of panic caused by: %v:\n%s", r, stack())
ctx.Logger().Errorf("stacktrace of panic caused by: %v:\n%s", r, stack())
})

perr := &proxyError{
Expand Down Expand Up @@ -1070,7 +1070,7 @@ func (p *Proxy) do(ctx *context) (err error) {
p.metrics.IncRoutingFailures()
}

p.log.Debugf("could not find a route for %v", ctx.request.URL)
ctx.Logger().Debugf("could not find a route for %v", ctx.request.URL)
p.makeErrorResponse(ctx, errRouteLookupFailed)
return errRouteLookupFailed
}
Expand All @@ -1080,13 +1080,13 @@ func (p *Proxy) do(ctx *context) (err error) {
processedFilters := p.applyFiltersToRequest(ctx.route.Filters, ctx)

if ctx.deprecatedShunted() {
p.log.Debugf("deprecated shunting detected in route: %s", ctx.route.Id)
ctx.Logger().Debugf("deprecated shunting detected in route: %s", ctx.route.Id)
return &proxyError{handled: true}
} else if ctx.shunted() || ctx.route.Shunt || ctx.route.BackendType == eskip.ShuntBackend {
// consume the body to prevent goroutine leaks
if ctx.request.Body != nil {
if _, err := io.Copy(io.Discard, ctx.request.Body); err != nil {
p.log.Errorf("error while discarding remainder request body: %v.", err)
ctx.Logger().Errorf("error while discarding remainder request body: %v.", err)
}
}
ctx.ensureDefaultResponse()
Expand Down Expand Up @@ -1133,7 +1133,7 @@ func (p *Proxy) do(ctx *context) (err error) {
if d, ok := ctx.StateBag()[filters.ReadTimeout].(time.Duration); ok {
e := ctx.ResponseController().SetReadDeadline(backendStart.Add(d))
if e != nil {
p.log.Errorf("Failed to set read deadline: %v", e)
ctx.Logger().Errorf("Failed to set read deadline: %v", e)
}
}
rsp, perr := p.makeBackendRequest(ctx, backendContext)
Expand All @@ -1156,7 +1156,7 @@ func (p *Proxy) do(ctx *context) (err error) {
var perr2 *proxyError
rsp, perr2 = p.makeBackendRequest(ctx, backendContext)
if perr2 != nil {
p.log.Errorf("Failed to retry backend request: %v", perr2)
ctx.Logger().Errorf("Failed to retry backend request: %v", perr2)
if perr2.code >= http.StatusInternalServerError {
p.metrics.MeasureBackend5xx(backendStart)
}
Expand Down Expand Up @@ -1215,7 +1215,7 @@ func (p *Proxy) serveResponse(ctx *context) {
if err := ctx.Request().Context().Err(); err != nil {
// deadline exceeded or canceled in stdlib, client closed request
// see https://github.com/zalando/skipper/pull/864
p.log.Debugf("Client request: %v", err)
ctx.Logger().Debugf("Client request: %v", err)
ctx.response.StatusCode = 499
p.tracing.setTag(ctx.proxySpan, ClientRequestStateTag, ClientRequestCanceled)
}
Expand All @@ -1230,7 +1230,7 @@ func (p *Proxy) serveResponse(ctx *context) {
p.tracing.logStreamEvent(ctx.proxySpan, StreamBodyEvent, strconv.FormatInt(n, 10))
if err != nil {
p.metrics.IncErrorsStreaming(ctx.route.Id)
p.log.Debugf("error while copying the response stream: %v", err)
ctx.Logger().Debugf("error while copying the response stream: %v", err)
p.tracing.setTag(ctx.proxySpan, ErrorTag, true)
p.tracing.setTag(ctx.proxySpan, StreamBodyEvent, StreamBodyError)
p.tracing.logStreamEvent(ctx.proxySpan, StreamBodyEvent, fmt.Sprintf("Failed to stream response: %v", err))
Expand Down Expand Up @@ -1284,12 +1284,12 @@ func (p *Proxy) errorResponse(ctx *context, err error) {
}

msgPrefix := "error while proxying"
logFunc := p.log.Errorf
logFunc := ctx.Logger().Errorf
if ctx.response.StatusCode == 499 {
msgPrefix = "client canceled"
logFunc = p.log.Infof
logFunc = ctx.Logger().Infof
if p.accessLogDisabled {
logFunc = p.log.Debugf
logFunc = ctx.Logger().Debugf
}
}
if id != unknownRouteID {
Expand Down Expand Up @@ -1453,7 +1453,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ctx.response != nil && ctx.response.Body != nil {
err := ctx.response.Body.Close()
if err != nil {
p.log.Errorf("error during closing the response body: %v", err)
ctx.Logger().Errorf("error during closing the response body: %v", err)
}
}
}()
Expand All @@ -1463,7 +1463,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if d, ok := ctx.StateBag()[filters.WriteTimeout].(time.Duration); ok {
e := ctx.ResponseController().SetWriteDeadline(time.Now().Add(d))
if e != nil {
p.log.Errorf("Failed to set write deadline: %v", e)
ctx.Logger().Errorf("Failed to set write deadline: %v", e)
}
}

Expand Down Expand Up @@ -1597,7 +1597,7 @@ func (p *Proxy) applyFiltersOnError(ctx *context, filters []*routing.RouteFilter
if ehf, ok := fi.Filter.(errorHandlerFilter); !ok || !ehf.HandleErrorResponse() {
continue
}
p.log.Debugf("filter %s handles error", fi.Name)
ctx.Logger().Debugf("filter %s handles error", fi.Name)

start := time.Now()
filterTracing.logStart(fi.Name)
Expand Down
83 changes: 78 additions & 5 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"testing"
"time"

log "github.com/sirupsen/logrus"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,6 +75,68 @@ type listener struct {
lastConn chan net.Conn
}

type testLog struct {
m sync.Mutex

buf bytes.Buffer
oldOut io.Writer
oldLevel log.Level
}

func NewTestLog() *testLog {
oldOut := log.StandardLogger().Out
oldLevel := log.GetLevel()
log.SetLevel(log.DebugLevel)

tl := &testLog{oldOut: oldOut, oldLevel: oldLevel}
log.SetOutput(tl)
return tl
}

func (l *testLog) Write(p []byte) (int, error) {
l.m.Lock()
defer l.m.Unlock()

return l.buf.Write(p)
}

func (l *testLog) String() string {
l.m.Lock()
defer l.m.Unlock()

return l.buf.String()
}

func (l *testLog) Close() {
log.SetOutput(l.oldOut)
log.SetLevel(l.oldLevel)
}

func (l *testLog) WaitForN(exp string, n int, to time.Duration) error {
timeout := time.After(to)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-timeout:
return fmt.Errorf("timeout waiting for log entry: %s", exp)
case <-ticker.C:
if l.Count(exp) >= n {
return nil
}
}
}
}

func (l *testLog) WaitFor(exp string, to time.Duration) error {
return l.WaitForN(exp, 1, to)
}

func (l *testLog) Count(exp string) int {
return strings.Count(l.String(), exp)
}

func (cors *preserveOriginalSpec) Name() string { return "preserveOriginal" }

func (cors *preserveOriginalSpec) CreateFilter(_ []interface{}) (filters.Filter, error) {
Expand Down Expand Up @@ -942,6 +1006,9 @@ func (*nilFilterSpec) Name() string
func (*nilFilterSpec) CreateFilter(config []interface{}) (filters.Filter, error) { return nil, nil }

func TestFilterPanic(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

var backendRequests int32
s := startTestServer([]byte("Hello World!"), 0, func(r *http.Request) {
atomic.AddInt32(&backendRequests, 1)
Expand Down Expand Up @@ -976,12 +1043,15 @@ func TestFilterPanic(t *testing.T) {
assert.NotContains(t, w.Header(), "X-Expected", "panic expected to skip all of the response filters")

const msg = "panic caused by: runtime error: invalid memory address or nil pointer dereference"
if err = tp.log.WaitFor(msg, 100*time.Millisecond); err != nil {
if err = testLog.WaitFor(msg, 100*time.Millisecond); err != nil {
t.Errorf("expected '%s' in logs", msg)
}
}

func TestFilterPanicPrintStackRate(t *testing.T) {
testLog := NewTestLog()
defer testLog.Close()

fr := make(filters.Registry)
fr.Register(new(nilFilterSpec))

Expand All @@ -1001,12 +1071,15 @@ func TestFilterPanicPrintStackRate(t *testing.T) {
tp.proxy.ServeHTTP(w, r)
}

const msg = "error while proxying"
if err = tp.log.WaitForN(msg, panicsCaused, 100*time.Millisecond); err != nil {
t.Errorf("expected '%s' in logs", msg)
const errorMsg = "error while proxying"
if err = testLog.WaitForN(errorMsg, 10, 100*time.Millisecond); err != nil {
t.Errorf(`expected "%s" to be logged exactly %d times`, errorMsg, panicsCaused)
}

assert.Equal(t, stacksPrinted, tp.log.Count("github.com/zalando/skipper/proxy.TestFilterPanicPrintStackRate"))
const stackMsg = "github.com/zalando/skipper/proxy.TestFilterPanicPrintStackRate"
if err = testLog.WaitForN(stackMsg, 3, 100*time.Millisecond); err != nil {
t.Errorf(`expected "%s" to be logged exactly %d times`, stackMsg, stacksPrinted)
}
}

func TestProcessesRequestWithShuntBackend(t *testing.T) {
Expand Down

0 comments on commit a63676c

Please sign in to comment.