Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: actor system for tap-agent #137

Merged
merged 41 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a74f72f
refactor: move files to agent
gusinacio Mar 18, 2024
8a1c6ea
chore: add ractor
gusinacio Mar 18, 2024
59c9302
refactor: move unaggregated_receipts to agent
gusinacio Mar 18, 2024
4683b1b
refactor: add allocation id tracker
gusinacio Mar 18, 2024
80332ad
refactor(wip): use actors for manager, account and allocation
gusinacio Mar 18, 2024
c4cbb01
refactor(wip): remove locks and guards from sender_allocation
gusinacio Mar 19, 2024
266c5cb
style: cargo fmt
gusinacio Mar 19, 2024
b015eff
refactor: add sender allocation creation
gusinacio Mar 19, 2024
7762c68
test: update sender accounts manager tests to actors
gusinacio Mar 20, 2024
293aede
test: wip update tests for sender account
gusinacio Mar 20, 2024
4ae18e5
test: wip tests on sender account working, need to allow parallel
gusinacio Mar 20, 2024
4b986d1
refactor: use sender accounts manager args
gusinacio Mar 20, 2024
d32b552
test: finish integration tests
gusinacio Apr 1, 2024
ac8aa50
refactor: use args and state instead of actor fields
gusinacio Apr 1, 2024
857ddad
chore: add license to files
gusinacio Apr 1, 2024
0bcc0ef
test: add allocation_id_tracker unit test
gusinacio Apr 2, 2024
2bf706e
test: move tests to integration tests
gusinacio Apr 2, 2024
b6c9242
test: add more tests to sender_allocation
gusinacio Apr 2, 2024
df14326
test: finish sender_allocation tests
gusinacio Apr 3, 2024
887fab0
test: add tests to sender accounts manager
gusinacio Apr 9, 2024
9effaac
style: fix clippy
gusinacio Apr 9, 2024
bd630a3
chore: add licence to file
gusinacio Apr 9, 2024
f1fdfd6
chore: add comments
gusinacio Apr 9, 2024
d04c258
test: add unit tests for sender_account
gusinacio Apr 11, 2024
3ea3cda
test: remove integration test initial file
gusinacio Apr 11, 2024
fc3097c
style: cargo clippy
gusinacio Apr 11, 2024
791f1ee
refactor: update messages and add receipt notification test
gusinacio Apr 11, 2024
b24b443
test: use single prefix_id
gusinacio Apr 11, 2024
c27f7a2
chore: fix rebase conflicts
gusinacio Apr 11, 2024
b85c1f0
feat: add logging to agents
gusinacio Apr 11, 2024
f15b6f7
refactor: add actor recovery and logging
gusinacio Apr 12, 2024
3d4a6d2
fix: add chain_id to network subgraph
gusinacio Apr 12, 2024
929dd1f
fix: handle rav request error and more logs
gusinacio Apr 12, 2024
d63c557
chore: more logging
gusinacio Apr 12, 2024
8bf0566
fix: update sender_ids list for manager
gusinacio Apr 12, 2024
f759af6
refactor: gracefully shutdown in case of manager stop
gusinacio Apr 12, 2024
21723f0
fix: add chain_id default to 1
gusinacio Apr 12, 2024
740d9e1
refactor: accept suggestions from review
gusinacio Apr 14, 2024
1641cc9
refacotr: rename sender fee, simplify logic and add overflow check
gusinacio Apr 14, 2024
7effcb5
fix: remove check about last message, now using on sender account side
gusinacio Apr 14, 2024
b804163
test: add test case when the rav request fails
gusinacio Apr 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 36 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ tracing-subscriber = { version = "0.3", features = [
enum-as-inner = "0.6.0"
ethers = "2.0.13"
typetag = "0.2.14"
ractor = "0.9.7"

[dev-dependencies]
ethers-signers = "2.0.8"
tempfile = "3.8.0"
wiremock = "0.5.19"
futures = "0.3.30"
106 changes: 69 additions & 37 deletions tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,90 +3,122 @@

use std::time::Duration;

use alloy_sol_types::eip712_domain;
use indexer_common::prelude::{
escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient,
};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};

use crate::{
aggregator_endpoints, config, database, tap::sender_accounts_manager::SenderAccountsManager,
use crate::agent::sender_accounts_manager::{
SenderAccountsManagerArgs, SenderAccountsManagerMessage,
};
use crate::config::{Cli, EscrowSubgraph, Ethereum, IndexerInfrastructure, NetworkSubgraph, Tap};
use crate::{aggregator_endpoints, database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

pub async fn start_agent(config: &'static config::Cli) -> SenderAccountsManager {
let pgpool = database::connect(&config.postgres).await;
pub mod sender_account;
pub mod sender_accounts_manager;
pub mod sender_allocation;
pub mod sender_fee_tracker;
pub mod unaggregated_receipts;

/// constant graph network used in subgraphs
const GRAPH_NETWORK_ID: u64 = 1;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
let Cli {
ethereum: Ethereum { indexer_address },
indexer_infrastructure:
IndexerInfrastructure {
graph_node_query_endpoint,
graph_node_status_endpoint,
..
},
postgres,
network_subgraph:
NetworkSubgraph {
network_subgraph_deployment,
network_subgraph_endpoint,
allocation_syncing_interval_ms,
},
escrow_subgraph:
EscrowSubgraph {
escrow_subgraph_deployment,
escrow_subgraph_endpoint,
escrow_syncing_interval_ms,
},
tap: Tap {
sender_aggregator_endpoints_file,
..
},
..
} = &*CONFIG;
let pgpool = database::connect(postgres).await;

let http_client = reqwest::Client::new();

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
config
.network_subgraph
.network_subgraph_deployment
network_subgraph_deployment
.map(|deployment| {
DeploymentDetails::for_graph_node(
&config.indexer_infrastructure.graph_node_status_endpoint,
&config.indexer_infrastructure.graph_node_query_endpoint,
graph_node_status_endpoint,
graph_node_query_endpoint,
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and network subgraph deployment"),
DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint)
DeploymentDetails::for_query_url(network_subgraph_endpoint)
.expect("Failed to parse network subgraph endpoint"),
)));

let indexer_allocations = indexer_allocations(
network_subgraph,
config.ethereum.indexer_address,
1,
Duration::from_millis(config.network_subgraph.allocation_syncing_interval_ms),
*indexer_address,
GRAPH_NETWORK_ID,
Duration::from_millis(*allocation_syncing_interval_ms),
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
config
.escrow_subgraph
.escrow_subgraph_deployment
escrow_subgraph_deployment
.map(|deployment| {
DeploymentDetails::for_graph_node(
&config.indexer_infrastructure.graph_node_status_endpoint,
&config.indexer_infrastructure.graph_node_query_endpoint,
graph_node_status_endpoint,
graph_node_query_endpoint,
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint)
DeploymentDetails::for_query_url(escrow_subgraph_endpoint)
.expect("Failed to parse escrow subgraph endpoint"),
)));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
config.ethereum.indexer_address,
Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval_ms),
*indexer_address,
Duration::from_millis(*escrow_syncing_interval_ms),
false,
);

// TODO: replace with a proper implementation once the gateway registry contract is ready
let sender_aggregator_endpoints = aggregator_endpoints::load_aggregator_endpoints(
config.tap.sender_aggregator_endpoints_file.clone(),
);
let sender_aggregator_endpoints =
aggregator_endpoints::load_aggregator_endpoints(sender_aggregator_endpoints_file.clone());

let tap_eip712_domain_separator = eip712_domain! {
name: "TAP",
version: "1",
chain_id: config.receipts.receipts_verifier_chain_id,
verifying_contract: config.receipts.receipts_verifier_address,
};

SenderAccountsManager::new(
config,
let args = SenderAccountsManagerArgs {
config: &CONFIG,
domain_separator: EIP_712_DOMAIN.clone(),
pgpool,
indexer_allocations,
escrow_accounts,
escrow_subgraph,
tap_eip712_domain_separator,
sender_aggregator_endpoints,
)
.await
prefix: None,
};

SenderAccountsManager::spawn(None, SenderAccountsManager, args)
.await
.expect("Failed to start sender accounts manager actor.")
}
Loading
Loading