Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move tests to separated files #514

Closed
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ indexer-query = { path = "../query" }
indexer-allocation = { path = "../allocation" }
indexer-attestation = { path = "../attestation" }
indexer-watcher = { path = "../watcher" }
indexer-config = { path = "../config" }
thiserror.workspace = true
alloy.workspace = true
anyhow.workspace = true
Expand Down
27 changes: 27 additions & 0 deletions crates/monitor/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,30 @@ mod monitor;
mod subgraph_client;

pub use subgraph_client::{DeploymentDetails, SubgraphClient};

use indexer_config::{GraphNodeConfig, SubgraphConfig};

/// Creates a static reference to a subgraph
pub async fn create_subgraph_client(
http_client: reqwest::Client,
graph_node: &GraphNodeConfig,
subgraph_config: &SubgraphConfig,
) -> &'static SubgraphClient {
Box::leak(Box::new(
SubgraphClient::new(
http_client,
subgraph_config.deployment_id.map(|deployment| {
DeploymentDetails::for_graph_node_url(
graph_node.status_url.clone(),
graph_node.query_url.clone(),
deployment,
)
}),
DeploymentDetails::for_query_url_with_token(
subgraph_config.query_url.clone(),
subgraph_config.query_auth_token.clone(),
),
)
.await,
))
}
2 changes: 1 addition & 1 deletion crates/monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod escrow_accounts;
pub use crate::{
allocations::{indexer_allocations, AllocationWatcher},
attestation::{attestation_signers, AttestationWatcher},
client::{DeploymentDetails, SubgraphClient},
client::{create_subgraph_client, DeploymentDetails, SubgraphClient},
deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher},
dispute_manager::{dispute_manager, DisputeManagerWatcher},
escrow_accounts::{
Expand Down
28 changes: 2 additions & 26 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::time::Duration;

use anyhow::anyhow;
use axum::{extract::Request, serve, ServiceExt};
use indexer_config::{Config, GraphNodeConfig, SubgraphConfig};
use indexer_monitor::{DeploymentDetails, SubgraphClient};
use indexer_config::Config;
use indexer_monitor::create_subgraph_client;
use release::IndexerServiceRelease;
use reqwest::Url;
use tap_core::tap_eip712_domain;
Expand Down Expand Up @@ -125,30 +125,6 @@ pub async fn run() -> anyhow::Result<()> {
.await?)
}

async fn create_subgraph_client(
http_client: reqwest::Client,
graph_node: &GraphNodeConfig,
subgraph_config: &SubgraphConfig,
) -> &'static SubgraphClient {
Box::leak(Box::new(
SubgraphClient::new(
http_client,
subgraph_config.deployment_id.map(|deployment| {
DeploymentDetails::for_graph_node_url(
graph_node.status_url.clone(),
graph_node.query_url.clone(),
deployment,
)
}),
DeploymentDetails::for_query_url_with_token(
subgraph_config.query_url.clone(),
subgraph_config.query_auth_token.clone(),
),
)
.await,
))
}

/// Graceful shutdown handler
async fn shutdown_handler() {
let ctrl_c = async {
Expand Down
1 change: 1 addition & 0 deletions crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ractor = { version = "0.13", features = [
"async-trait",
], default-features = false }
tap_aggregator.workspace = true
typed-builder.workspace = true
futures = { version = "0.3.30", default-features = false }

[dev-dependencies]
Expand Down
143 changes: 47 additions & 96 deletions crates/tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,140 +1,91 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use indexer_config::{
Config, EscrowSubgraphConfig, GraphNodeConfig, IndexerConfig, NetworkSubgraphConfig,
SubgraphConfig, SubgraphsConfig, TapConfig,
};
use indexer_monitor::{escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};
use sender_account::SenderAccountConfig;
use indexer_config::{Config, IndexerConfig, SubgraphsConfig, TapConfig};
use indexer_monitor::{create_subgraph_client, escrow_accounts, indexer_allocations};
use ractor::{concurrency::JoinHandle, Actor, ActorRef};
use tap_core::tap_eip712_domain;

use crate::agent::sender_accounts_manager::{
SenderAccountsManagerArgs, SenderAccountsManagerMessage,
use crate::{
agent::{
sender_account::SenderAccountConfig,
sender_accounts_manager::{
SenderAccountsManager, SenderAccountsManagerArgs, SenderAccountsManagerMessage,
},
},
database,
};
use crate::{database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

mod metrics;
pub mod sender_account;
pub mod sender_accounts_manager;
pub mod sender_allocation;
pub mod unaggregated_receipts;

pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
pub async fn start_agent(
config: Config,
) -> (ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>) {
let domain_separator = tap_eip712_domain(
config.blockchain.chain_id as u64,
config.blockchain.receipts_verifier_address,
);
let Config {
indexer: IndexerConfig {
indexer_address, ..
},
graph_node:
GraphNodeConfig {
status_url: graph_node_status_endpoint,
query_url: graph_node_query_endpoint,
},
database,
subgraphs:
SubgraphsConfig {
network:
NetworkSubgraphConfig {
config:
SubgraphConfig {
query_url: network_query_url,
query_auth_token: network_query_auth_token,
deployment_id: network_deployment_id,
syncing_interval_secs: network_sync_interval,
},
recently_closed_allocation_buffer_secs: recently_closed_allocation_buffer,
},
escrow:
EscrowSubgraphConfig {
config:
SubgraphConfig {
query_url: escrow_query_url,
query_auth_token: escrow_query_auth_token,
deployment_id: escrow_deployment_id,
syncing_interval_secs: escrow_sync_interval,
},
},
},
ref graph_node,
ref database,
subgraphs: SubgraphsConfig {
ref network,
ref escrow,
},
tap:
TapConfig {
// TODO: replace with a proper implementation once the gateway registry contract is ready
sender_aggregator_endpoints,
ref sender_aggregator_endpoints,
..
},
..
} = &*CONFIG;
} = config;
let pgpool = database::connect(database.clone()).await;

let http_client = reqwest::Client::new();

let network_subgraph = Box::leak(Box::new(
SubgraphClient::new(
http_client.clone(),
network_deployment_id.map(|deployment| {
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
}),
DeploymentDetails::for_query_url_with_token(
network_query_url.clone(),
network_query_auth_token.clone(),
),
)
.await,
));
let network_subgraph =
create_subgraph_client(http_client.clone(), graph_node, &network.config).await;

let indexer_allocations = indexer_allocations(
network_subgraph,
*indexer_address,
*network_sync_interval,
*recently_closed_allocation_buffer,
indexer_address,
network.config.syncing_interval_secs,
network.recently_closed_allocation_buffer_secs,
)
.await
.expect("Failed to initialize indexer_allocations watcher");

let escrow_subgraph = Box::leak(Box::new(
SubgraphClient::new(
http_client.clone(),
escrow_deployment_id.map(|deployment| {
DeploymentDetails::for_graph_node_url(
graph_node_status_endpoint.clone(),
graph_node_query_endpoint.clone(),
deployment,
)
}),
DeploymentDetails::for_query_url_with_token(
escrow_query_url.clone(),
escrow_query_auth_token.clone(),
),
)
.await,
));
let escrow_subgraph = create_subgraph_client(http_client, graph_node, &escrow.config).await;

let escrow_accounts = escrow_accounts(
escrow_subgraph,
*indexer_address,
*escrow_sync_interval,
indexer_address,
escrow.config.syncing_interval_secs,
false,
)
.await
.expect("Error creating escrow_accounts channel");

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));
let config = SenderAccountConfig::from_config(&config);

let args = SenderAccountsManagerArgs {
config,
domain_separator: EIP_712_DOMAIN.clone(),
pgpool,
indexer_allocations,
escrow_accounts,
escrow_subgraph,
network_subgraph,
sender_aggregator_endpoints: sender_aggregator_endpoints.clone(),
prefix: None,
};
let args = SenderAccountsManagerArgs::builder()
.config(config)
.domain_separator(domain_separator)
.pgpool(pgpool)
.indexer_allocations(indexer_allocations)
.escrow_accounts(escrow_accounts)
.network_subgraph(network_subgraph)
.escrow_subgraph(escrow_subgraph)
.sender_aggregator_endpoints(sender_aggregator_endpoints.clone())
.build();

SenderAccountsManager::spawn(None, SenderAccountsManager, args)
.await
Expand Down
87 changes: 87 additions & 0 deletions crates/tap-agent/src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{
register_counter_vec, register_gauge_vec, register_histogram_vec, register_int_gauge_vec,
CounterVec, GaugeVec, HistogramVec, IntGaugeVec,
};

lazy_static::lazy_static! {
pub static ref SENDER_DENIED: IntGaugeVec =
register_int_gauge_vec!("tap_sender_denied", "Sender is denied", &["sender"]).unwrap();
pub static ref ESCROW_BALANCE: GaugeVec = register_gauge_vec!(
"tap_sender_escrow_balance_grt_total",
"Sender escrow balance",
&["sender"]
)
.unwrap();
pub static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
"tap_unaggregated_fees_grt_total",
"Unggregated Fees value",
&["sender", "allocation"]
)
.unwrap();
pub static ref SENDER_FEE_TRACKER: GaugeVec = register_gauge_vec!(
"tap_sender_fee_tracker_grt_total",
"Sender fee tracker metric",
&["sender"]
)
.unwrap();
pub static ref INVALID_RECEIPT_FEES: GaugeVec = register_gauge_vec!(
"tap_invalid_receipt_fees_grt_total",
"Failed receipt fees",
&["sender", "allocation"]
)
.unwrap();
pub static ref PENDING_RAV: GaugeVec = register_gauge_vec!(
"tap_pending_rav_grt_total",
"Pending ravs values",
&["sender", "allocation"]
)
.unwrap();
pub static ref MAX_FEE_PER_SENDER: GaugeVec = register_gauge_vec!(
"tap_max_fee_per_sender_grt_total",
"Max fee per sender in the config",
&["sender"]
)
.unwrap();
pub static ref RAV_REQUEST_TRIGGER_VALUE: GaugeVec = register_gauge_vec!(
"tap_rav_request_trigger_value",
"RAV request trigger value divisor",
&["sender"]
)
.unwrap();
pub static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
"tap_receipts_received_total",
"Receipts received since start of the program.",
&["sender", "allocation"]
)
.unwrap();

// Allocation metrics
pub static ref CLOSED_SENDER_ALLOCATIONS: CounterVec = register_counter_vec!(
"tap_closed_sender_allocation_total",
"Count of sender-allocation managers closed since the start of the program",
&["sender"]
)
.unwrap();
pub static ref RAVS_CREATED: CounterVec = register_counter_vec!(
"tap_ravs_created_total",
"RAVs updated or created per sender allocation since the start of the program",
&["sender", "allocation"]
)
.unwrap();
pub static ref RAVS_FAILED: CounterVec = register_counter_vec!(
"tap_ravs_failed_total",
"RAV requests failed since the start of the program",
&["sender", "allocation"]
)
.unwrap();
pub static ref RAV_RESPONSE_TIME: HistogramVec = register_histogram_vec!(
"tap_rav_response_time_seconds",
"RAV response time per sender",
&["sender"]
)
.unwrap();

}
Loading
Loading