Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module with declare and import #5968

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a73e2e3
Implement new modules with import and declare keywords.
wildum Nov 30, 2023
ebc9028
Merge branch 'main' into modules-declare-import
wildum Jan 12, 2024
aca2289
minor changes following review
wildum Jan 15, 2024
8f50ca1
use Declare type instead of plain string to use the AST instead of pa…
wildum Jan 15, 2024
69a1d69
introduces a new componentNode interface
wildum Jan 16, 2024
f020082
rename according to new terminology
wildum Jan 16, 2024
130a98e
some additional renaming
wildum Jan 16, 2024
c883d1a
introduce the component node manager
wildum Jan 16, 2024
cb983c5
use deprecation notice on module component constructor
wildum Jan 16, 2024
e7fb05e
fix test after changing error message
wildum Jan 16, 2024
e2f9c99
custom component should return the managed component via Component()
wildum Jan 16, 2024
3c24a3c
introduces LoaderConfigOptions used to pass options like additional d…
wildum Jan 17, 2024
b7aa47e
ignore linter warning on deprecated func
wildum Jan 17, 2024
189a2b2
add deprecated doc on module Exports type
wildum Jan 17, 2024
7bfc196
rename function that decides if a component is a custom one
wildum Jan 17, 2024
1cb8a31
rename firstPart to namespace
wildum Jan 17, 2024
b93184e
make declare fields private
wildum Jan 17, 2024
d40e409
add comment regarding the redundancy in the Declare struct
wildum Jan 17, 2024
ce8ec22
rename blocks in loader to prevent confusion between componentBlocks …
wildum Jan 17, 2024
28d8f5f
remove unused field in loader
wildum Jan 17, 2024
3a9d20d
fix mutex and flaky test
wildum Jan 17, 2024
32e7811
remove unecessary mutex in declare
wildum Jan 17, 2024
fc0770e
add more doc for the Apply function
wildum Jan 18, 2024
e0e018b
rename and document interfaces
wildum Jan 18, 2024
3ca3692
cleanup on comments, namings and mutex
wildum Jan 18, 2024
40f5fd9
update running children in case of an update of the content in import…
wildum Jan 18, 2024
79b98ea
check for go routines leak
wildum Jan 18, 2024
65de374
comment cleanup
wildum Jan 18, 2024
219474a
can cancel on all paths for linter
wildum Jan 18, 2024
4f2d64d
improve import children handling
wildum Jan 18, 2024
3fdc372
renaming
wildum Jan 19, 2024
d53e29e
optimization to avoid reloading a module if the imported content did …
wildum Jan 19, 2024
f384f0b
improve tests
wildum Jan 19, 2024
fdcf18a
remove forgotten print
wildum Jan 19, 2024
d8e37c0
merge main
wildum Jan 22, 2024
c0bfd61
Merge branch 'main' into modules-declare-import
wildum Jan 23, 2024
fea7a8a
store args in custom component
wildum Jan 23, 2024
dbc4be7
merge main
wildum Jan 23, 2024
5ef4670
refactor integration tests to remove redundancy
wildum Jan 23, 2024
dcbc6d6
merge main
wildum Jan 25, 2024
60d6eca
cleanup
wildum Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ jobs:
go-version: "1.21"
- name: Set OTEL Exporter Endpoint
run: echo "OTEL_EXPORTER_ENDPOINT=172.17.0.1:4318" >> $GITHUB_ENV
- name: Set Module HTTP Endpoint
run: echo "MODULE_HTTP_ENDPOINT=http://172.17.0.1:8090/module.river" >> $GITHUB_ENV
- name: Run tests
run: make integration-test
2 changes: 1 addition & 1 deletion cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func getEnabledComponentsFunc(f *flow.Flow) func() map[string]interface{} {
components := component.GetAllComponents(f, component.InfoOptions{})
componentNames := map[string]struct{}{}
for _, c := range components {
componentNames[c.Registration.Name] = struct{}{}
componentNames[c.BlockName] = struct{}{}
}
return map[string]interface{}{"enabled-components": maps.Keys(componentNames)}
}
Expand Down
6 changes: 3 additions & 3 deletions component/component_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ type Info struct {
// this component depends on, or is depended on by, respectively.
References, ReferencedBy []string

Registration Registration // Component registration.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not remove this field? I think it's an important part of this API to be able to know what the arguments/exports types of a builtin component are, as this will come into play in the future when we explore plugins and other tooling around components.

Keeping this the same also means we'll have even more consistency between native and non-native components, which I find ideal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the registration does not have the types? It has "Args Arguments" and "Exports Exports" which are already inside of the Info struct as "Arguments Arguments" and "Exports Exports". Removing the Registration field removes this repetition and the "Build" function (which should not be there because retrieving info about a particular instance of a component to create a new component of the same type should not be a correct way to create a component)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the registration does not have the types?

The registration does have the types; it holds the zero value for the arguments and exports types of a given component, and it's used for unmarshaling. In the future, those fields could also be used for additional validity checking without instantiating components (e.g., #3844).

(which should not be there because retrieving info about a particular instance of a component to create a new component of the same type should not be a correct way to create a component

I agree, but I also didn't suggest that the registration from this API would be the way to construct new components; just for the full set of information associated with a component.

Let me think about this a bit. This is a public API, which I think we need to be cautious about changing. I need to consider what the future implications are for not including the registration in this API, particularly around whether that limits us with the web UI or any other future services.

We should change APIs deliberately and not to make another feature easier. So if we're going to change this API, we should make sure it's the right change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In builtin component, the args and exports are initialized by the registration when the node is created. That means that they also have the types and that we don't need the registration object.
Also registration is currently something specific for the builtin components. It is not used by the custom components so that would require some workaround to keep it in the component_provider info

Health Health // Current component health.
BlockName string // Component block name.
Health Health // Current component health.

Arguments Arguments // Current arguments value of the component.
Exports Exports // Current exports value of the component.
Expand Down Expand Up @@ -157,7 +157,7 @@ func (info *Info) MarshalJSON() ([]byte, error) {
}

return json.Marshal(&componentDetailJSON{
Name: info.Registration.Name,
Name: info.BlockName,
Type: "block",
ModuleID: info.ID.ModuleID,
LocalID: info.ID.LocalID,
Expand Down
11 changes: 7 additions & 4 deletions component/module/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/local/file"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.file",
Args: Arguments{},
Name: "module.file",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -58,6 +60,7 @@ var (

// New creates a new module.file component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -88,7 +91,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}
}

Expand Down Expand Up @@ -135,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
11 changes: 7 additions & 4 deletions component/module/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import (
"github.com/go-kit/log"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/component/module/git/internal/vcs"
"github.com/grafana/agent/internal/vcs"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/agent/pkg/flow/logging/level"
)

func init() {
component.Register(component.Registration{
Name: "module.git",
Args: Arguments{},
Name: "module.git",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -74,6 +76,7 @@ var (

// New creates a new module.git component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -239,7 +242,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error {
return err
}

return c.mod.LoadFlowSource(args.Arguments, string(bb))
return c.mod.LoadFlowSource(args.Arguments, string(bb), config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
11 changes: 7 additions & 4 deletions component/module/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
remote_http "github.com/grafana/agent/component/remote/http"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.http",
Args: Arguments{},
Name: "module.http",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -57,6 +59,7 @@ var (

// New creates a new module.http component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand Down Expand Up @@ -87,7 +90,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http.

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}
}

Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
63 changes: 53 additions & 10 deletions component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/agent/pkg/flow/logging/level"
)

Expand All @@ -16,22 +17,40 @@ type ModuleComponent struct {
opts component.Options
mod component.Module

mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
mut sync.RWMutex
health component.Health
latestContent string
latestArgs map[string]any
latestLoaderConfigOptions config.LoaderConfigOptions
}

// Exports holds values which are exported from the run module.
// Deprecated: Exports holds values which are exported from the run module. New modules use map[string]any directly.
type Exports struct {
// Exports exported from the running module.
Exports map[string]any `river:"exports,block"`
}

// NewModuleComponent initializes a new ModuleComponent.
var _ component.Component = (*ModuleComponent)(nil)

// NewModuleComponentV2 initializes a new ModuleComponent.
// Compared to the previous constructor, the export is simply map[string]any instead of the Exports type containing the map.
func NewModuleComponentV2(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(),
}
var err error
c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) {
c.opts.OnStateChange(exports)
})
return c, err
}

// Deprecated: Use NewModuleComponentV2 instead.
func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
c := &ModuleComponent{
opts: o,
opts: o,
latestLoaderConfigOptions: config.DefaultLoaderConfigOptions(),
}
var err error
c.mod, err = o.ModuleController.NewModule("", func(exports map[string]any) {
Expand All @@ -43,12 +62,12 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
// LoadFlowSource loads the flow controller with the current component source.
// It will set the component health in addition to return the error so that the consumer can rely on either or both.
// If the content is the same as the last time it was successfully loaded, it will not be reloaded.
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() {
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string, options config.LoaderConfigOptions) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() && reflect.DeepEqual(options, c.getLatestLoaderConfigOptions()) {
return nil
}

err := c.mod.LoadConfig([]byte(contentValue), args)
err := c.mod.LoadConfig([]byte(contentValue), args, options)
if err != nil {
c.setHealth(component.Health{
Health: component.HealthTypeUnhealthy,
Expand All @@ -61,6 +80,7 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin

c.setLatestArgs(args)
c.setLatestContent(contentValue)
c.setLatestLoaderConfigOptions(options)
c.setHealth(component.Health{
Health: component.HealthTypeHealthy,
Message: "module content loaded",
Expand All @@ -70,6 +90,17 @@ func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue strin
return nil
}

// Run implements component.Component.
func (c *ModuleComponent) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Update implements component.Component.
func (c *ModuleComponent) Update(_ component.Arguments) error {
return nil
}

// RunFlowController runs the flow controller that all module components start.
func (c *ModuleComponent) RunFlowController(ctx context.Context) {
err := c.mod.Run(ctx)
Expand Down Expand Up @@ -104,6 +135,18 @@ func (c *ModuleComponent) getLatestContent() string {
return c.latestContent
}

func (c *ModuleComponent) setLatestLoaderConfigOptions(options config.LoaderConfigOptions) {
c.mut.Lock()
defer c.mut.Unlock()
c.latestLoaderConfigOptions = options
}

func (c *ModuleComponent) getLatestLoaderConfigOptions() config.LoaderConfigOptions {
c.mut.RLock()
defer c.mut.RUnlock()
return c.latestLoaderConfigOptions
}

func (c *ModuleComponent) setLatestArgs(args map[string]any) {
c.mut.Lock()
defer c.mut.Unlock()
Expand Down
9 changes: 6 additions & 3 deletions component/module/string/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/river/rivertypes"
)

func init() {
component.Register(component.Registration{
Name: "module.string",
Args: Arguments{},
Name: "module.string",
Args: Arguments{},
//nolint:staticcheck
Exports: module.Exports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
Expand Down Expand Up @@ -42,6 +44,7 @@ var (

// New creates a new module.string component.
func New(o component.Options, args Arguments) (*Component, error) {
//nolint:staticcheck
m, err := module.NewModuleComponent(o)
if err != nil {
return nil, err
Expand All @@ -66,7 +69,7 @@ func (c *Component) Run(ctx context.Context) error {
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value)
return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value, config.DefaultLoaderConfigOptions())
}

// CurrentHealth implements component.HealthComponent.
Expand Down
3 changes: 2 additions & 1 deletion component/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/config"
"github.com/grafana/regexp"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -44,7 +45,7 @@ type Module interface {
// LoadConfig parses River config and loads it into the Module.
// LoadConfig can be called multiple times, and called prior to
// [Module.Run].
LoadConfig(config []byte, args map[string]any) error
LoadConfig(config []byte, args map[string]any, options config.LoaderConfigOptions) error

// Run starts the Module. No components within the Module
// will be run until Run is called.
Expand Down
21 changes: 21 additions & 0 deletions integration-tests/configs/http-module/module.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
declare "myModule" {
argument "scrape_endpoint" {}

argument "forward_to" {}

argument "scrape_interval" {
optional = true
default = "1s"
}

prometheus.scrape "scrape_prom_metrics_module_file" {
targets = [
{"__address__" = argument.scrape_endpoint.value},
]
forward_to = argument.forward_to.value
scrape_classic_histograms = true
enable_protobuf_negotiation = true
scrape_interval = argument.scrape_interval.value
scrape_timeout = "500ms"
}
}
9 changes: 8 additions & 1 deletion integration-tests/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ services:
dockerfile: ./integration-tests/configs/prom-gen/Dockerfile
context: ..
ports:
- "9001:9001"
- "9001:9001"

http-module:
image: nginx:alpine
ports:
- "8090:80"
volumes:
- ./configs/http-module/module.river:/usr/share/nginx/html/module.river:ro
Loading