diff --git a/pkg/codegen/generators/v2/templates/helpers.go b/pkg/codegen/generators/v2/templates/helpers.go index 7d5d284a..8e4d6034 100644 --- a/pkg/codegen/generators/v2/templates/helpers.go +++ b/pkg/codegen/generators/v2/templates/helpers.go @@ -142,7 +142,7 @@ func OperationName(channel asyncapi.Channel) string { } var isFieldPointer = func(parent asyncapi.Schema, field string, schema asyncapi.Schema) bool { - return !IsRequired(parent, field) && schema.Type != "array" + return !(IsRequired(parent, field) || schema.IsRequired) && schema.Type != "array" } // ForcePointerOnFields is used to force the generation of all fields as pointers, except for arrays. diff --git a/pkg/codegen/generators/v2/templates/message.tmpl b/pkg/codegen/generators/v2/templates/message.tmpl index 0b00570d..9b0e1281 100644 --- a/pkg/codegen/generators/v2/templates/message.tmpl +++ b/pkg/codegen/generators/v2/templates/message.tmpl @@ -107,11 +107,12 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N {{- end}} {{- /* For each header */}} + {{- $headers := .Headers -}} {{- range $key, $value := $headerProperties}} case k == "{{$key}}": // Retrieving {{namify $key}} header - {{- if $value.IsRequired }} + {{- if isFieldPointer $headers $key $value }} {{- if eq $value.Type "object" }} - err := json.Unmarshal(v, &msg.Headers.{{ namify $key}}) + err := json.Unmarshal(v, msg.Headers.{{ namify $key}}) if err != nil { return msg, err } @@ -120,13 +121,14 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N if err != nil { return msg, err } - msg.Headers.{{ namify $key}} = t + msg.Headers.{{ namify $key}} = &t {{- else}} - msg.Headers.{{ namify $key}} = {{$value.Type}}(v) + h := {{$value.Type}}(v) + msg.Headers.{{ namify $key}} = &h {{- end}} {{- else}} {{- if eq $value.Type "object" }} - err := json.Unmarshal(v, msg.Headers.{{ namify $key}}) + err := json.Unmarshal(v, &msg.Headers.{{ namify $key}}) if err != nil { return msg, err } @@ -135,10 +137,9 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N if err != nil { return msg, err } - msg.Headers.{{ namify $key}} = &t + msg.Headers.{{ namify $key}} = t {{- else}} - h := {{$value.Type}}(v) - msg.Headers.{{ namify $key}} = &h + msg.Headers.{{ namify $key}} = {{$value.Type}}(v) {{- end}} {{- end}} {{- end}} @@ -198,7 +199,6 @@ func (msg {{namify .Name}}) toBrokerMessage() (extensions.BrokerMessage, error) {{/* Handle headers, if defined */}} {{ if .Headers -}} - {{- /* Get headers by reference, or not*/}} {{- $headerProperties := .Headers.Properties}} {{if .Headers.Reference }} @@ -207,21 +207,29 @@ func (msg {{namify .Name}}) toBrokerMessage() (extensions.BrokerMessage, error) // Add each headers to broker message headers := make(map[string][]byte, {{ len $headerProperties }}) - + {{- $headers := .Headers -}} {{/* For each header */ -}} - {{- range $key, $value := $headerProperties -}} + {{- range $key, $value := $headerProperties }} + // Adding {{ namify $key}} header {{- if $value.IsRequired }} + {{- $dereferenceOp := "" -}} + {{- if isFieldPointer $headers $key $value -}} + {{- $dereferenceOp = "*" }} + if msg.Headers.{{ namify $key}} == nil { + return extensions.BrokerMessage{}, fmt.Errorf("field {{ namify $key}} should not be nil") + } + {{- end -}} {{- if eq $value.Type "object" }} - h, err := json.Marshal(msg.Headers.{{ namify $key}}) + h, err := json.Marshal({{ $dereferenceOp }}msg.Headers.{{ namify $key}}) if err != nil { return extensions.BrokerMessage{}, err } headers["{{$key}}"] = h {{- else if isDateOrDateTimeGenerated $value.Format }} - headers["{{$key}}"] = []byte(msg.Headers.{{namify $key}}.Format(time.RFC3339)) + headers["{{$key}}"] = []byte({{ $dereferenceOp }}msg.Headers.{{namify $key}}.Format(time.RFC3339)) {{- else }} - headers["{{$key}}"] = []byte(msg.Headers.{{namify $key}}) + headers["{{$key}}"] = []byte({{ $dereferenceOp }}msg.Headers.{{namify $key}}) {{- end }} {{- else}} if msg.Headers.{{namify $key}} != nil { diff --git a/pkg/codegen/generators/v3/templates/helpers.go b/pkg/codegen/generators/v3/templates/helpers.go index 6df5e391..b37b96a8 100644 --- a/pkg/codegen/generators/v3/templates/helpers.go +++ b/pkg/codegen/generators/v3/templates/helpers.go @@ -128,7 +128,7 @@ func GenerateChannelAddr(ch *asyncapi.Channel) string { } var isFieldPointer = func(parent asyncapi.Schema, field string, schema asyncapi.Schema) bool { - return !IsRequired(parent, field) && schema.Type != "array" + return !(IsRequired(parent, field) || schema.IsRequired) && schema.Type != "array" } // ForcePointerOnFields is used to force the generation of all fields as pointers, except for arrays. diff --git a/pkg/codegen/generators/v3/templates/message.tmpl b/pkg/codegen/generators/v3/templates/message.tmpl index 31100bf3..e9dd4edf 100644 --- a/pkg/codegen/generators/v3/templates/message.tmpl +++ b/pkg/codegen/generators/v3/templates/message.tmpl @@ -116,11 +116,12 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N {{- end}} {{- /* For each header */}} + {{- $headers := .Headers -}} {{- range $key, $value := $headerProperties}} case k == "{{$key}}": // Retrieving {{namify $key}} header - {{- if $value.IsRequired }} + {{- if isFieldPointer $headers $key $value }} {{- if eq $value.Type "object" }} - err := json.Unmarshal(v, &msg.Headers.{{ namify $key}}) + err := json.Unmarshal(v, msg.Headers.{{ namify $key}}) if err != nil { return msg, err } @@ -129,17 +130,18 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N if err != nil { return msg, err } - msg.Headers.{{ namify $key}} = t + msg.Headers.{{ namify $key}} = &t {{- else}} {{- if $value.Reference }} - msg.Headers.{{ namify $key}} = {{$value.ReferenceTo.Name}}(v) + h := {{$value.ReferenceTo.Name}}(v) {{- else }} - msg.Headers.{{ namify $key}} = {{$value.Type}}(v) + h := {{$value.Type}}(v) {{- end}} + msg.Headers.{{ namify $key}} = &h {{- end}} {{- else}} {{- if eq $value.Type "object" }} - err := json.Unmarshal(v, msg.Headers.{{ namify $key}}) + err := json.Unmarshal(v, &msg.Headers.{{ namify $key}}) if err != nil { return msg, err } @@ -148,14 +150,13 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N if err != nil { return msg, err } - msg.Headers.{{ namify $key}} = &t + msg.Headers.{{ namify $key}} = t {{- else}} {{- if $value.Reference }} - h := {{$value.ReferenceTo.Name}}(v) + msg.Headers.{{ namify $key}} = {{$value.ReferenceTo.Name}}(v) {{- else }} - h := {{$value.Type}}(v) + msg.Headers.{{ namify $key}} = {{$value.Type}}(v) {{- end}} - msg.Headers.{{ namify $key}} = &h {{- end}} {{- end}} {{- end}} @@ -224,21 +225,29 @@ func (msg {{namify .Name}}) toBrokerMessage() (extensions.BrokerMessage, error) // Add each headers to broker message headers := make(map[string][]byte, {{ len $headerProperties }}) - + {{- $headers := .Headers -}} {{/* For each header */ -}} - {{- range $key, $value := $headerProperties -}} + {{- range $key, $value := $headerProperties }} + // Adding {{ namify $key}} header {{- if $value.IsRequired }} + {{- $dereferenceOp := "" -}} + {{- if isFieldPointer $headers $key $value -}} + {{- $dereferenceOp = "*" }} + if msg.Headers.{{ namify $key}} == nil { + return extensions.BrokerMessage{}, fmt.Errorf("field {{ namify $key}} should not be nil") + } + {{- end -}} {{- if eq $value.Type "object" }} - h, err := json.Marshal(msg.Headers.{{ namify $key}}) + h, err := json.Marshal({{ $dereferenceOp }}msg.Headers.{{ namify $key}}) if err != nil { return extensions.BrokerMessage{}, err } headers["{{$key}}"] = h {{- else if isDateOrDateTimeGenerated $value.Format }} - headers["{{$key}}"] = []byte(msg.Headers.{{namify $key}}.Format(time.RFC3339)) + headers["{{$key}}"] = []byte({{ $dereferenceOp }}msg.Headers.{{namify $key}}.Format(time.RFC3339)) {{- else }} - headers["{{$key}}"] = []byte(msg.Headers.{{namify $key}}) + headers["{{$key}}"] = []byte({{ $dereferenceOp }}msg.Headers.{{namify $key}}) {{- end }} {{- else}} if msg.Headers.{{namify $key}} != nil { diff --git a/test/v2/issues/262/asyncapi.gen.go b/test/v2/issues/262/asyncapi.gen.go new file mode 100644 index 00000000..96aa5686 --- /dev/null +++ b/test/v2/issues/262/asyncapi.gen.go @@ -0,0 +1,353 @@ +// Package "issue262" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package issue262 + +import ( + "context" + "fmt" + "time" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addAppContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// UserController is the structure that provides publishing capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addUserContextValues(ctx context.Context, path string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, path) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// AsyncAPIVersion is the version of the used AsyncAPI document +const AsyncAPIVersion = "1.2.3" + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // subscriptions is a map of all subscriptions + subscriptions map[string]extensions.BrokerChannelSubscription + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware + // handler to handle errors from consumers and middlewares + errorHandler extensions.ErrorHandler +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +// WithErrorHandler attaches a errorhandler to handle errors from subscriber functions +func WithErrorHandler(handler extensions.ErrorHandler) ControllerOption { + return func(controller *controller) { + controller.errorHandler = handler + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// TestMessageHeaders is a schema from the AsyncAPI specification required in messages +type TestMessageHeaders struct { + FieldNonReq *string `json:"fieldNonReq,omitempty"` + FieldReq *string `json:"fieldReq" validate:"required"` + SomeDateTime *time.Time `json:"someDateTime,omitempty"` +} + +// TestMessage is the message expected for 'TestMessage' channel. +type TestMessage struct { + // Headers will be used to fill the message headers + Headers TestMessageHeaders + + // Payload will be inserted in the message payload + Payload string +} + +func NewTestMessage() TestMessage { + var msg TestMessage + + return msg +} + +// brokerMessageToTestMessage will fill a new TestMessage with data from generic broker message +func brokerMessageToTestMessage(bMsg extensions.BrokerMessage) (TestMessage, error) { + var msg TestMessage + + // Convert to string + payload := string(bMsg.Payload) + msg.Payload = payload // No need for type conversion to reference + + // Get each headers from broker message + for k, v := range bMsg.Headers { + switch { + case k == "fieldNonReq": // Retrieving FieldNonReq header + h := string(v) + msg.Headers.FieldNonReq = &h + case k == "fieldReq": // Retrieving FieldReq header + h := string(v) + msg.Headers.FieldReq = &h + case k == "someDateTime": // Retrieving SomeDateTime header + t, err := time.Parse(time.RFC3339, string(v)) + if err != nil { + return msg, err + } + msg.Headers.SomeDateTime = &t + default: + // TODO: log unknown error + } + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from TestMessage data +func (msg TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Convert to []byte + payload := []byte(msg.Payload) + + // Add each headers to broker message + headers := make(map[string][]byte, 3) + + // Adding FieldNonReq header + if msg.Headers.FieldNonReq != nil { + headers["fieldNonReq"] = []byte(*msg.Headers.FieldNonReq) + } + + // Adding FieldReq header + if msg.Headers.FieldReq == nil { + return extensions.BrokerMessage{}, fmt.Errorf("field FieldReq should not be nil") + } + headers["fieldReq"] = []byte(*msg.Headers.FieldReq) + + // Adding SomeDateTime header + if msg.Headers.SomeDateTime != nil { + headers["someDateTime"] = []byte(msg.Headers.SomeDateTime.Format(time.RFC3339)) + } + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} diff --git a/test/v2/issues/262/asyncapi.yaml b/test/v2/issues/262/asyncapi.yaml new file mode 100644 index 00000000..97b086c5 --- /dev/null +++ b/test/v2/issues/262/asyncapi.yaml @@ -0,0 +1,21 @@ +asyncapi: 2.6.0 +info: + title: Sample App + version: 1.2.3 +components: + messages: + Test: + headers: + type: object + required: + - fieldReq + properties: + fieldNonReq: + type: string + fieldReq: + type: string + someDateTime: + type: string + format: date-time + payload: + type: string \ No newline at end of file diff --git a/test/v2/issues/262/suite_test.go b/test/v2/issues/262/suite_test.go new file mode 100644 index 00000000..8addb81f --- /dev/null +++ b/test/v2/issues/262/suite_test.go @@ -0,0 +1,54 @@ +//go:generate go run ../../../../cmd/asyncapi-codegen -p issue262 --force-pointers -i ./asyncapi.yaml -o ./asyncapi.gen.go + +package issue262 + +import ( + "testing" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestSuite(t *testing.T) { + suite.Run(t, NewSuite()) +} + +type Suite struct { + suite.Suite +} + +func NewSuite() *Suite { + return &Suite{} +} + +func (suite *Suite) TestToBrokerMessage() { + t := TestMessage{ + Headers: TestMessageHeaders{ + // All fields should be pointers (nillable) + FieldNonReq: nil, + FieldReq: nil, + SomeDateTime: nil, + }, + Payload: "", + } + + // toBrokerMessage should return an error due to FieldReq being required + _, err := t.toBrokerMessage() + assert.ErrorContains(suite.T(), err, "field FieldReq should not be nil") + str := "Something" + t.Headers.FieldReq = &str + _, err = t.toBrokerMessage() + assert.NoError(suite.T(), err) +} + +func (suite *Suite) TestBrokerMessageToTestMessage() { + // BrokerMessageToTestMessage should be valid + _, err := brokerMessageToTestMessage( + extensions.BrokerMessage{ + Headers: map[string][]byte{}, + }, + ) + // There is currently no check on fields returned by brokerMessageToTestMessage + assert.NoError(suite.T(), err) +} diff --git a/test/v2/issues/74/asyncapi.gen.go b/test/v2/issues/74/asyncapi.gen.go index 93234426..b9c9c227 100644 --- a/test/v2/issues/74/asyncapi.gen.go +++ b/test/v2/issues/74/asyncapi.gen.go @@ -512,7 +512,9 @@ func (msg TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { headers := make(map[string][]byte, 2) // Adding DateTime header - headers["dateTime"] = []byte(msg.Headers.DateTime.Format(time.RFC3339)) // Adding Version header + headers["dateTime"] = []byte(msg.Headers.DateTime.Format(time.RFC3339)) + + // Adding Version header headers["version"] = []byte(msg.Headers.Version) return extensions.BrokerMessage{ diff --git a/test/v3/issues/145/asyncapi.gen.go b/test/v3/issues/145/asyncapi.gen.go index 12ef13d4..c83028e4 100644 --- a/test/v3/issues/145/asyncapi.gen.go +++ b/test/v3/issues/145/asyncapi.gen.go @@ -701,7 +701,9 @@ func (msg PingMessage) toBrokerMessage() (extensions.BrokerMessage, error) { // Adding ReplyTo header if msg.Headers.ReplyTo != nil { headers["replyTo"] = []byte(*msg.Headers.ReplyTo) - } // Adding RequestId header + } + + // Adding RequestId header if msg.Headers.RequestId != nil { headers["requestId"] = []byte(*msg.Headers.RequestId) } diff --git a/test/v3/issues/241/asyncapi.gen.go b/test/v3/issues/241/asyncapi.gen.go index 4bab4776..811ced77 100644 --- a/test/v3/issues/241/asyncapi.gen.go +++ b/test/v3/issues/241/asyncapi.gen.go @@ -333,7 +333,9 @@ func (msg PingMessageFromTestChannel) toBrokerMessage() (extensions.BrokerMessag headers := make(map[string][]byte, 2) // Adding EventId header - headers["event_id"] = []byte(msg.Headers.EventId) // Adding OptionalEventId header + headers["event_id"] = []byte(msg.Headers.EventId) + + // Adding OptionalEventId header if msg.Headers.OptionalEventId != nil { headers["optional_event_id"] = []byte(*msg.Headers.OptionalEventId) } diff --git a/test/v3/issues/262/asyncapi.gen.go b/test/v3/issues/262/asyncapi.gen.go new file mode 100644 index 00000000..5770e68d --- /dev/null +++ b/test/v3/issues/262/asyncapi.gen.go @@ -0,0 +1,353 @@ +// Package "issue262" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package issue262 + +import ( + "context" + "fmt" + "time" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppController is the structure that provides sending capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addAppContextValues(ctx context.Context, addr string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// UserController is the structure that provides sending capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addUserContextValues(ctx context.Context, addr string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.2.3") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// AsyncAPIVersion is the version of the used AsyncAPI document +const AsyncAPIVersion = "1.2.3" + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // subscriptions is a map of all subscriptions + subscriptions map[string]extensions.BrokerChannelSubscription + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware + // handler to handle errors from consumers and middlewares + errorHandler extensions.ErrorHandler +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +// WithErrorHandler attaches a errorhandler to handle errors from subscriber functions +func WithErrorHandler(handler extensions.ErrorHandler) ControllerOption { + return func(controller *controller) { + controller.errorHandler = handler + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// HeadersFromTestMessage is a schema from the AsyncAPI specification required in messages +type HeadersFromTestMessage struct { + FieldNonReq *string `json:"fieldNonReq,omitempty"` + FieldReq *string `json:"fieldReq" validate:"required"` + SomeDateTime *time.Time `json:"someDateTime,omitempty"` +} + +// TestMessage is the message expected for 'TestMessage' channel. +type TestMessage struct { + // Headers will be used to fill the message headers + Headers HeadersFromTestMessage + + // Payload will be inserted in the message payload + Payload string +} + +func NewTestMessage() TestMessage { + var msg TestMessage + + return msg +} + +// brokerMessageToTestMessage will fill a new TestMessage with data from generic broker message +func brokerMessageToTestMessage(bMsg extensions.BrokerMessage) (TestMessage, error) { + var msg TestMessage + + // Convert to string + payload := string(bMsg.Payload) + msg.Payload = payload // No need for type conversion to reference + + // Get each headers from broker message + for k, v := range bMsg.Headers { + switch { + case k == "fieldNonReq": // Retrieving FieldNonReq header + h := string(v) + msg.Headers.FieldNonReq = &h + case k == "fieldReq": // Retrieving FieldReq header + h := string(v) + msg.Headers.FieldReq = &h + case k == "someDateTime": // Retrieving SomeDateTime header + t, err := time.Parse(time.RFC3339, string(v)) + if err != nil { + return msg, err + } + msg.Headers.SomeDateTime = &t + default: + // TODO: log unknown error + } + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from TestMessage data +func (msg TestMessage) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Convert to []byte + payload := []byte(msg.Payload) + + // Add each headers to broker message + headers := make(map[string][]byte, 3) + + // Adding FieldNonReq header + if msg.Headers.FieldNonReq != nil { + headers["fieldNonReq"] = []byte(*msg.Headers.FieldNonReq) + } + + // Adding FieldReq header + if msg.Headers.FieldReq == nil { + return extensions.BrokerMessage{}, fmt.Errorf("field FieldReq should not be nil") + } + headers["fieldReq"] = []byte(*msg.Headers.FieldReq) + + // Adding SomeDateTime header + if msg.Headers.SomeDateTime != nil { + headers["someDateTime"] = []byte(msg.Headers.SomeDateTime.Format(time.RFC3339)) + } + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} diff --git a/test/v3/issues/262/asyncapi.yaml b/test/v3/issues/262/asyncapi.yaml new file mode 100644 index 00000000..ed1864af --- /dev/null +++ b/test/v3/issues/262/asyncapi.yaml @@ -0,0 +1,21 @@ +asyncapi: 3.0.0 +info: + title: Sample App + version: 1.2.3 +components: + messages: + Test: + headers: + type: object + required: + - fieldReq + properties: + fieldNonReq: + type: string + fieldReq: + type: string + someDateTime: + type: string + format: date-time + payload: + type: string \ No newline at end of file diff --git a/test/v3/issues/262/suite_test.go b/test/v3/issues/262/suite_test.go new file mode 100644 index 00000000..0d85f18d --- /dev/null +++ b/test/v3/issues/262/suite_test.go @@ -0,0 +1,54 @@ +//go:generate go run ../../../../cmd/asyncapi-codegen -p issue262 --force-pointers -i ./asyncapi.yaml -o ./asyncapi.gen.go + +package issue262 + +import ( + "testing" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func TestSuite(t *testing.T) { + suite.Run(t, NewSuite()) +} + +type Suite struct { + suite.Suite +} + +func NewSuite() *Suite { + return &Suite{} +} + +func (suite *Suite) TestToBrokerMessage() { + t := TestMessage{ + Headers: HeadersFromTestMessage{ + // All fields should be pointers (nillable) + FieldNonReq: nil, + FieldReq: nil, + SomeDateTime: nil, + }, + Payload: "", + } + + // toBrokerMessage should return an error due to FieldReq being required + _, err := t.toBrokerMessage() + assert.ErrorContains(suite.T(), err, "field FieldReq should not be nil") + str := "Something" + t.Headers.FieldReq = &str + _, err = t.toBrokerMessage() + assert.NoError(suite.T(), err) +} + +func (suite *Suite) TestBrokerMessageToTestMessage() { + // BrokerMessageToTestMessage should be valid + _, err := brokerMessageToTestMessage( + extensions.BrokerMessage{ + Headers: map[string][]byte{}, + }, + ) + // There is currently no check on fields returned by brokerMessageToTestMessage + assert.NoError(suite.T(), err) +}