Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: complete tap-agent docs #605

Merged
merged 12 commits into from
Feb 6, 2025
37 changes: 37 additions & 0 deletions crates/tap-agent/src/adaptative_concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,43 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

//! # Adaptative concurrency
//! This module provides [AdaptiveLimiter] as a tool to allow concurrency.
//! It's implemented with an Additive increase, Multiplicative decrease
//! ([AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease))
//! strategy.
//!
//!
//!
//! This allows us to have a big number of rav requests running
//! concurrently, but if any of them fails we limit
//! the following requests until the aggregator recovers.
//!
//! ## Behaviour
//! On every request, the caller acquires a slot by calling [AdaptiveLimiter::acquire()].
//! This will increment the number of in_flight connections.
//!
//! If we receive a successful response, we increment our limit to be able to process
//! one more request concurrently.
//!
//! If we receive a failed response, we decrement our limit by half to quickly
//! relieve the pressure in the system.

use std::ops::Range;

/// Simple struct that keeps track of concurrent requests
///
/// More information on [crate::adaptative_concurrency]
pub struct AdaptiveLimiter {
range: Range<usize>,
current_limit: usize,
in_flight: usize,
}

impl AdaptiveLimiter {
/// Creates an instance of [AdaptiveLimiter] with an `initial_limit`
/// and a `range` that contains the minimum and maximum of concurrent
/// requests
pub fn new(initial_limit: usize, range: Range<usize>) -> Self {
Self {
range,
Expand All @@ -18,24 +46,33 @@ impl AdaptiveLimiter {
}
}

/// Acquires a slot in our limiter, returning `bool`
/// representing if we had limit available or not
pub fn acquire(&mut self) -> bool {
self.has_limit() && {
self.in_flight += 1;
true
}
}

/// Returns if there're slots available
pub fn has_limit(&self) -> bool {
self.in_flight < self.current_limit
}

/// Callback function that removes in_flight counter
/// and if the current limit is lower than the provided
/// limit, increase the current limit by 1.
pub fn on_success(&mut self) {
self.in_flight -= 1;
if self.current_limit < self.range.end {
self.current_limit += 1; // Additive Increase
}
}

/// Callback function that removes in_flight counter
/// and decreasing the current limit by half, with
/// minimum value to configured value.
pub fn on_failure(&mut self) {
// Multiplicative Decrease
self.in_flight -= 1;
Expand Down
42 changes: 42 additions & 0 deletions crates/tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,40 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

//! # agent
//!
//! The agent is a set of 3 actors:
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
//! - [sender_accounts_manager::SenderAccountsManager]
//! - [sender_account::SenderAccount]
//! - [sender_allocation::SenderAllocation]
//!
//! They run under a supervision tree and it goes like the following:
//!
//! [sender_accounts_manager::SenderAccountsManager] monitors allocations provided
//! by the subgraph via a [Watcher](::indexer_watcher). Every time it detects a
//! new escrow account created, it automatically spawns a [sender_account::SenderAccount].
//!
//! Manager is also responsible for spawning an pgnotify task that monitors new receipts.
//!
//! [sender_account::SenderAccount] is then responsible for keeping track of all fees
//! distributed across different allocations and also spawning [sender_allocation::SenderAllocation]s
//! that are going to process receipts and RAV requests.
//!
//! [sender_allocation::SenderAllocation] receives notifications from the spawned task and then
//! it updates its state an notifies its parent actor.
//!
//! Once [sender_account::SenderAccount] gets enough receipts, it uses its tracker to decide
//! what is the allocation with the most amount of fees and send a message to trigger a RavRequest.
//!
//! When the allocation is closed by the indexer, [sender_allocation::SenderAllocation] is
//! responsible for triggering the last rav, that will flush all pending receipts and mark the rav
//! as last to be redeemed by indexer-agent.
//!
//! ## Actors
//! Actors are implemented using the [ractor] library and contain their own message queue.
//! They process one message at a time and that's why concurrent primitives like
//! [std::sync::Mutex]s aren't needed.

use indexer_config::{
Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig,
SubgraphConfig, SubgraphsConfig, TapConfig,
Expand All @@ -15,11 +49,19 @@ use crate::{
database, CONFIG, EIP_712_DOMAIN,
};

/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_account::SenderAccount]
pub mod sender_account;
/// Actor, Arguments, State, Messages and implementation for
/// [crate::agent::sender_accounts_manager::SenderAccountsManager]
pub mod sender_accounts_manager;
/// Actor, Arguments, State, Messages and implementation for [crate::agent::sender_allocation::SenderAllocation]
pub mod sender_allocation;
/// Unaggregated receipts containing total value and last id stored in the table
pub mod unaggregated_receipts;

/// This is the main entrypoint for starting up tap-agent
///
/// It uses the static [crate::CONFIG] to configure the agent.
pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
let Config {
indexer: IndexerConfig {
Expand Down
20 changes: 19 additions & 1 deletion crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ type Balance = U256;
/// Information for Ravs that are abstracted away from the SignedRav itself
#[derive(Debug, Default, PartialEq, Eq)]
pub struct RavInformation {
/// Allocation Id of a Rav
pub allocation_id: Address,
/// Value Aggregate of a Rav
pub value_aggregate: u128,
}

Expand Down Expand Up @@ -160,7 +162,7 @@ pub enum ReceiptFees {
Retry,
}

/// Enum containing all types of messages that a SenderAccount can receive
/// Enum containing all types of messages that a [SenderAccount] can receive
#[derive(Debug)]
pub enum SenderAccountMessage {
/// Updates the sender balance and
Expand Down Expand Up @@ -205,6 +207,7 @@ pub struct SenderAccount;

/// Arguments received in startup while spawing [SenderAccount] actor
pub struct SenderAccountArgs {
/// Configuration derived from config.toml
pub config: &'static SenderAccountConfig,

/// Connection to database
Expand Down Expand Up @@ -318,20 +321,32 @@ pub struct State {
config: &'static SenderAccountConfig,
}

/// Configuration derived from config.toml
pub struct SenderAccountConfig {
/// Buffer used for the receipts
pub rav_request_buffer: Duration,
/// Maximum amount is willing to lose
pub max_amount_willing_to_lose_grt: u128,
/// What value triggers a new Rav request
pub trigger_value: u128,

// allocation config
/// Timeout config for rav requests
pub rav_request_timeout: Duration,
/// Limit of receipts sent in a Rav Request
pub rav_request_receipt_limit: u64,
/// Current indexer address
pub indexer_address: Address,
/// Polling interval for escrow subgraph
pub escrow_polling_interval: Duration,
/// Timeout used while creating [SenderAccount]
///
/// This is reached if the database is too slow
pub tap_sender_timeout: Duration,
}

impl SenderAccountConfig {
/// Creates a [SenderAccountConfig] by getting a reference of [indexer_config::Config]
pub fn from_config(config: &indexer_config::Config) -> Self {
Self {
rav_request_buffer: config.tap.rav_request.timestamp_buffer_secs,
Expand Down Expand Up @@ -1232,6 +1247,7 @@ impl Actor for SenderAccount {
}

impl SenderAccount {
/// Deny sender by giving `sender` [Address]
pub async fn deny_sender(pool: &PgPool, sender: Address) {
sqlx::query!(
r#"
Expand All @@ -1248,6 +1264,7 @@ impl SenderAccount {

#[cfg(test)]
pub mod tests {
#![allow(missing_docs)]
use std::{
collections::{HashMap, HashSet},
sync::atomic::AtomicU32,
Expand Down Expand Up @@ -1326,6 +1343,7 @@ pub mod tests {
}
}

/// Prefix shared between tests so we don't have conflicts in the global registry
pub static PREFIX_ID: AtomicU32 = AtomicU32::new(0);
const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
Expand Down
28 changes: 28 additions & 0 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,24 @@ lazy_static! {
.unwrap();
}

/// Notification received by pgnotify
///
/// This contains a list of properties that are sent by postgres when a receipt is inserted
#[derive(Deserialize, Debug, PartialEq, Eq)]
pub struct NewReceiptNotification {
/// id inside the table
pub id: u64,
/// address of the allocation
pub allocation_id: Address,
/// address of wallet that signed this receipt
pub signer_address: Address,
/// timestamp of the receipt
pub timestamp_ns: u64,
/// value of the receipt
pub value: u128,
}

/// Manager Actor
pub struct SenderAccountsManager;

/// Wrapped AllocationId Address with two possible variants
Expand All @@ -53,7 +62,9 @@ pub struct SenderAccountsManager;
/// Rav and Receipt types
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum AllocationId {
/// Legacy allocation
Legacy(Address),
/// New Subgraph DataService allocation
Horizon(Address),
}

Expand All @@ -72,25 +83,42 @@ impl Display for AllocationId {
}
}

/// Enum containing all types of messages that a [SenderAccountsManager] can receive
#[derive(Debug)]
pub enum SenderAccountsManagerMessage {
/// Spawn and Stop [SenderAccount]s that were added or removed
/// in comparison with it current state and updates the state
UpdateSenderAccounts(HashSet<Address>),
}

/// Arguments received in startup while spawing [SenderAccount] actor
pub struct SenderAccountsManagerArgs {
/// Config forwarded to [SenderAccount]
pub config: &'static SenderAccountConfig,
/// Domain separator used for tap
pub domain_separator: Eip712Domain,

/// Database connection
pub pgpool: PgPool,
/// Watcher that returns a map of open and recently closed allocation ids
pub indexer_allocations: Receiver<HashMap<Address, Allocation>>,
/// Watcher containing the escrow accounts
pub escrow_accounts: Receiver<EscrowAccounts>,
/// SubgraphClient of the escrow subgraph
pub escrow_subgraph: &'static SubgraphClient,
/// SubgraphClient of the network subgraph
pub network_subgraph: &'static SubgraphClient,
/// Map containing all endpoints for senders provided in the config
pub sender_aggregator_endpoints: HashMap<Address, Url>,

/// Prefix used to bypass limitations of global actor registry (used for tests)
pub prefix: Option<String>,
}

/// State for [SenderAccountsManager] actor
///
/// This is a separate instance that makes it easier to have mutable
/// reference, for more information check ractor library
pub struct State {
sender_ids: HashSet<Address>,
new_receipts_watcher_handle: Option<tokio::task::JoinHandle<()>>,
Expand Down
21 changes: 21 additions & 0 deletions crates/tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,27 @@ lazy_static! {
/// This is used to give better error messages to users so they have a better understanding
#[derive(Error, Debug)]
pub enum RavError {
/// Database Errors
#[error(transparent)]
Sqlx(#[from] sqlx::Error),

/// Tap Core lib errors
#[error(transparent)]
TapCore(#[from] tap_core::Error),

/// Errors while aggregating
#[error(transparent)]
AggregationError(#[from] AggregationError),

/// Errors with gRPC client
#[error(transparent)]
Grpc(#[from] tonic::Status),

/// All receipts are invalid
#[error("All receipts are invalid")]
AllReceiptsInvalid,

/// Other kind of error
#[error(transparent)]
Other(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -151,15 +157,21 @@ pub struct SenderAllocationState<T: NetworkVersion> {
rav_request_receipt_limit: u64,
}

/// Configuration derived from config.toml
#[derive(Clone)]
pub struct AllocationConfig {
/// Buffer used for the receipts
pub timestamp_buffer_ns: u64,
/// Limit of receipts sent in a Rav Request
pub rav_request_receipt_limit: u64,
/// Current indexer address
pub indexer_address: Address,
/// Polling interval for escrow subgraph
pub escrow_polling_interval: Duration,
}

impl AllocationConfig {
/// Creates a [SenderAccountConfig] by getting a reference of [super::sender_account::SenderAccountConfig]
pub fn from_sender_config(config: &SenderAccountConfig) -> Self {
Self {
timestamp_buffer_ns: config.rav_request_buffer.as_nanos() as u64,
Expand Down Expand Up @@ -199,11 +211,18 @@ pub struct SenderAllocationArgs<T: NetworkVersion> {
pub config: AllocationConfig,
}

/// Enum containing all types of messages that a [SenderAllocation] can receive
#[derive(Debug)]
pub enum SenderAllocationMessage {
/// New receipt message, sent by the task spawned by
/// [super::sender_accounts_manager::SenderAccountsManager]
NewReceipt(NewReceiptNotification),
/// Triggers a Rav Request for the current allocation
///
/// It notifies its parent with the response
TriggerRavRequest,
#[cfg(any(test, feature = "test"))]
/// Return the internal state (used for tests)
GetUnaggregatedReceipts(ractor::RpcReplyPort<UnaggregatedReceipts>),
}

Expand Down Expand Up @@ -756,6 +775,7 @@ where
}
}

/// Sends a database query and mark the allocation rav as last
pub async fn mark_rav_last(&self) -> anyhow::Result<()> {
tracing::info!(
sender = %self.sender,
Expand Down Expand Up @@ -932,6 +952,7 @@ where

#[cfg(test)]
pub mod tests {
#![allow(missing_docs)]
use std::{
collections::HashMap,
future::Future,
Expand Down
Loading
Loading