Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds consensus cap config proto #1023

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ type capability struct {
wg sync.WaitGroup
lggr logger.Logger

requestTimeout time.Duration
clock clockwork.Clock
requestTimeout time.Duration
requestTimeoutLock sync.RWMutex

clock clockwork.Clock

aggregatorFactory types.AggregatorFactory
aggregators map[string]types.Aggregator
Expand Down Expand Up @@ -182,6 +184,12 @@ func (o *capability) UnregisterFromWorkflow(ctx context.Context, request capabil
return nil
}

func (o *capability) setRequestTimeout(timeout time.Duration) {
o.requestTimeoutLock.Lock()
defer o.requestTimeoutLock.Unlock()
o.requestTimeout = timeout
}

// Execute enqueues a new consensus request, passing it to the reporting plugin as needed.
// IMPORTANT: OCR3 only exposes signatures via the contractTransmitter, which is located
// in a separate process to the reporting plugin LOOPP. However, only the reporting plugin
Expand Down Expand Up @@ -266,10 +274,12 @@ func (o *capability) queueRequestForProcessing(
// Use the capability-level request timeout unless the request's config specifies
// its own timeout, in which case we'll use that instead. This allows the workflow spec
// to configure more granular timeouts depending on the circumstances.
o.requestTimeoutLock.RLock()
requestTimeout := o.requestTimeout
if c.RequestTimeoutMS != 0 {
requestTimeout = time.Duration(c.RequestTimeoutMS) * time.Millisecond
}
o.requestTimeoutLock.RUnlock()

r := &requests.Request{
StopCh: make(chan struct{}),
Expand Down
81 changes: 58 additions & 23 deletions pkg/capabilities/consensus/ocr3/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,89 @@ package ocr3

import (
"context"
"time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/requests"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const (
defaultMaxPhaseOutputBytes = 1000000 // 1 MB
defaultMaxReportCount = 20
defaultBatchSize = 20
defaultOutcomePruningThreshold = 3600
defaultRequestExpiry = 20 * time.Second
)

type factory struct {
store *requests.Store
capability *capability
capability *Capability
batchSize int
outcomePruningThreshold uint64
lggr logger.Logger

services.StateMachine
}

const (
// TODO(KS-617): read this from contract config
defaultMaxPhaseOutputBytes = 1000000 // 1 MB
defaultMaxReportCount = 20
)

func newFactory(s *requests.Store, c *capability, batchSize int, outcomePruningThreshold uint64, lggr logger.Logger) (*factory, error) {
func newFactory(s *requests.Store, c *Capability, lggr logger.Logger) (*factory, error) {
return &factory{
store: s,
capability: c,
batchSize: batchSize,
outcomePruningThreshold: outcomePruningThreshold,
lggr: logger.Named(lggr, "OCR3ReportingPluginFactory"),
store: s,
capability: c,
lggr: logger.Named(lggr, "OCR3ReportingPluginFactory"),
}, nil
}

func (o *factory) NewReportingPlugin(ctx context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
rp, err := newReportingPlugin(o.store, o.capability, o.batchSize, config, o.outcomePruningThreshold, o.lggr)
info := ocr3types.ReportingPluginInfo{
func (o *factory) NewReportingPlugin(_ context.Context, config ocr3types.ReportingPluginConfig) (ocr3types.ReportingPlugin[[]byte], ocr3types.ReportingPluginInfo, error) {
var configProto types.ReportingPluginConfig
err := proto.Unmarshal(config.OffchainConfig, &configProto)
if err != nil {
// an empty byte array will be unmarshalled into zero values without error
return nil, ocr3types.ReportingPluginInfo{}, err
bolekk marked this conversation as resolved.
Show resolved Hide resolved
}
bolekk marked this conversation as resolved.
Show resolved Hide resolved
if configProto.MaxQueryLengthBytes <= 0 {
configProto.MaxQueryLengthBytes = defaultMaxPhaseOutputBytes
}
if configProto.MaxObservationLengthBytes <= 0 {
configProto.MaxObservationLengthBytes = defaultMaxPhaseOutputBytes
}
if configProto.MaxOutcomeLengthBytes <= 0 {
configProto.MaxOutcomeLengthBytes = defaultMaxPhaseOutputBytes
}
if configProto.MaxReportLengthBytes <= 0 {
configProto.MaxReportLengthBytes = defaultMaxPhaseOutputBytes
}
if configProto.MaxBatchSize <= 0 {
configProto.MaxBatchSize = defaultBatchSize
}
if configProto.OutcomePruningThreshold <= 0 {
configProto.OutcomePruningThreshold = defaultOutcomePruningThreshold
}
if configProto.MaxReportCount <= 0 {
configProto.MaxReportCount = defaultMaxReportCount
}
if configProto.RequestTimeout == nil {
configProto.RequestTimeout = durationpb.New(defaultRequestExpiry)
}
o.capability.SetRequestTimeout(configProto.RequestTimeout.AsDuration())
rp, err := newReportingPlugin(o.store, o.capability.config.capability, int(configProto.MaxBatchSize), config, configProto.OutcomePruningThreshold, o.lggr)
rpInfo := ocr3types.ReportingPluginInfo{
Name: "OCR3 Capability Plugin",
Limits: ocr3types.ReportingPluginLimits{
MaxQueryLength: defaultMaxPhaseOutputBytes,
MaxObservationLength: defaultMaxPhaseOutputBytes,
MaxOutcomeLength: defaultMaxPhaseOutputBytes,
MaxReportLength: defaultMaxPhaseOutputBytes,
MaxReportCount: defaultMaxReportCount,
MaxQueryLength: int(configProto.MaxQueryLengthBytes),
MaxObservationLength: int(configProto.MaxObservationLengthBytes),
MaxOutcomeLength: int(configProto.MaxOutcomeLengthBytes),
MaxReportLength: int(configProto.MaxReportLengthBytes),
MaxReportCount: int(configProto.MaxReportCount),
},
}
return rp, info, err
return rp, rpInfo, err
}

func (o *factory) Start(ctx context.Context) error {
Expand Down
34 changes: 13 additions & 21 deletions pkg/capabilities/consensus/ocr3/ocr3.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,19 @@ type Capability struct {
}

type Config struct {
RequestTimeout *time.Duration
BatchSize int
OutcomePruningThreshold uint64
Logger logger.Logger
AggregatorFactory types.AggregatorFactory
EncoderFactory types.EncoderFactory
SendBufferSize int
RequestTimeout *time.Duration
Logger logger.Logger
AggregatorFactory types.AggregatorFactory
EncoderFactory types.EncoderFactory
SendBufferSize int

store *requests.Store
capability *capability
clock clockwork.Clock
}

const (
defaultRequestExpiry time.Duration = 20 * time.Second
defaultBatchSize = 20
defaultSendBufferSize = 10
defaultOutcomePruningThreshold = 3600
defaultSendBufferSize = 10
)

func NewOCR3(config Config) *Capability {
Expand All @@ -52,14 +47,6 @@ func NewOCR3(config Config) *Capability {
config.RequestTimeout = &dre
}

if config.BatchSize == 0 {
config.BatchSize = defaultBatchSize
}

if config.OutcomePruningThreshold == 0 {
config.OutcomePruningThreshold = defaultOutcomePruningThreshold
}

if config.SendBufferSize == 0 {
config.SendBufferSize = defaultSendBufferSize
}
Expand Down Expand Up @@ -88,11 +75,16 @@ func NewOCR3(config Config) *Capability {
return cp
}

func (o *Capability) SetRequestTimeout(timeout time.Duration) {
o.config.RequestTimeout = &timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks odd, why do we need to set in both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, we wanted to pass Capability around. I just pushed it as is to demonstrate to @bolekk that we might not need to pass it around and just access o.config.capability.setRequestTimeout() instead.

So I'm fully expecting to just remove this setter altogether and the passing around of Capability from this PR.

o.config.capability.setRequestTimeout(timeout)
}

func (o *Capability) NewReportingPluginFactory(ctx context.Context, cfg core.ReportingPluginServiceConfig,
provider commontypes.PluginProvider, pipelineRunner core.PipelineRunnerService, telemetry core.TelemetryClient,
errorLog core.ErrorLog, capabilityRegistry core.CapabilitiesRegistry, keyValueStore core.KeyValueStore,
relayerSet core.RelayerSet) (core.OCR3ReportingPluginFactory, error) {
factory, err := newFactory(o.config.store, o.config.capability, o.config.BatchSize, o.config.OutcomePruningThreshold, o.config.Logger)
f, err := newFactory(o.config.store, o, o.config.Logger)
if err != nil {
return nil, err
}
Expand All @@ -104,7 +96,7 @@ func (o *Capability) NewReportingPluginFactory(ctx context.Context, cfg core.Rep

o.capabilityRegistry = capabilityRegistry

return factory, err
return f, err
}

func (o *Capability) NewValidationService(ctx context.Context) (core.ValidationService, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/capabilities/consensus/ocr3/types/generate.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
//go:generate protoc --go_out=../../../../ --go_opt=paths=source_relative --go-grpc_out=../../../../ --go-grpc_opt=paths=source_relative --proto_path=../../../../ capabilities/consensus/ocr3/types/ocr3_types.proto values/pb/values.proto
//go:generate protoc --go_out=../../../../ --go_opt=paths=source_relative --go-grpc_out=../../../../ --go-grpc_opt=paths=source_relative --proto_path=../../../../ capabilities/consensus/ocr3/types/ocr3_config_types.proto values/pb/values.proto
package types
Loading
Loading