Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming Interceptors #3641

Merged
merged 28 commits into from
Feb 9, 2025
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9f853b8
Add read/write streaming payload/result methods to interceptor DSL
douglaswth Jan 24, 2025
e547e64
Weave streaming interceptor stuff through codegen
douglaswth Jan 24, 2025
d6d779d
Do not try to use a field from an interface for the raw payload when …
douglaswth Jan 25, 2025
fa0bdde
Fix golden files
douglaswth Jan 25, 2025
946b677
Collect attributes from a method that actually has the interceptor ap…
douglaswth Jan 28, 2025
28789b0
Fix StreamingResult accessor signature and return
douglaswth Jan 28, 2025
905c5d8
Add SendWithContext and RecvWithContext methods to the stream interfa…
douglaswth Jan 29, 2025
5905958
Progress on wrapping streams with interceptors
douglaswth Jan 30, 2025
6e8e746
Finish generating the stream wrapping for interceptors; remove the En…
douglaswth Jan 31, 2025
3826f70
Progress on adding comments and fixing tests; fix a bug where refs we…
douglaswth Jan 31, 2025
0261a0c
Finish fixing tests
douglaswth Feb 1, 2025
9c35929
Add more tests for streaming interceptor service codegen
douglaswth Feb 1, 2025
cc0e16a
Fix bug where the CLI ParseEndpoint method would try to wrap every me…
douglaswth Feb 1, 2025
79eafc5
Merge branch 'v3' into streaming-interceptors
douglaswth Feb 3, 2025
81835c3
Update InterceptorInfo comment and replace Send and Recv boolean fiel…
douglaswth Feb 3, 2025
ebd3114
Merge branch 'v3' into streaming-interceptors
douglaswth Feb 3, 2025
1e1c7ab
Update Golden files to make the Interceptor tests happy
douglaswth Feb 3, 2025
712ab19
Add a ReturnContext field to goa.InterceptorInfo to allow interceptor…
douglaswth Feb 4, 2025
3161e0b
Scrap ReturnContext field in favor of changing the interceptor interf…
douglaswth Feb 4, 2025
c236fce
Merge branch 'v3' into streaming-interceptors
douglaswth Feb 4, 2025
6a734fe
Update Golden files to make the Interceptor tests happy again
douglaswth Feb 4, 2025
c36dfb6
Change goa.InterceptorInfo to an interface that generated interceptor…
douglaswth Feb 5, 2025
8035d69
Separate StreamingPayload and StreamingResult methods to Client and S…
douglaswth Feb 5, 2025
2ba73ae
Use goa.Endpoint for next again and do not return contexts from SendW…
douglaswth Feb 6, 2025
a1f2a91
Merge branch 'v3' into streaming-interceptors
douglaswth Feb 6, 2025
edbbca8
Address lint issues
raphael Feb 8, 2025
e4eb8a1
Merge branch 'v3' into streaming-interceptors
raphael Feb 8, 2025
77667dc
Fix lint issue
raphael Feb 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more tests for streaming interceptor service codegen
douglaswth committed Feb 1, 2025
commit 9c35929b9529d2d0c24de994c139b80769be8057
4 changes: 3 additions & 1 deletion codegen/service/interceptors_test.go
Original file line number Diff line number Diff line change
@@ -42,7 +42,9 @@ func TestInterceptors(t *testing.T) {
{"interceptor-with-read-result", testdata.InterceptorWithReadResultDSL, 3},
{"interceptor-with-write-result", testdata.InterceptorWithWriteResultDSL, 3},
{"interceptor-with-read-write-result", testdata.InterceptorWithReadWriteResultDSL, 3},
{"streaming-interceptors", testdata.StreamingInterceptorsDSL, 2},
{"streaming-interceptors", testdata.StreamingInterceptorsDSL, 3},
{"streaming-interceptors-with-read-payload-and-read-streaming-payload", testdata.StreamingInterceptorsWithReadPayloadAndReadStreamingPayloadDSL, 3},
{"streaming-interceptors-with-read-streaming-result", testdata.StreamingInterceptorsWithReadStreamingResultDSL, 3},
{"streaming-interceptors-with-read-payload", testdata.StreamingInterceptorsWithReadPayloadDSL, 2},
{"streaming-interceptors-with-read-result", testdata.StreamingInterceptorsWithReadResultDSL, 2},
}
22 changes: 14 additions & 8 deletions codegen/service/templates/client_interceptor_wrappers.go.tpl
Original file line number Diff line number Diff line change
@@ -26,7 +26,8 @@ func wrapClient{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i
sendWithContext: func(ctx context.Context, req {{ .ClientStream.SendTypeRef }}) (context.Context, error) {
info := &{{ $interceptor.Name }}Info{
Service: "{{ $.Service }}",
Method: "{{ .MethodName }}.{{ .ClientStream.SendWithContextName }}",
Method: "{{ .MethodName }}",
Send: true,
RawPayload: req,
}
var rCtx context.Context
@@ -42,7 +43,8 @@ func wrapClient{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i
recvWithContext: func(ctx context.Context) ({{ .ClientStream.RecvTypeRef }}, context.Context, error) {
info := &{{ $interceptor.Name }}Info{
Service: "{{ $.Service }}",
Method: "{{ .MethodName }}.{{ .ClientStream.RecvWithContextName }}",
Method: "{{ .MethodName }}",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
@@ -76,11 +78,15 @@ func wrapClient{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i
{{ comment (printf "wrapped%s is a client interceptor wrapper for the %s stream." .Interface .Interface) }}
type wrapped{{ .Interface }} struct {
ctx context.Context
{{- if ne .SendTypeRef "" }}
sendWithContext func(context.Context, {{ .SendTypeRef }}) (context.Context, error)
{{- end }}
{{- if ne .RecvTypeRef "" }}
recvWithContext func(context.Context) ({{ .RecvTypeRef }}, context.Context, error)
{{- end }}
stream {{ .Interface }}
}
{{- if ne .SendWithContextName "" }}
{{- if ne .SendTypeRef "" }}

{{ comment (printf "%s streams instances of \"%s\" after executing the applied interceptor." .SendName .Interface) }}
func (w *wrapped{{ .Interface }}) {{ .SendName }}(v {{ .SendTypeRef }}) error {
@@ -95,8 +101,8 @@ func (w *wrapped{{ .Interface }}) {{ .SendWithContextName }}(ctx context.Context
}
return w.sendWithContext(ctx, v)
}
{{- end }}
{{- if ne .RecvWithContextName "" }}
{{- end }}
{{- if ne .RecvTypeRef "" }}

{{ comment (printf "%s reads instances of \"%s\" from the stream after executing the applied interceptor." .RecvName .Interface) }}
func (w *wrapped{{ .Interface }}) {{ .RecvName }}() ({{ .RecvTypeRef }}, error) {
@@ -111,12 +117,12 @@ func (w *wrapped{{ .Interface }}) {{ .RecvWithContextName }}(ctx context.Context
}
return w.recvWithContext(ctx)
}
{{- end }}
{{- if .MustClose }}
{{- end }}
{{- if .MustClose }}

// Close closes the stream.
func (w *wrapped{{ .Interface }}) Close() error {
return w.stream.Close()
}
{{- end }}
{{- end }}
{{- end }}
22 changes: 14 additions & 8 deletions codegen/service/templates/server_interceptor_wrappers.go.tpl
Original file line number Diff line number Diff line change
@@ -26,7 +26,8 @@ func wrap{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i Serve
sendWithContext: func(ctx context.Context, req {{ .ServerStream.SendTypeRef }}) (context.Context, error) {
info := &{{ $interceptor.Name }}Info{
Service: "{{ $.Service }}",
Method: "{{ .MethodName }}.{{ .ServerStream.SendWithContextName }}",
Method: "{{ .MethodName }}",
Send: true,
RawPayload: req,
}
var rCtx context.Context
@@ -42,7 +43,8 @@ func wrap{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i Serve
recvWithContext: func(ctx context.Context) ({{ .ServerStream.RecvTypeRef }}, context.Context, error) {
info := &{{ $interceptor.Name }}Info{
Service: "{{ $.Service }}",
Method: "{{ .MethodName }}.{{ .ServerStream.RecvWithContextName }}",
Method: "{{ .MethodName }}",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
@@ -76,11 +78,15 @@ func wrap{{ .MethodName }}{{ $interceptor.Name }}(endpoint goa.Endpoint, i Serve
{{ comment (printf "wrapped%s is a server interceptor wrapper for the %s stream." .Interface .Interface) }}
type wrapped{{ .Interface }} struct {
ctx context.Context
{{- if ne .SendTypeRef "" }}
sendWithContext func(context.Context, {{ .SendTypeRef }}) (context.Context, error)
{{- end }}
{{- if ne .RecvTypeRef "" }}
recvWithContext func(context.Context) ({{ .RecvTypeRef }}, context.Context, error)
{{- end }}
stream {{ .Interface }}
}
{{- if ne .SendWithContextName "" }}
{{- if ne .SendTypeRef "" }}

{{ comment (printf "%s streams instances of \"%s\" after executing the applied interceptor." .SendName .Interface) }}
func (w *wrapped{{ .Interface }}) {{ .SendName }}(v {{ .SendTypeRef }}) error {
@@ -95,8 +101,8 @@ func (w *wrapped{{ .Interface }}) {{ .SendWithContextName }}(ctx context.Context
}
return w.sendWithContext(ctx, v)
}
{{- end }}
{{- if ne .RecvWithContextName "" }}
{{- end }}
{{- if ne .RecvTypeRef "" }}

{{ comment (printf "%s reads instances of \"%s\" from the stream after executing the applied interceptor." .RecvName .Interface) }}
func (w *wrapped{{ .Interface }}) {{ .RecvName }}() ({{ .RecvTypeRef }}, error) {
@@ -111,12 +117,12 @@ func (w *wrapped{{ .Interface }}) {{ .RecvWithContextName }}(ctx context.Context
}
return w.recvWithContext(ctx)
}
{{- end }}
{{- if .MustClose }}
{{- end }}
{{- if .MustClose }}

// Close closes the stream.
func (w *wrapped{{ .Interface }}) Close() error {
return w.stream.Close()
}
{{- end }}
{{- end }}
{{- end }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// ClientInterceptors defines the interface for all client-side interceptors.
// Client interceptors execute after the payload is encoded and before the request
// is sent to the server. The implementation is responsible for calling next to
// complete the request.
type ClientInterceptors interface {
Logging(ctx context.Context, info *LoggingInfo, next goa.Endpoint) (any, error)
}

// WrapMethodClientEndpoint wraps the Method endpoint with the client
// interceptors defined in the design.
func WrapMethodClientEndpoint(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
endpoint = wrapClientMethodlogging(endpoint, i)
return endpoint
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@


// wrapLoggingMethod applies the logging server interceptor to endpoints.
func wrapMethodLogging(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint {
return func(ctx context.Context, req any) (any, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadPayloadAndReadStreamingPayload",
Method: "Method",
RawPayload: req,
}
res, err := i.Logging(ctx, info, endpoint)
if err != nil {
return nil, err
}
stream := res.(MethodServerStream)
return &wrappedMethodServerStream{
ctx: ctx,
recvWithContext: func(ctx context.Context) (*MethodStreamingPayload, context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadPayloadAndReadStreamingPayload",
Method: "Method",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
res, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var (
res *MethodStreamingPayload
err error
)
res, rCtx, err = stream.RecvWithContext(ctx)
return res, err
})
return res.(*MethodStreamingPayload), rCtx, err
},
stream: stream,
}, nil
}
}

// wrappedMethodServerStream is a server interceptor wrapper for the
// MethodServerStream stream.
type wrappedMethodServerStream struct {
ctx context.Context
recvWithContext func(context.Context) (*MethodStreamingPayload, context.Context, error)
stream MethodServerStream
}

// Recv reads instances of "MethodServerStream" from the stream after executing
// the applied interceptor.
func (w *wrappedMethodServerStream) Recv() (*MethodStreamingPayload, error) {
res, _, err := w.RecvWithContext(w.ctx)
return res, err
}

// RecvWithContext reads instances of "MethodServerStream" from the stream
// after executing the applied interceptor with context.
func (w *wrappedMethodServerStream) RecvWithContext(ctx context.Context) (*MethodStreamingPayload, context.Context, error) {
if w.recvWithContext == nil {
return w.stream.RecvWithContext(ctx)
}
return w.recvWithContext(ctx)
}

// Close closes the stream.
func (w *wrappedMethodServerStream) Close() error {
return w.stream.Close()
}

// wrapClientLoggingMethod applies the logging client interceptor to endpoints.
func wrapClientMethodLogging(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
return func(ctx context.Context, req any) (any, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadPayloadAndReadStreamingPayload",
Method: "Method",
RawPayload: req,
}
res, err := i.Logging(ctx, info, endpoint)
if err != nil {
return nil, err
}
stream := res.(MethodClientStream)
return &wrappedMethodClientStream{
ctx: ctx,
sendWithContext: func(ctx context.Context, req *MethodStreamingPayload) (context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadPayloadAndReadStreamingPayload",
Method: "Method",
Send: true,
RawPayload: req,
}
var rCtx context.Context
_, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var err error
rCtx, err = stream.SendWithContext(ctx, req.(*MethodStreamingPayload))
return nil, err
})
return rCtx, err
},
stream: stream,
}, nil
}
}

// wrappedMethodClientStream is a client interceptor wrapper for the
// MethodClientStream stream.
type wrappedMethodClientStream struct {
ctx context.Context
sendWithContext func(context.Context, *MethodStreamingPayload) (context.Context, error)
stream MethodClientStream
}

// Send streams instances of "MethodClientStream" after executing the applied
// interceptor.
func (w *wrappedMethodClientStream) Send(v *MethodStreamingPayload) error {
_, err := w.SendWithContext(w.ctx, v)
return err
}

// SendWithContext streams instances of "MethodClientStream" after executing
// the applied interceptor with context.
func (w *wrappedMethodClientStream) SendWithContext(ctx context.Context, v *MethodStreamingPayload) (context.Context, error) {
if w.sendWithContext == nil {
return w.stream.SendWithContext(ctx, v)
}
return w.sendWithContext(ctx, v)
}

// Close closes the stream.
func (w *wrappedMethodClientStream) Close() error {
return w.stream.Close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// ServerInterceptors defines the interface for all server-side interceptors.
// Server interceptors execute after the request is decoded and before the
// payload is sent to the service. The implementation is responsible for calling
// next to complete the request.
type ServerInterceptors interface {
Logging(ctx context.Context, info *LoggingInfo, next goa.Endpoint) (any, error)
}

// Access interfaces for interceptor payloads and results
type (
// LoggingInfo provides metadata about the current interception.
// It includes service name, method name, and access to the endpoint.
LoggingInfo goa.InterceptorInfo

// LoggingPayload provides type-safe access to the method payload.
// It allows reading and writing specific fields of the payload as defined
// in the design.
LoggingPayload interface {
Chunk() string
}

// LoggingStreamingPayload provides type-safe access to the method streaming payload.
// It allows reading and writing specific fields of the streaming payload as defined
// in the design.
LoggingStreamingPayload interface {
Chunk() string
}
)

// Private implementation types
type (
loggingMethodPayload struct {
payload *MethodPayload
}
loggingMethodStreamingPayload struct {
payload *MethodStreamingPayload
}
)

// WrapMethodEndpoint wraps the Method endpoint with the server-side
// interceptors defined in the design.
func WrapMethodEndpoint(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint {
endpoint = wrapMethodlogging(endpoint, i)
return endpoint
}

// Public accessor methods for Info types

// Payload returns a type-safe accessor for the method payload.
func (info *LoggingInfo) Payload() LoggingPayload {
return &loggingMethodPayload{payload: info.RawPayload.(*MethodPayload)}
}

// StreamingPayload returns a type-safe accessor for the method streaming payload.
func (info *LoggingInfo) StreamingPayload() LoggingStreamingPayload {
return &loggingMethodStreamingPayload{payload: info.RawPayload.(*MethodStreamingPayload)}
}

// Private implementation methods

func (p *loggingMethodPayload) Chunk() string {
if p.payload.Chunk == nil {
var zero string
return zero
}
return *p.payload.Chunk
}
func (p *loggingMethodStreamingPayload) Chunk() string {
if p.payload.Chunk == nil {
var zero string
return zero
}
return *p.payload.Chunk
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// ClientInterceptors defines the interface for all client-side interceptors.
// Client interceptors execute after the payload is encoded and before the request
// is sent to the server. The implementation is responsible for calling next to
// complete the request.
type ClientInterceptors interface {
Logging(ctx context.Context, info *LoggingInfo, next goa.Endpoint) (any, error)
}

// WrapMethodClientEndpoint wraps the Method endpoint with the client
// interceptors defined in the design.
func WrapMethodClientEndpoint(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
endpoint = wrapClientMethodlogging(endpoint, i)
return endpoint
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@


// wrapLoggingMethod applies the logging server interceptor to endpoints.
func wrapMethodLogging(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint {
return func(ctx context.Context, req any) (any, error) {
res, err := endpoint(ctx, req)
if err != nil {
return nil, err
}
stream := res.(MethodServerStream)
return &wrappedMethodServerStream{
ctx: ctx,
sendWithContext: func(ctx context.Context, req *MethodResult) (context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadStreamingResult",
Method: "Method",
Send: true,
RawPayload: req,
}
var rCtx context.Context
_, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var err error
rCtx, err = stream.SendWithContext(ctx, req.(*MethodResult))
return nil, err
})
return rCtx, err
},
stream: stream,
}, nil
}
}

// wrappedMethodServerStream is a server interceptor wrapper for the
// MethodServerStream stream.
type wrappedMethodServerStream struct {
ctx context.Context
sendWithContext func(context.Context, *MethodResult) (context.Context, error)
stream MethodServerStream
}

// Send streams instances of "MethodServerStream" after executing the applied
// interceptor.
func (w *wrappedMethodServerStream) Send(v *MethodResult) error {
_, err := w.SendWithContext(w.ctx, v)
return err
}

// SendWithContext streams instances of "MethodServerStream" after executing
// the applied interceptor with context.
func (w *wrappedMethodServerStream) SendWithContext(ctx context.Context, v *MethodResult) (context.Context, error) {
if w.sendWithContext == nil {
return w.stream.SendWithContext(ctx, v)
}
return w.sendWithContext(ctx, v)
}

// Close closes the stream.
func (w *wrappedMethodServerStream) Close() error {
return w.stream.Close()
}

// wrapClientLoggingMethod applies the logging client interceptor to endpoints.
func wrapClientMethodLogging(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
return func(ctx context.Context, req any) (any, error) {
res, err := endpoint(ctx, req)
if err != nil {
return nil, err
}
stream := res.(MethodClientStream)
return &wrappedMethodClientStream{
ctx: ctx,
recvWithContext: func(ctx context.Context) (*MethodResult, context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptorsWithReadStreamingResult",
Method: "Method",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
res, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var (
res *MethodResult
err error
)
res, rCtx, err = stream.RecvWithContext(ctx)
return res, err
})
return res.(*MethodResult), rCtx, err
},
stream: stream,
}, nil
}
}

// wrappedMethodClientStream is a client interceptor wrapper for the
// MethodClientStream stream.
type wrappedMethodClientStream struct {
ctx context.Context
recvWithContext func(context.Context) (*MethodResult, context.Context, error)
stream MethodClientStream
}

// Recv reads instances of "MethodClientStream" from the stream after executing
// the applied interceptor.
func (w *wrappedMethodClientStream) Recv() (*MethodResult, error) {
res, _, err := w.RecvWithContext(w.ctx)
return res, err
}

// RecvWithContext reads instances of "MethodClientStream" from the stream
// after executing the applied interceptor with context.
func (w *wrappedMethodClientStream) RecvWithContext(ctx context.Context) (*MethodResult, context.Context, error) {
if w.recvWithContext == nil {
return w.stream.RecvWithContext(ctx)
}
return w.recvWithContext(ctx)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// ServerInterceptors defines the interface for all server-side interceptors.
// Server interceptors execute after the request is decoded and before the
// payload is sent to the service. The implementation is responsible for calling
// next to complete the request.
type ServerInterceptors interface {
Logging(ctx context.Context, info *LoggingInfo, next goa.Endpoint) (any, error)
}

// Access interfaces for interceptor payloads and results
type (
// LoggingInfo provides metadata about the current interception.
// It includes service name, method name, and access to the endpoint.
LoggingInfo goa.InterceptorInfo

// LoggingStreamingResult provides type-safe access to the method streaming result.
// It allows reading and writing specific fields of the streaming result as defined
// in the design.
LoggingStreamingResult interface {
Data() string
}
)

// Private implementation types
type (
loggingMethodStreamingResult struct {
result *MethodResult
}
)

// WrapMethodEndpoint wraps the Method endpoint with the server-side
// interceptors defined in the design.
func WrapMethodEndpoint(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint {
endpoint = wrapMethodlogging(endpoint, i)
return endpoint
}

// Public accessor methods for Info types
// StreamingResult returns a type-safe accessor for the method streaming result.
func (info *LoggingInfo) StreamingResult(res any) LoggingStreamingResult {
return &loggingMethodStreamingResult{result: res.(*MethodResult)}
}

// Private implementation methods

func (r *loggingMethodStreamingResult) Data() string {
if r.result.Data == nil {
var zero string
return zero
}
return *r.result.Data
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// ClientInterceptors defines the interface for all client-side interceptors.
// Client interceptors execute after the payload is encoded and before the request
// is sent to the server. The implementation is responsible for calling next to
// complete the request.
type ClientInterceptors interface {
Logging(ctx context.Context, info *LoggingInfo, next goa.Endpoint) (any, error)
}

// WrapMethodClientEndpoint wraps the Method endpoint with the client
// interceptors defined in the design.
func WrapMethodClientEndpoint(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
endpoint = wrapClientMethodlogging(endpoint, i)
return endpoint
}
Original file line number Diff line number Diff line change
@@ -13,7 +13,8 @@ func wrapMethodLogging(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint
sendWithContext: func(ctx context.Context, req *MethodResult) (context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptors",
Method: "Method.SendWithContext",
Method: "Method",
Send: true,
RawPayload: req,
}
var rCtx context.Context
@@ -27,7 +28,8 @@ func wrapMethodLogging(endpoint goa.Endpoint, i ServerInterceptors) goa.Endpoint
recvWithContext: func(ctx context.Context) (*MethodStreamingPayload, context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptors",
Method: "Method.RecvWithContext",
Method: "Method",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
@@ -91,3 +93,97 @@ func (w *wrappedMethodServerStream) RecvWithContext(ctx context.Context) (*Metho
func (w *wrappedMethodServerStream) Close() error {
return w.stream.Close()
}

// wrapClientLoggingMethod applies the logging client interceptor to endpoints.
func wrapClientMethodLogging(endpoint goa.Endpoint, i ClientInterceptors) goa.Endpoint {
return func(ctx context.Context, req any) (any, error) {
res, err := endpoint(ctx, req)
if err != nil {
return nil, err
}
stream := res.(MethodClientStream)
return &wrappedMethodClientStream{
ctx: ctx,
sendWithContext: func(ctx context.Context, req *MethodStreamingPayload) (context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptors",
Method: "Method",
Send: true,
RawPayload: req,
}
var rCtx context.Context
_, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var err error
rCtx, err = stream.SendWithContext(ctx, req.(*MethodStreamingPayload))
return nil, err
})
return rCtx, err
},
recvWithContext: func(ctx context.Context) (*MethodResult, context.Context, error) {
info := &LoggingInfo{
Service: "StreamingInterceptors",
Method: "Method",
Recv: true,
RawPayload: req,
}
var rCtx context.Context
res, err := i.Logging(ctx, info, func(ctx context.Context, req any) (any, error) {
var (
res *MethodResult
err error
)
res, rCtx, err = stream.RecvWithContext(ctx)
return res, err
})
return res.(*MethodResult), rCtx, err
},
stream: stream,
}, nil
}
}

// wrappedMethodClientStream is a client interceptor wrapper for the
// MethodClientStream stream.
type wrappedMethodClientStream struct {
ctx context.Context
sendWithContext func(context.Context, *MethodStreamingPayload) (context.Context, error)
recvWithContext func(context.Context) (*MethodResult, context.Context, error)
stream MethodClientStream
}

// Send streams instances of "MethodClientStream" after executing the applied
// interceptor.
func (w *wrappedMethodClientStream) Send(v *MethodStreamingPayload) error {
_, err := w.SendWithContext(w.ctx, v)
return err
}

// SendWithContext streams instances of "MethodClientStream" after executing
// the applied interceptor with context.
func (w *wrappedMethodClientStream) SendWithContext(ctx context.Context, v *MethodStreamingPayload) (context.Context, error) {
if w.sendWithContext == nil {
return w.stream.SendWithContext(ctx, v)
}
return w.sendWithContext(ctx, v)
}

// Recv reads instances of "MethodClientStream" from the stream after executing
// the applied interceptor.
func (w *wrappedMethodClientStream) Recv() (*MethodResult, error) {
res, _, err := w.RecvWithContext(w.ctx)
return res, err
}

// RecvWithContext reads instances of "MethodClientStream" from the stream
// after executing the applied interceptor with context.
func (w *wrappedMethodClientStream) RecvWithContext(ctx context.Context) (*MethodResult, context.Context, error) {
if w.recvWithContext == nil {
return w.stream.RecvWithContext(ctx)
}
return w.recvWithContext(ctx)
}

// Close closes the stream.
func (w *wrappedMethodClientStream) Close() error {
return w.stream.Close()
}
43 changes: 43 additions & 0 deletions codegen/service/testdata/interceptors_dsls.go
Original file line number Diff line number Diff line change
@@ -230,6 +230,7 @@ var StreamingInterceptorsDSL = func() {
})
Service("StreamingInterceptors", func() {
ServerInterceptor("logging")
ClientInterceptor("logging")
Method("Method", func() {
StreamingPayload(func() {
Attribute("chunk", String)
@@ -242,6 +243,48 @@ var StreamingInterceptorsDSL = func() {
})
}

var StreamingInterceptorsWithReadPayloadAndReadStreamingPayloadDSL = func() {
Interceptor("logging", func() {
ReadPayload(func() {
Attribute("chunk")
})
ReadStreamingPayload(func() {
Attribute("chunk")
})
})
Service("StreamingInterceptorsWithReadPayloadAndReadStreamingPayload", func() {
ServerInterceptor("logging")
ClientInterceptor("logging")
Method("Method", func() {
Payload(func() {
Field(1, "chunk", String)
})
StreamingPayload(func() {
Field(1, "chunk", String)
})
GRPC(func() {})
})
})
}

var StreamingInterceptorsWithReadStreamingResultDSL = func() {
Interceptor("logging", func() {
ReadStreamingResult(func() {
Attribute("data")
})
})
Service("StreamingInterceptorsWithReadStreamingResult", func() {
ServerInterceptor("logging")
ClientInterceptor("logging")
Method("Method", func() {
StreamingResult(func() {
Field(1, "data", String)
})
GRPC(func() {})
})
})
}

var StreamingInterceptorsWithReadPayloadDSL = func() {
Interceptor("logging", func() {
ReadPayload(func() {
4 changes: 4 additions & 0 deletions pkg/interceptor.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,10 @@ type (
Service string
// Name of method handling request
Method string
// Send is true if the request is a streaming Send
Send bool
// Recv is true if the request is a streaming Recv
Recv bool
// Payload of request
RawPayload any
}