Skip to content

Commit

Permalink
Streaming Interceptors (#3641)
Browse files Browse the repository at this point in the history
* Add read/write streaming payload/result methods to interceptor DSL

* Weave streaming interceptor stuff through codegen

* Do not try to use a field from an interface for the raw payload when wrapping streaming methods

* Fix golden files

* Collect attributes from a method that actually has the interceptor applied; import the correct user types packages when rendering service_interceptors.go; fix an ancient typo

* Fix StreamingResult accessor signature and return

* Add SendWithContext and RecvWithContext methods to the stream interface generation

* Progress on wrapping streams with interceptors

* Finish generating the stream wrapping for interceptors; remove the Endpoint field from goa.InterceptorInfo struct since it is redundant with the next goa.Endpoint parameter sent to interceptors

* Progress on adding comments and fixing tests; fix a bug where refs were defs

* Finish fixing tests

* Add more tests for streaming interceptor service codegen

* Fix bug where the CLI ParseEndpoint method would try to wrap every method with interceptors even if they do not apply

* Update InterceptorInfo comment and replace Send and Recv boolean fields with a CallType enum; organize interceptor wrappers file sections as types, functions, then methods

* Update Golden files to make the Interceptor tests happy

* Add a ReturnContext field to goa.InterceptorInfo to allow interceptors to modify the context returned by SendWithContext/RecvWithContext even after calling next

* Scrap ReturnContext field in favor of changing the interceptor interface to return a context itself

* Update Golden files to make the Interceptor tests happy again

* Change goa.InterceptorInfo to an interface that generated interceptor info structs implement

* Separate StreamingPayload and StreamingResult methods to Client and Server variations

* Use goa.Endpoint for next again and do not return contexts from SendWithContext/ReceiveWithContext

* Address lint issues

* Fix lint issue

---------

Co-authored-by: Raphael Simon <[email protected]>
  • Loading branch information
douglaswth and raphael authored Feb 9, 2025
1 parent 2cd5aaa commit d581e4e
Show file tree
Hide file tree
Showing 94 changed files with 3,403 additions and 586 deletions.
29 changes: 17 additions & 12 deletions codegen/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type (
Conversion string
// Example is a valid command invocation, starting with the command name.
Example string
// Interceptors contains the data for client interceptors if any apply to the endpoint method.
Interceptors *InterceptorData
}

// InterceptorData contains the data needed to generate interceptor code.
Expand Down Expand Up @@ -181,22 +183,16 @@ func BuildCommandData(data *service.Data) *CommandData {

// BuildSubcommandData builds the data needed by CLI code generators to render
// the CLI parsing of the service sub-command.
func BuildSubcommandData(svcName string, m *service.MethodData, buildFunction *BuildFunctionData, flags []*FlagData) *SubcommandData {
var (
name string
fullName string
description string

conversion string
)
func BuildSubcommandData(data *service.Data, m *service.MethodData, buildFunction *BuildFunctionData, flags []*FlagData) *SubcommandData {
en := m.Name
name = codegen.KebabCase(en)
fullName = goifyTerms(svcName, en)
description = m.Description
name := codegen.KebabCase(en)
fullName := goifyTerms(data.Name, en)
description := m.Description
if description == "" {
description = fmt.Sprintf("Make request to the %q endpoint", m.Name)
}

var conversion string
if m.Payload != "" && buildFunction == nil && len(flags) > 0 {
// No build function, just convert the arg to the body type
var convPre, convSuff string
Expand Down Expand Up @@ -226,6 +222,14 @@ func BuildSubcommandData(svcName string, m *service.MethodData, buildFunction *B
conversion += "\n}"
}
}

var interceptors *InterceptorData
if len(m.ClientInterceptors) > 0 {
interceptors = &InterceptorData{
VarName: "inter",
PkgName: data.PkgName,
}
}
sub := &SubcommandData{
Name: name,
FullName: fullName,
Expand All @@ -234,8 +238,9 @@ func BuildSubcommandData(svcName string, m *service.MethodData, buildFunction *B
MethodVarName: m.VarName,
BuildFunction: buildFunction,
Conversion: conversion,
Interceptors: interceptors,
}
generateExample(sub, svcName)
generateExample(sub, data.Name)

return sub
}
Expand Down
78 changes: 76 additions & 2 deletions codegen/service/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func interceptorFile(svc *Data, server bool) *codegen.File {
},
}
if len(interceptors) > 0 {
codegen.AddImport(sections[0], svc.UserTypeImports...)
sections = append(sections, &codegen.SectionTemplate{
Name: "interceptor-types",
Source: readTemplate("interceptors_types"),
Expand Down Expand Up @@ -139,7 +140,34 @@ func wrapperFile(svc *Data) *codegen.File {
codegen.GoaImport(""),
}))

// Generate the interceptor wrapper functions first (only once)
// Generate any interceptor stream wrapper struct types first
var wrappedServerStreams, wrappedClientStreams []*StreamInterceptorData
if len(svc.ServerInterceptors) > 0 {
wrappedServerStreams = collectWrappedStreams(svc.ServerInterceptors, true)
if len(wrappedServerStreams) > 0 {
sections = append(sections, &codegen.SectionTemplate{
Name: "server-interceptor-stream-wrapper-types",
Source: readTemplate("server_interceptor_stream_wrapper_types"),
Data: map[string]interface{}{
"WrappedServerStreams": wrappedServerStreams,
},
})
}
}
if len(svc.ClientInterceptors) > 0 {
wrappedClientStreams = collectWrappedStreams(svc.ClientInterceptors, false)
if len(wrappedClientStreams) > 0 {
sections = append(sections, &codegen.SectionTemplate{
Name: "client-interceptor-stream-wrapper-types",
Source: readTemplate("client_interceptor_stream_wrapper_types"),
Data: map[string]interface{}{
"WrappedClientStreams": wrappedClientStreams,
},
})
}
}

// Generate the interceptor wrapper functions next (only once)
if len(svc.ServerInterceptors) > 0 {
sections = append(sections, &codegen.SectionTemplate{
Name: "server-interceptor-wrappers",
Expand All @@ -161,6 +189,26 @@ func wrapperFile(svc *Data) *codegen.File {
})
}

// Generate any interceptor stream wrapper struct methods last
if len(wrappedServerStreams) > 0 {
sections = append(sections, &codegen.SectionTemplate{
Name: "server-interceptor-stream-wrappers",
Source: readTemplate("server_interceptor_stream_wrappers"),
Data: map[string]interface{}{
"WrappedServerStreams": wrappedServerStreams,
},
})
}
if len(wrappedClientStreams) > 0 {
sections = append(sections, &codegen.SectionTemplate{
Name: "client-interceptor-stream-wrappers",
Source: readTemplate("client_interceptor_stream_wrappers"),
Data: map[string]interface{}{
"WrappedClientStreams": wrappedClientStreams,
},
})
}

return &codegen.File{
Path: path,
SectionTemplates: sections,
Expand All @@ -171,9 +219,35 @@ func wrapperFile(svc *Data) *codegen.File {
// private implementation types.
func hasPrivateImplementationTypes(interceptors []*InterceptorData) bool {
for _, intr := range interceptors {
if intr.ReadPayload != nil || intr.WritePayload != nil || intr.ReadResult != nil || intr.WriteResult != nil {
if intr.ReadPayload != nil || intr.WritePayload != nil || intr.ReadResult != nil || intr.WriteResult != nil || intr.ReadStreamingPayload != nil || intr.WriteStreamingPayload != nil || intr.ReadStreamingResult != nil || intr.WriteStreamingResult != nil {
return true
}
}
return false
}

// collectWrappedStreams returns a slice of streams to be wrapped by interceptor wrapper functions.
func collectWrappedStreams(interceptors []*InterceptorData, server bool) []*StreamInterceptorData {
var (
streams []*StreamInterceptorData
streamNames = make(map[string]struct{})
)
for _, intr := range interceptors {
if intr.HasStreamingPayloadAccess || intr.HasStreamingResultAccess {
for _, method := range intr.Methods {
if server {
if _, ok := streamNames[method.ServerStream.Interface]; !ok {
streams = append(streams, method.ServerStream)
streamNames[method.ServerStream.Interface] = struct{}{}
}
} else {
if _, ok := streamNames[method.ClientStream.Interface]; !ok {
streams = append(streams, method.ClientStream)
streamNames[method.ClientStream.Interface] = struct{}{}
}
}
}
}
}
return streams
}
30 changes: 22 additions & 8 deletions codegen/service/interceptors.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Interceptors Code Generation

Goa generates interceptor code to enable request/response interception and payload/result access.
Goa generates interceptor code to enable request/response interception and payload/result access.

---

Expand All @@ -10,8 +10,8 @@ Goa generates interceptor code to enable request/response interception and paylo

Client and server interceptor code is generated in:

- `gen/client_interceptors.go`
- `gen/service_interceptors.go`
* `gen/client_interceptors.go`
* `gen/service_interceptors.go`

### Templates Used

Expand All @@ -26,12 +26,13 @@ Client and server interceptor code is generated in:
3. **`client_wrappers.go.tpl`** and **`endpoint_wrappers.go.tpl`**
Generate code that wraps client and service endpoints with interceptor callbacks.
Each template takes a map with:

```go
map[string]any{
"MethodVarName": <Implemented method name>
"Method": <Design method name>
"Service": <Service name>
"Interceptors": <Slice of InterceptorData>
"MethodVarName": <Implemented method name>
"Method": <Design method name>
"Service": <Service name>
"Interceptors": <Slice of InterceptorData>
}
```

Expand All @@ -43,12 +44,13 @@ Client and server interceptor code is generated in:

Endpoint wrapper code for both client and server interceptors is generated in:

- `gen/interceptor_wrappers.go`
* `gen/interceptor_wrappers.go`

### Templates Used

1. **`server_interceptor_wrappers.go.tpl`**
Generates server-specific wrapper implementations. This template receives a map with:

```go
map[string]any{
"Service": svc.Name,
Expand All @@ -58,6 +60,7 @@ Endpoint wrapper code for both client and server interceptors is generated in:

2. **`client_interceptor_wrappers.go.tpl`**
Generates client-specific wrapper implementations. This template receives a map with:

```go
map[string]any{
"Service": svc.Name,
Expand All @@ -78,6 +81,7 @@ Example interceptors are generated by the example command in an interceptors sub

1. **`example_client_interceptor.go.tpl` and `example_server_interceptor.go.tpl`**
Generate example interceptor implementations. Each template takes a map with:

```go
map[string]any{
"StructName": <interceptor struct name>
Expand Down Expand Up @@ -122,10 +126,16 @@ The main structure describing each interceptor’s metadata and requirements:
* `Description`: Interceptor description from the design
* `HasPayloadAccess`: Indicates if any method requires payload access
* `HasResultAccess`: Indicates if any method requires result access
* `HasStreamingPayloadAccess`: Indicates if any method requires streaming payload access
* `HasStreamingResultAccess`: Indicates if any method requires streaming result access
* `ReadPayload`: List of readable payload fields ([]AttributeData)
* `WritePayload`: List of writable payload fields ([]AttributeData)
* `ReadResult`: List of readable result fields ([]AttributeData)
* `WriteResult`: List of writable result fields ([]AttributeData)
* `ReadStreamingPayload`: List of readable streaming payload fields ([]AttributeData)
* `WriteStreamingPayload`: List of writable streaming payload fields ([]AttributeData)
* `ReadStreamingResult`: List of readable streaming result fields ([]AttributeData)
* `WriteStreamingResult`: List of writable streaming result fields ([]AttributeData)
* `Methods`: A list of MethodInterceptorData containing method-specific interceptor information
* `ServerStreamInputStruct`: Server stream variable name (used if streaming)
* `ClientStreamInputStruct`: Client stream variable name (used if streaming)
Expand All @@ -137,8 +147,12 @@ Stores per-method interceptor configuration:
* `MethodName`: The method’s Go variable name
* `PayloadAccess`: Name of the payload access type
* `ResultAccess`: Name of the result access type
* `StreamingPayloadAccess`: Name of the streaming payload access type
* `StreamingResultAccess`: Name of the streaming result access type
* `PayloadRef`: Reference to the method's payload type
* `ResultRef`: Reference to the method's result type
* `StreamingPayloadRef`: Reference to the method's streaming payload type
* `StreamingResultRef`: Reference to the method's streaming result type

### `AttributeData`

Expand Down
4 changes: 3 additions & 1 deletion codegen/service/interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func TestInterceptors(t *testing.T) {
{"interceptor-with-read-result", testdata.InterceptorWithReadResultDSL, 3},
{"interceptor-with-write-result", testdata.InterceptorWithWriteResultDSL, 3},
{"interceptor-with-read-write-result", testdata.InterceptorWithReadWriteResultDSL, 3},
{"streaming-interceptors", testdata.StreamingInterceptorsDSL, 2},
{"streaming-interceptors", testdata.StreamingInterceptorsDSL, 3},
{"streaming-interceptors-with-read-payload-and-read-streaming-payload", testdata.StreamingInterceptorsWithReadPayloadAndReadStreamingPayloadDSL, 3},
{"streaming-interceptors-with-read-streaming-result", testdata.StreamingInterceptorsWithReadStreamingResultDSL, 3},
{"streaming-interceptors-with-read-payload", testdata.StreamingInterceptorsWithReadPayloadDSL, 2},
{"streaming-interceptors-with-read-result", testdata.StreamingInterceptorsWithReadResultDSL, 2},
}
Expand Down
2 changes: 1 addition & 1 deletion codegen/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Files(genpkg string, service *expr.ServiceExpr, userTypePkgs map[string][]s
if m.StreamingPayloadDef != "" {
if _, ok := seen[m.StreamingPayload]; !ok {
addTypeDefSection(payloadPath, m.StreamingPayload, &codegen.SectionTemplate{
Name: "service-streamig-payload",
Name: "service-streaming-payload",
Source: readTemplate("streaming_payload"),
Data: m,
})
Expand Down
Loading

0 comments on commit d581e4e

Please sign in to comment.