Skip to content

Commit

Permalink
refactor: use router builder (#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusinacio authored Nov 26, 2024
1 parent a50e23d commit 430c5d4
Show file tree
Hide file tree
Showing 40 changed files with 1,145 additions and 1,018 deletions.
358 changes: 113 additions & 245 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ alloy = { version = "=0.5.4", features = [
], default-features = false }
clap = "4.4.3"
lazy_static = "1.4.0"
axum = { version = "0.7.7", default-features = false }
axum = { version = "0.7.9", default-features = false, features = [
"tokio",
"http1",
"http2",
] }
tokio = "1.40"
prometheus = "0.13.3"
anyhow = { version = "1.0.72" }
Expand Down Expand Up @@ -72,3 +76,5 @@ thegraph-graphql-http = "0.2.0"
graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
bip39 = "2.0.0"
rstest = "0.23.0"
wiremock = "0.6.1"
typed-builder = "0.20.0"
30 changes: 15 additions & 15 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::NonZeroGRT;

const SHARED_PREFIX: &str = "INDEXER_";

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct Config {
pub indexer: IndexerConfig,
Expand Down Expand Up @@ -229,14 +229,14 @@ impl Config {
}
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct IndexerConfig {
pub indexer_address: Address,
pub operator_mnemonic: Mnemonic,
}

#[derive(Clone, Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
#[cfg_attr(test, derive(PartialEq))]
#[serde(untagged)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -278,14 +278,14 @@ impl DatabaseConfig {
}
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct GraphNodeConfig {
pub query_url: Url,
pub status_url: Url,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct MetricsConfig {
pub port: u16,
Expand All @@ -297,15 +297,15 @@ impl MetricsConfig {
}
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct SubgraphsConfig {
pub network: NetworkSubgraphConfig,
pub escrow: EscrowSubgraphConfig,
}

#[serde_as]
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct NetworkSubgraphConfig {
#[serde(flatten)]
Expand All @@ -315,15 +315,15 @@ pub struct NetworkSubgraphConfig {
pub recently_closed_allocation_buffer_secs: Duration,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct EscrowSubgraphConfig {
#[serde(flatten)]
pub config: SubgraphConfig,
}

#[serde_as]
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct SubgraphConfig {
pub query_url: Url,
Expand All @@ -346,14 +346,14 @@ pub enum TheGraphChainId {
Test = 1337,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct BlockchainConfig {
pub chain_id: TheGraphChainId,
pub receipts_verifier_address: Address,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct ServiceConfig {
pub serve_network_subgraph: bool,
Expand All @@ -366,14 +366,14 @@ pub struct ServiceConfig {
}

#[serde_as]
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct ServiceTapConfig {
/// what's the maximum value we accept in a receipt
pub max_receipt_value_grt: NonZeroGRT,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct TapConfig {
/// what is the maximum amount the indexer is willing to lose in grt
Expand All @@ -383,7 +383,7 @@ pub struct TapConfig {
pub sender_aggregator_endpoints: HashMap<Address, Url>,
}

#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DipsConfig {
pub allowed_payers: Vec<Address>,
Expand All @@ -402,7 +402,7 @@ impl TapConfig {
}

#[serde_as]
#[derive(Debug, Deserialize, Clone)]
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct RavRequestConfig {
/// what divisor of the amount willing to lose to trigger the rav request
Expand Down
2 changes: 1 addition & 1 deletion crates/monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ bip39.workspace = true
[dev-dependencies]
env_logger = { version = "0.11.0", default-features = false }
test-log = { version = "0.2.12", default-features = false }
wiremock = "0.5.19"
wiremock.workspace = true
test-assets = { path = "../test-assets" }
5 changes: 4 additions & 1 deletion crates/monitor/src/allocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ use std::{
};
use tokio::sync::watch::Receiver;

/// Receiver of Map between allocation id and allocation struct
pub type AllocationWatcher = Receiver<HashMap<Address, Allocation>>;

/// An always up-to-date list of an indexer's active and recently closed allocations.
pub async fn indexer_allocations(
network_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
recently_closed_allocation_buffer: Duration,
) -> anyhow::Result<Receiver<HashMap<Address, Allocation>>> {
) -> anyhow::Result<AllocationWatcher> {
new_watcher(interval, move || async move {
get_allocations(
network_subgraph,
Expand Down
11 changes: 8 additions & 3 deletions crates/monitor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ use thegraph_core::{Address, ChainId};
use tokio::sync::watch::Receiver;
use tracing::warn;

use crate::{AllocationWatcher, DisputeManagerWatcher};

/// Receiver for Map of allocation id and attestation signer
pub type AttestationWatcher = Receiver<HashMap<Address, AttestationSigner>>;

/// An always up-to-date list of attestation signers, one for each of the indexer's allocations.
pub fn attestation_signers(
indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
indexer_allocations_rx: AllocationWatcher,
indexer_mnemonic: Mnemonic,
chain_id: ChainId,
dispute_manager_rx: Receiver<Address>,
) -> Receiver<HashMap<Address, AttestationSigner>> {
dispute_manager_rx: DisputeManagerWatcher,
) -> AttestationWatcher {
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));
let indexer_mnemonic = Arc::new(indexer_mnemonic.to_string());
Expand Down
26 changes: 9 additions & 17 deletions crates/monitor/src/client/subgraph_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,26 @@ impl DeploymentDetails {
graph_node_base_url: &str,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Self::for_graph_node_url(
Ok(Self::for_graph_node_url(
Url::parse(graph_node_status_url)?,
Url::parse(graph_node_base_url)?,
deployment,
)
))
}

pub fn for_graph_node_url(
graph_node_status_url: Url,
graph_node_base_url: Url,
deployment: DeploymentId,
) -> Result<Self, anyhow::Error> {
Ok(Self {
) -> Self {
Self {
deployment: Some(deployment),
status_url: Some(graph_node_status_url),
query_url: graph_node_base_url.join(&format!("subgraphs/id/{deployment}"))?,
query_url: graph_node_base_url
.join(&format!("subgraphs/id/{deployment}"))
.expect("Must be correct"),
query_auth_token: None,
})
}
}

pub fn for_query_url(query_url: &str) -> Result<Self, anyhow::Error> {
Expand All @@ -55,17 +57,7 @@ impl DeploymentDetails {
})
}

pub fn for_query_url_with_token(
query_url: &str,
query_auth_token: Option<String>,
) -> Result<Self, anyhow::Error> {
Ok(Self::for_query_url_with_token_url(
Url::parse(query_url)?,
query_auth_token,
))
}

pub fn for_query_url_with_token_url(query_url: Url, query_auth_token: Option<String>) -> Self {
pub fn for_query_url_with_token(query_url: Url, query_auth_token: Option<String>) -> Self {
Self {
deployment: None,
status_url: None,
Expand Down
10 changes: 7 additions & 3 deletions crates/monitor/src/deployment_to_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use std::collections::HashMap;
use thegraph_core::{Address, DeploymentId};
use tokio::sync::watch::Receiver;

use indexer_allocation::Allocation;
use indexer_watcher::map_watcher;

use crate::AllocationWatcher;

/// Watcher for Map of deployment id and allocation id
pub type DeploymentToAllocationWatcher = Receiver<HashMap<DeploymentId, Address>>;

/// Watcher of indexer allocation
/// returning a map of subgraph deployment to allocation id
pub fn deployment_to_allocation(
indexer_allocations_rx: Receiver<HashMap<Address, Allocation>>,
) -> Receiver<HashMap<DeploymentId, Address>> {
indexer_allocations_rx: AllocationWatcher,
) -> DeploymentToAllocationWatcher {
map_watcher(indexer_allocations_rx, move |allocation| {
allocation
.iter()
Expand Down
6 changes: 5 additions & 1 deletion crates/monitor/src/dispute_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ use indexer_watcher::new_watcher;
use std::time::Duration;
use tokio::sync::watch::Receiver;

/// Watcher for Dispute Manager Address
pub type DisputeManagerWatcher = Receiver<Address>;

/// Monitors the subgraph for dispute manager address
pub async fn dispute_manager(
network_subgraph: &'static SubgraphClient,
interval: Duration,
) -> anyhow::Result<Receiver<Address>> {
) -> anyhow::Result<DisputeManagerWatcher> {
new_watcher(interval, move || async move {
let response = network_subgraph
.query::<DisputeManager, _>(dispute_manager::Variables {})
Expand Down
4 changes: 3 additions & 1 deletion crates/monitor/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ impl EscrowAccounts {
}
}

pub type EscrowAccountsWatcher = Receiver<EscrowAccounts>;

pub async fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Result<Receiver<EscrowAccounts>, anyhow::Error> {
) -> Result<EscrowAccountsWatcher, anyhow::Error> {
indexer_watcher::new_watcher(interval, move || {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
})
Expand Down
12 changes: 7 additions & 5 deletions crates/monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ mod dispute_manager;
mod escrow_accounts;

pub use crate::{
allocations::indexer_allocations,
attestation::attestation_signers,
allocations::{indexer_allocations, AllocationWatcher},
attestation::{attestation_signers, AttestationWatcher},
client::{DeploymentDetails, SubgraphClient},
deployment_to_allocation::deployment_to_allocation,
dispute_manager::dispute_manager,
escrow_accounts::{escrow_accounts, EscrowAccounts, EscrowAccountsError},
deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher},
dispute_manager::{dispute_manager, DisputeManagerWatcher},
escrow_accounts::{
escrow_accounts, EscrowAccounts, EscrowAccountsError, EscrowAccountsWatcher,
},
};
7 changes: 6 additions & 1 deletion crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3
tap_core.workspace = true
uuid.workspace = true
alloy.workspace = true
tower_governor = "0.4.0"
typed-builder.workspace = true
tower_governor = "0.4.3"
governor = "0.6.0"
tower-http = { version = "0.6.2", features = [
"auth",
"cors",
Expand All @@ -59,10 +61,13 @@ pin-project = "1.1.7"
[dev-dependencies]
hex-literal = "0.4.1"
test-assets = { path = "../test-assets" }
sqlx = { workspace = true, features = ["migrate"] }
rstest.workspace = true
tower-test = "0.4.0"
tower-service = "0.3.3"
tokio-test = "0.4.4"
wiremock.workspace = true
insta = "1.41.1"

[build-dependencies]
build-info-build = { version = "0.0.39", default-features = false }
7 changes: 5 additions & 2 deletions crates/service/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use std::time::Duration;
use sqlx::{postgres::PgPoolOptions, PgPool};
use tracing::debug;

const DATABASE_TIMEOUT: Duration = Duration::from_secs(30);
const DATABASE_MAX_CONNECTIONS: u32 = 50;

pub async fn connect(url: &str) -> PgPool {
debug!("Connecting to database");

PgPoolOptions::new()
.max_connections(50)
.acquire_timeout(Duration::from_secs(3))
.max_connections(DATABASE_MAX_CONNECTIONS)
.acquire_timeout(DATABASE_TIMEOUT)
.connect(url)
.await
.expect("Should be able to connect to the database")
Expand Down
2 changes: 2 additions & 0 deletions crates/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ mod routes;
pub mod service;
mod tap;
mod wallet;

pub use middleware::QueryBody;
Loading

0 comments on commit 430c5d4

Please sign in to comment.