diff --git a/pkg/codegen/generators/v3/templates/message.tmpl b/pkg/codegen/generators/v3/templates/message.tmpl index 33eaf138..4198235c 100644 --- a/pkg/codegen/generators/v3/templates/message.tmpl +++ b/pkg/codegen/generators/v3/templates/message.tmpl @@ -131,7 +131,11 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N } msg.Headers.{{ namify $key}} = t {{- else}} + {{- if $value.Reference }} + msg.Headers.{{ namify $key}} = {{$value.ReferenceTo.Name}}(v) + {{- else }} msg.Headers.{{ namify $key}} = {{$value.Type}}(v) + {{- end}} {{- end}} {{- else}} {{- if eq $value.Type "object" }} @@ -146,7 +150,11 @@ func brokerMessageTo{{namify .Name}}(bMsg extensions.BrokerMessage) ({{namify .N } msg.Headers.{{ namify $key}} = &t {{- else}} + {{- if $value.Reference }} + h := {{$value.ReferenceTo.Name}}(v) + {{- else }} h := {{$value.Type}}(v) + {{- end}} msg.Headers.{{ namify $key}} = &h {{- end}} {{- end}} diff --git a/test/v3/issues/241/asyncapi.gen.go b/test/v3/issues/241/asyncapi.gen.go new file mode 100644 index 00000000..b2bf2b27 --- /dev/null +++ b/test/v3/issues/241/asyncapi.gen.go @@ -0,0 +1,358 @@ +// Package "issue333" provides primitives to interact with the AsyncAPI specification. +// +// Code generated by github.com/lerenn/asyncapi-codegen version (devel) DO NOT EDIT. +package issue333 + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/lerenn/asyncapi-codegen/pkg/extensions" +) + +// AppController is the structure that provides sending capabilities to the +// developer and and connect the broker with the App +type AppController struct { + controller +} + +// NewAppController links the App to the broker +func NewAppController(bc extensions.BrokerController, options ...ControllerOption) (*AppController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &AppController{controller: controller}, nil +} + +func (c AppController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c AppController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addAppContextValues(ctx context.Context, addr string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "app") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) +} + +// Close will clean up any existing resources on the controller +func (c *AppController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// UserController is the structure that provides sending capabilities to the +// developer and and connect the broker with the User +type UserController struct { + controller +} + +// NewUserController links the User to the broker +func NewUserController(bc extensions.BrokerController, options ...ControllerOption) (*UserController, error) { + // Check if broker controller has been provided + if bc == nil { + return nil, extensions.ErrNilBrokerController + } + + // Create default controller + controller := controller{ + broker: bc, + subscriptions: make(map[string]extensions.BrokerChannelSubscription), + logger: extensions.DummyLogger{}, + middlewares: make([]extensions.Middleware, 0), + errorHandler: extensions.DefaultErrorHandler(), + } + + // Apply options + for _, option := range options { + option(&controller) + } + + return &UserController{controller: controller}, nil +} + +func (c UserController) wrapMiddlewares( + middlewares []extensions.Middleware, + callback extensions.NextMiddleware, +) func(ctx context.Context, msg *extensions.BrokerMessage) error { + var called bool + + // If there is no more middleware + if len(middlewares) == 0 { + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the callback if it exists and it has not been called already + if callback != nil && !called { + called = true + return callback(ctx) + } + + // Nil can be returned, as the callback has already been called + return nil + } + } + + // Get the next function to call from next middlewares or callback + next := c.wrapMiddlewares(middlewares[1:], callback) + + // Wrap middleware into a check function that will call execute the middleware + // and call the next wrapped middleware if the returned function has not been + // called already + return func(ctx context.Context, msg *extensions.BrokerMessage) error { + // Call the middleware and the following if it has not been done already + if !called { + // Create the next call with the context and the message + nextWithArgs := func(ctx context.Context) error { + return next(ctx, msg) + } + + // Call the middleware and register it as already called + called = true + if err := middlewares[0](ctx, msg, nextWithArgs); err != nil { + return err + } + + // If next has already been called in middleware, it should not be executed again + return nextWithArgs(ctx) + } + + // Nil can be returned, as the next middleware has already been called + return nil + } +} + +func (c UserController) executeMiddlewares(ctx context.Context, msg *extensions.BrokerMessage, callback extensions.NextMiddleware) error { + // Wrap middleware to have 'next' function when calling them + wrapped := c.wrapMiddlewares(c.middlewares, callback) + + // Execute wrapped middlewares + return wrapped(ctx, msg) +} + +func addUserContextValues(ctx context.Context, addr string) context.Context { + ctx = context.WithValue(ctx, extensions.ContextKeyIsVersion, "1.0.0") + ctx = context.WithValue(ctx, extensions.ContextKeyIsProvider, "user") + return context.WithValue(ctx, extensions.ContextKeyIsChannel, addr) +} + +// Close will clean up any existing resources on the controller +func (c *UserController) Close(ctx context.Context) { + // Unsubscribing remaining channels +} + +// AsyncAPIVersion is the version of the used AsyncAPI document +const AsyncAPIVersion = "1.0.0" + +// controller is the controller that will be used to communicate with the broker +// It will be used internally by AppController and UserController +type controller struct { + // broker is the broker controller that will be used to communicate + broker extensions.BrokerController + // subscriptions is a map of all subscriptions + subscriptions map[string]extensions.BrokerChannelSubscription + // logger is the logger that will be used² to log operations on controller + logger extensions.Logger + // middlewares are the middlewares that will be executed when sending or + // receiving messages + middlewares []extensions.Middleware + // handler to handle errors from consumers and middlewares + errorHandler extensions.ErrorHandler +} + +// ControllerOption is the type of the options that can be passed +// when creating a new Controller +type ControllerOption func(controller *controller) + +// WithLogger attaches a logger to the controller +func WithLogger(logger extensions.Logger) ControllerOption { + return func(controller *controller) { + controller.logger = logger + } +} + +// WithMiddlewares attaches middlewares that will be executed when sending or receiving messages +func WithMiddlewares(middlewares ...extensions.Middleware) ControllerOption { + return func(controller *controller) { + controller.middlewares = middlewares + } +} + +// WithErrorHandler attaches a errorhandler to handle errors from subscriber functions +func WithErrorHandler(handler extensions.ErrorHandler) ControllerOption { + return func(controller *controller) { + controller.errorHandler = handler + } +} + +type MessageWithCorrelationID interface { + CorrelationID() string + SetCorrelationID(id string) +} + +type Error struct { + Channel string + Err error +} + +func (e *Error) Error() string { + return fmt.Sprintf("channel %q: err %v", e.Channel, e.Err) +} + +// HeadersFromPingMessageFromTestChannel is a schema from the AsyncAPI specification required in messages +type HeadersFromPingMessageFromTestChannel struct { + EventId EventIdSchema `json:"event_id" validate:"required"` + OptionalEventId *EventIdSchema `json:"optional_event_id"` +} + +// PingMessageFromTestChannelPayload is a schema from the AsyncAPI specification required in messages +type PingMessageFromTestChannelPayload struct { + Event *string `json:"event" validate:"omitempty,eq=ping"` +} + +// PingMessageFromTestChannel is the message expected for 'PingMessageFromTestChannel' channel. +type PingMessageFromTestChannel struct { + // Headers will be used to fill the message headers + Headers HeadersFromPingMessageFromTestChannel + + // Payload will be inserted in the message payload + Payload PingMessageFromTestChannelPayload +} + +func NewPingMessageFromTestChannel() PingMessageFromTestChannel { + var msg PingMessageFromTestChannel + + return msg +} + +// brokerMessageToPingMessageFromTestChannel will fill a new PingMessageFromTestChannel with data from generic broker message +func brokerMessageToPingMessageFromTestChannel(bMsg extensions.BrokerMessage) (PingMessageFromTestChannel, error) { + var msg PingMessageFromTestChannel + + // Unmarshal payload to expected message payload format + err := json.Unmarshal(bMsg.Payload, &msg.Payload) + if err != nil { + return msg, err + } + + // Get each headers from broker message + for k, v := range bMsg.Headers { + switch { + case k == "event_id": // Retrieving EventId header + msg.Headers.EventId = EventIdSchema(v) + case k == "optional_event_id": // Retrieving OptionalEventId header + h := EventIdSchema(v) + msg.Headers.OptionalEventId = &h + default: + // TODO: log unknown error + } + } + + // TODO: run checks on msg type + + return msg, nil +} + +// toBrokerMessage will generate a generic broker message from PingMessageFromTestChannel data +func (msg PingMessageFromTestChannel) toBrokerMessage() (extensions.BrokerMessage, error) { + // TODO: implement checks on message + + // Marshal payload to JSON + payload, err := json.Marshal(msg.Payload) + if err != nil { + return extensions.BrokerMessage{}, err + } + + // Add each headers to broker message + headers := make(map[string][]byte, 2) + + // Adding EventId header + headers["event_id"] = []byte(msg.Headers.EventId) // Adding OptionalEventId header + if msg.Headers.OptionalEventId != nil { + headers["optional_event_id"] = []byte(*msg.Headers.OptionalEventId) + } + + return extensions.BrokerMessage{ + Headers: headers, + Payload: payload, + }, nil +} + +// EventIdSchema is a schema from the AsyncAPI specification required in messages +type EventIdSchema string + +const ( + // TestChannelPath is the constant representing the 'TestChannel' channel path. + TestChannelPath = "" +) + +// ChannelsPaths is an array of all channels paths +var ChannelsPaths = []string{ + TestChannelPath, +} diff --git a/test/v3/issues/241/asyncapi.yaml b/test/v3/issues/241/asyncapi.yaml new file mode 100644 index 00000000..30b02975 --- /dev/null +++ b/test/v3/issues/241/asyncapi.yaml @@ -0,0 +1,32 @@ +# Issue: https://github.com/lerenn/asyncapi-codegen/issues/241 + +asyncapi: 3.0.0 +info: + title: Sample App + version: 1.0.0 +channels: + test: + messages: + ping: + name: Ping + payload: + type: object + properties: + event: + type: string + const: ping + headers: + name: test_headers + type: object + properties: + # Headers that references custom schema. + event_id: + $ref: "#/components/schemas/eventId" + optional_event_id: + $ref: "#/components/schemas/eventId" + required: + - event_id +components: + schemas: + eventId: + type: string diff --git a/test/v3/issues/241/generate.go b/test/v3/issues/241/generate.go new file mode 100644 index 00000000..a183d75e --- /dev/null +++ b/test/v3/issues/241/generate.go @@ -0,0 +1,5 @@ +//go:generate go run ../../../../cmd/asyncapi-codegen -p issue333 -i ./asyncapi.yaml -o ./asyncapi.gen.go + +package issue333 + +// This is just to test that the generation is correct