Skip to content

Commit

Permalink
refactor: use sender accounts manager args
Browse files Browse the repository at this point in the history
  • Loading branch information
gusinacio committed Mar 20, 2024
1 parent c40e089 commit 9681a5a
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 178 deletions.
93 changes: 58 additions & 35 deletions tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

use std::time::Duration;

use alloy_sol_types::eip712_domain;
use indexer_common::prelude::{
escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient,
};
use ractor::{Actor, ActorRef};

use crate::{aggregator_endpoints, config, database};
use crate::agent::sender_accounts_manager::{
SenderAccountsManagerArgs, SenderAccountsManagerMessage,
};
use crate::config::{Cli, EscrowSubgraph, Ethereum, IndexerInfrastructure, NetworkSubgraph, Tap};
use crate::{aggregator_endpoints, database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

mod allocation_id_tracker;
Expand All @@ -17,81 +21,100 @@ pub mod sender_accounts_manager;
mod sender_allocation;
mod unaggregated_receipts;

pub async fn start_agent(config: &'static config::Cli) -> SenderAccountsManager {
let pgpool = database::connect(&config.postgres).await;
pub async fn start_agent() -> ActorRef<SenderAccountsManagerMessage> {
let Cli {
ethereum: Ethereum { indexer_address },
indexer_infrastructure:
IndexerInfrastructure {
graph_node_query_endpoint,
graph_node_status_endpoint,
..
},
postgres,
network_subgraph:
NetworkSubgraph {
network_subgraph_deployment,
network_subgraph_endpoint,
allocation_syncing_interval_ms,
},
escrow_subgraph:
EscrowSubgraph {
escrow_subgraph_deployment,
escrow_subgraph_endpoint,
escrow_syncing_interval_ms,
},
tap: Tap {
sender_aggregator_endpoints_file,
..
},
..
} = &*CONFIG;
let pgpool = database::connect(postgres).await;

let http_client = reqwest::Client::new();

let network_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
config
.network_subgraph
.network_subgraph_deployment
network_subgraph_deployment
.map(|deployment| {
DeploymentDetails::for_graph_node(
&config.indexer_infrastructure.graph_node_status_endpoint,
&config.indexer_infrastructure.graph_node_query_endpoint,
graph_node_status_endpoint,
graph_node_query_endpoint,
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and network subgraph deployment"),
DeploymentDetails::for_query_url(&config.network_subgraph.network_subgraph_endpoint)
DeploymentDetails::for_query_url(network_subgraph_endpoint)
.expect("Failed to parse network subgraph endpoint"),
)));

let indexer_allocations = indexer_allocations(
network_subgraph,
config.ethereum.indexer_address,
*indexer_address,
1,
Duration::from_millis(config.network_subgraph.allocation_syncing_interval_ms),
Duration::from_millis(*allocation_syncing_interval_ms),
);

let escrow_subgraph = Box::leak(Box::new(SubgraphClient::new(
http_client.clone(),
config
.escrow_subgraph
.escrow_subgraph_deployment
escrow_subgraph_deployment
.map(|deployment| {
DeploymentDetails::for_graph_node(
&config.indexer_infrastructure.graph_node_status_endpoint,
&config.indexer_infrastructure.graph_node_query_endpoint,
graph_node_status_endpoint,
graph_node_query_endpoint,
deployment,
)
})
.transpose()
.expect("Failed to parse graph node query endpoint and escrow subgraph deployment"),
DeploymentDetails::for_query_url(&config.escrow_subgraph.escrow_subgraph_endpoint)
DeploymentDetails::for_query_url(escrow_subgraph_endpoint)
.expect("Failed to parse escrow subgraph endpoint"),
)));

let escrow_accounts = escrow_accounts(
escrow_subgraph,
config.ethereum.indexer_address,
Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval_ms),
*indexer_address,
Duration::from_millis(*escrow_syncing_interval_ms),
false,
);

// TODO: replace with a proper implementation once the gateway registry contract is ready
let sender_aggregator_endpoints = aggregator_endpoints::load_aggregator_endpoints(
config.tap.sender_aggregator_endpoints_file.clone(),
);

let tap_eip712_domain_separator = eip712_domain! {
name: "TAP",
version: "1",
chain_id: config.receipts.receipts_verifier_chain_id,
verifying_contract: config.receipts.receipts_verifier_address,
};
let sender_aggregator_endpoints =
aggregator_endpoints::load_aggregator_endpoints(sender_aggregator_endpoints_file.clone());

SenderAccountsManager::new(
config,
let args = SenderAccountsManagerArgs {
config: &CONFIG,
domain_separator: EIP_712_DOMAIN.clone(),
pgpool,
indexer_allocations,
escrow_accounts,
escrow_subgraph,
tap_eip712_domain_separator,
sender_aggregator_endpoints,
)
.await
};

let (manager, _) = SenderAccountsManager::spawn(None, SenderAccountsManager, args)
.await
.expect("Failed to start sender accounts manager actor.");
manager
}
4 changes: 2 additions & 2 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ mod tests {
let allocation = ActorRef::<SenderAllocationMsg>::where_is(format!(
"{sender}:{allocation_id}",
sender = SENDER.1,
allocation_id = ALLOCATION_ID_0.to_string()
allocation_id = *ALLOCATION_ID_0
))
.unwrap();

Expand Down Expand Up @@ -478,7 +478,7 @@ mod tests {
let allocation = ActorRef::<SenderAllocationMsg>::where_is(format!(
"{sender}:{allocation_id}",
sender = SENDER.1,
allocation_id = ALLOCATION_ID_0.to_string()
allocation_id = *ALLOCATION_ID_0
))
.unwrap();

Expand Down
Loading

0 comments on commit 9681a5a

Please sign in to comment.