From 0817632ae84b34dc8f4380fc50af4a69ce675022 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 13 Feb 2025 21:50:20 +0100 Subject: [PATCH] refactor: update check escrow receipt check (#629) * refactor: update check escrow receipt check Signed-off-by: Gustavo Inacio * refactor: update deny list check for v2 Signed-off-by: Gustavo Inacio --------- Signed-off-by: Gustavo Inacio --- ...3f354c5d246db5bb1506839cd7b07cdcc7065.json | 20 ++++ Cargo.lock | 24 +--- Cargo.toml | 2 +- crates/monitor/src/escrow_accounts.rs | 4 +- crates/monitor/src/lib.rs | 2 +- crates/service/Cargo.toml | 2 +- crates/service/src/middleware/sender.rs | 32 +++-- crates/service/src/service/router.rs | 57 +++++---- crates/service/src/tap.rs | 8 +- .../service/src/tap/checks/deny_list_check.rs | 113 ++++++++++++++---- .../src/tap/checks/sender_balance_check.rs | 33 +++-- crates/service/tests/router_test.rs | 3 +- crates/tap-agent/Cargo.toml | 3 +- crates/tap-agent/src/agent.rs | 4 +- crates/test-assets/Cargo.toml | 2 +- crates/test-assets/src/lib.rs | 7 +- ...11337_tap_horizon_sender_denylist.down.sql | 4 + ...2211337_tap_horizon_sender_denylist.up.sql | 22 ++++ 18 files changed, 240 insertions(+), 102 deletions(-) create mode 100644 .sqlx/query-f1d8dcf24c9677ef789469d37733f354c5d246db5bb1506839cd7b07cdcc7065.json diff --git a/.sqlx/query-f1d8dcf24c9677ef789469d37733f354c5d246db5bb1506839cd7b07cdcc7065.json b/.sqlx/query-f1d8dcf24c9677ef789469d37733f354c5d246db5bb1506839cd7b07cdcc7065.json new file mode 100644 index 00000000..8dc48068 --- /dev/null +++ b/.sqlx/query-f1d8dcf24c9677ef789469d37733f354c5d246db5bb1506839cd7b07cdcc7065.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT sender_address FROM tap_horizon_denylist\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sender_address", + "type_info": "Bpchar" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "f1d8dcf24c9677ef789469d37733f354c5d246db5bb1506839cd7b07cdcc7065" +} diff --git a/Cargo.lock b/Cargo.lock index da53e80c..ebeb733a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3890,6 +3890,7 @@ dependencies = [ "base64 0.22.1", "bigdecimal", "bip39", + "bon 3.3.2", "build-info", "build-info-build", "clap", @@ -3932,7 +3933,6 @@ dependencies = [ "tower_governor", "tracing", "tracing-subscriber", - "typed-builder", "uuid", "wiremock", ] @@ -7449,13 +7449,13 @@ name = "test-assets" version = "0.1.0" dependencies = [ "bip39", + "bon 3.3.2", "indexer-allocation", "lazy_static", "tap_core", "tap_graph", "thegraph-core", "tokio", - "typed-builder", ] [[package]] @@ -8088,26 +8088,6 @@ dependencies = [ "utf-8", ] -[[package]] -name = "typed-builder" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75" -dependencies = [ - "typed-builder-macro", -] - -[[package]] -name = "typed-builder-macro" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.90", -] - [[package]] name = "typenum" version = "1.17.0" diff --git a/Cargo.toml b/Cargo.toml index 64a12160..084d58b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] } bip39 = "2.0.0" rstest = "0.23.0" wiremock = "0.6.1" -typed-builder = "0.20.0" +bon = "3.3" tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] } prost = "0.13.4" prost-types = "0.13.3" diff --git a/crates/monitor/src/escrow_accounts.rs b/crates/monitor/src/escrow_accounts.rs index 6c1fd047..03891a53 100644 --- a/crates/monitor/src/escrow_accounts.rs +++ b/crates/monitor/src/escrow_accounts.rs @@ -88,7 +88,7 @@ impl EscrowAccounts { pub type EscrowAccountsWatcher = Receiver; -pub async fn escrow_accounts( +pub async fn escrow_accounts_v1( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, interval: Duration, @@ -243,7 +243,7 @@ mod tests { ); mock_server.register(mock).await; - let mut accounts = escrow_accounts( + let mut accounts = escrow_accounts_v1( escrow_subgraph, test_assets::INDEXER_ADDRESS, Duration::from_secs(60), diff --git a/crates/monitor/src/lib.rs b/crates/monitor/src/lib.rs index a9a10b6a..30f6f532 100644 --- a/crates/monitor/src/lib.rs +++ b/crates/monitor/src/lib.rs @@ -15,7 +15,7 @@ pub use crate::{ deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher}, dispute_manager::{dispute_manager, DisputeManagerWatcher}, escrow_accounts::{ - escrow_accounts, escrow_accounts_v2, EscrowAccounts, EscrowAccountsError, + escrow_accounts_v1, escrow_accounts_v2, EscrowAccounts, EscrowAccountsError, EscrowAccountsWatcher, }, }; diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index b737dbab..cd072279 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -40,7 +40,7 @@ graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3 tap_core.workspace = true tap_graph.workspace = true uuid.workspace = true -typed-builder.workspace = true +bon.workspace = true tower_governor = { version = "0.5.0", features = ["axum"] } governor = "0.8.0" tower-http = { version = "0.6.2", features = [ diff --git a/crates/service/src/middleware/sender.rs b/crates/service/src/middleware/sender.rs index 99b4ee40..4e0dc54b 100644 --- a/crates/service/src/middleware/sender.rs +++ b/crates/service/src/middleware/sender.rs @@ -17,8 +17,10 @@ use crate::{error::IndexerServiceError, tap::TapReceipt}; pub struct SenderState { /// Used to recover the signer address pub domain_separator: Eip712Domain, - /// Used to get the sender address given the signer address - pub escrow_accounts: watch::Receiver, + /// Used to get the sender address given the signer address if v1 receipt + pub escrow_accounts_v1: watch::Receiver, + /// Used to get the sender address given the signer address if v2 receipt + pub escrow_accounts_v2: watch::Receiver, } /// The current query Sender address @@ -45,10 +47,16 @@ pub async fn sender_middleware( ) -> Result { if let Some(receipt) = request.extensions().get::() { let signer = receipt.recover_signer(&state.domain_separator)?; - let sender = state - .escrow_accounts - .borrow() - .get_sender_for_signer(&signer)?; + let sender = match receipt { + TapReceipt::V1(_) => state + .escrow_accounts_v1 + .borrow() + .get_sender_for_signer(&signer)?, + TapReceipt::V2(_) => state + .escrow_accounts_v2 + .borrow() + .get_sender_for_signer(&signer)?, + }; request.extensions_mut().insert(Sender(sender)); } @@ -78,14 +86,22 @@ mod tests { #[tokio::test] async fn test_sender_middleware() { - let escrow_accounts = watch::channel(EscrowAccounts::new( + let escrow_accounts_v1 = watch::channel(EscrowAccounts::new( ESCROW_ACCOUNTS_BALANCES.to_owned(), ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), )) .1; + + let escrow_accounts_v2 = watch::channel(EscrowAccounts::new( + ESCROW_ACCOUNTS_BALANCES.to_owned(), + ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )) + .1; + let state = SenderState { domain_separator: test_assets::TAP_EIP712_DOMAIN.clone(), - escrow_accounts, + escrow_accounts_v1, + escrow_accounts_v2, }; let middleware = from_fn_with_state(state, sender_middleware); diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index 69caa62d..fc9be6f3 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -17,9 +17,9 @@ use indexer_config::{ ServiceConfig, ServiceTapConfig, }; use indexer_monitor::{ - attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts, - indexer_allocations, AllocationWatcher, DisputeManagerWatcher, EscrowAccountsWatcher, - SubgraphClient, + attestation_signers, deployment_to_allocation, dispute_manager, escrow_accounts_v1, + escrow_accounts_v2, indexer_allocations, AllocationWatcher, DisputeManagerWatcher, + EscrowAccountsWatcher, SubgraphClient, }; use reqwest::Method; use tap_core::{manager::Manager, receipt::checks::CheckList}; @@ -34,7 +34,6 @@ use tower_http::{ trace::TraceLayer, validate_request::ValidateRequestHeaderLayer, }; -use typed_builder::TypedBuilder; use super::{release::IndexerServiceRelease, GraphNodeState}; use crate::{ @@ -51,7 +50,7 @@ use crate::{ wallet::public_key, }; -#[derive(TypedBuilder)] +#[derive(bon::Builder)] pub struct ServiceRouter { // database database: sqlx::PgPool, @@ -60,7 +59,6 @@ pub struct ServiceRouter { // graphnode client http_client: reqwest::Client, // release info - #[builder(default, setter(strip_option))] release: Option, // configuration @@ -71,23 +69,21 @@ pub struct ServiceRouter { timestamp_buffer_secs: Duration, // either provide subgraph or watcher - #[builder(default, setter(transform = + #[builder(with = |subgraph: &'static SubgraphClient, - config: EscrowSubgraphConfig| - Some((subgraph, config))))] + config: EscrowSubgraphConfig| + (subgraph, config))] escrow_subgraph: Option<(&'static SubgraphClient, EscrowSubgraphConfig)>, - #[builder(default, setter(strip_option))] - escrow_accounts: Option, + escrow_accounts_v1: Option, + + escrow_accounts_v2: Option, // provide network subgraph or allocations + dispute manager - #[builder(default, setter(transform = - |subgraph: &'static SubgraphClient, - config: NetworkSubgraphConfig| - Some((subgraph, config))))] + #[builder(with = |subgraph: &'static SubgraphClient, + config: NetworkSubgraphConfig| + (subgraph, config))] network_subgraph: Option<(&'static SubgraphClient, NetworkSubgraphConfig)>, - #[builder(default, setter(strip_option))] allocations: Option, - #[builder(default, setter(strip_option))] dispute_manager: Option, } @@ -141,11 +137,26 @@ impl ServiceRouter { (None, None) => panic!("No allocations or network subgraph was provided"), }; - // Monitor escrow accounts + // Monitor escrow accounts v1 + // if not provided, create monitor from subgraph + let escrow_accounts_v1 = match (self.escrow_accounts_v1, self.escrow_subgraph.as_ref()) { + (Some(escrow_account), _) => escrow_account, + (_, Some((escrow_subgraph, escrow))) => escrow_accounts_v1( + escrow_subgraph, + indexer_address, + escrow.config.syncing_interval_secs, + true, // Reject thawing signers eagerly + ) + .await + .expect("Error creating escrow_accounts channel"), + (None, None) => panic!("No escrow accounts or escrow subgraph was provided"), + }; + + // Monitor escrow accounts v2 // if not provided, create monitor from subgraph - let escrow_accounts = match (self.escrow_accounts, self.escrow_subgraph.as_ref()) { + let escrow_accounts_v2 = match (self.escrow_accounts_v2, self.escrow_subgraph.as_ref()) { (Some(escrow_account), _) => escrow_account, - (_, Some((escrow_subgraph, escrow))) => escrow_accounts( + (_, Some((escrow_subgraph, escrow))) => escrow_accounts_v2( escrow_subgraph, indexer_address, escrow.config.syncing_interval_secs, @@ -255,7 +266,8 @@ impl ServiceRouter { let checks = IndexerTapContext::get_checks( self.database, allocations.clone(), - escrow_accounts.clone(), + escrow_accounts_v1.clone(), + escrow_accounts_v2.clone(), timestamp_error_tolerance, receipt_max_value, ) @@ -299,7 +311,8 @@ impl ServiceRouter { deployment_to_allocation, }; let sender_state = SenderState { - escrow_accounts, + escrow_accounts_v1, + escrow_accounts_v2, domain_separator: self.domain_separator, }; diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index 47bc91c9..393c79c9 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -48,13 +48,17 @@ impl IndexerTapContext { pub async fn get_checks( pgpool: PgPool, indexer_allocations: Receiver>, - escrow_accounts: Receiver, + escrow_accounts_v1: Receiver, + escrow_accounts_v2: Receiver, timestamp_error_tolerance: Duration, receipt_max_value: u128, ) -> Vec> { vec![ Arc::new(AllocationEligible::new(indexer_allocations)), - Arc::new(SenderBalanceCheck::new(escrow_accounts)), + Arc::new(SenderBalanceCheck::new( + escrow_accounts_v1, + escrow_accounts_v2, + )), Arc::new(TimestampCheck::new(timestamp_error_tolerance)), Arc::new(DenyListCheck::new(pgpool.clone()).await), Arc::new(ReceiptMaxValueCheck::new(receipt_max_value)), diff --git a/crates/service/src/tap/checks/deny_list_check.rs b/crates/service/src/tap/checks/deny_list_check.rs index 39589308..9f60bd5e 100644 --- a/crates/service/src/tap/checks/deny_list_check.rs +++ b/crates/service/src/tap/checks/deny_list_check.rs @@ -16,9 +16,14 @@ use crate::{ tap::{CheckingReceipt, TapReceipt}, }; +enum DenyListVersion { + V1, + V2, +} + pub struct DenyListCheck { - sender_denylist: Arc>>, - _sender_denylist_watcher_handle: Arc>, + sender_denylist_v1: Arc>>, + sender_denylist_v2: Arc>>, sender_denylist_watcher_cancel_token: tokio_util::sync::CancellationToken, #[cfg(test)] @@ -29,8 +34,9 @@ impl DenyListCheck { pub async fn new(pgpool: PgPool) -> Self { // Listen to pg_notify events. We start it before updating the sender_denylist so that we // don't miss any updates. PG will buffer the notifications until we start consuming them. - let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap(); - pglistener + let mut pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + let mut pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap(); + pglistener_v1 .listen("scalar_tap_deny_notification") .await .expect( @@ -38,9 +44,18 @@ impl DenyListCheck { 'scalar_tap_deny_notification'", ); + pglistener_v2 + .listen("tap_horizon_deny_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'tap_horizon_deny_notification'", + ); + // Fetch the denylist from the DB - let sender_denylist = Arc::new(RwLock::new(HashSet::new())); - Self::sender_denylist_reload(pgpool.clone(), sender_denylist.clone()) + let sender_denylist_v1 = Arc::new(RwLock::new(HashSet::new())); + let sender_denylist_v2 = Arc::new(RwLock::new(HashSet::new())); + Self::sender_denylist_reload_v1(pgpool.clone(), sender_denylist_v1.clone()) .await .expect("should be able to fetch the sender_denylist from the DB on startup"); @@ -48,24 +63,36 @@ impl DenyListCheck { let notify = std::sync::Arc::new(tokio::sync::Notify::new()); let sender_denylist_watcher_cancel_token = tokio_util::sync::CancellationToken::new(); - let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher( + tokio::spawn(Self::sender_denylist_watcher( pgpool.clone(), - pglistener, - sender_denylist.clone(), + pglistener_v1, + sender_denylist_v1.clone(), sender_denylist_watcher_cancel_token.clone(), + DenyListVersion::V1, #[cfg(test)] notify.clone(), - ))); + )); + + tokio::spawn(Self::sender_denylist_watcher( + pgpool.clone(), + pglistener_v2, + sender_denylist_v1.clone(), + sender_denylist_watcher_cancel_token.clone(), + DenyListVersion::V2, + #[cfg(test)] + notify.clone(), + )); + Self { - sender_denylist, - _sender_denylist_watcher_handle: sender_denylist_watcher_handle, + sender_denylist_v1, + sender_denylist_v2, sender_denylist_watcher_cancel_token, #[cfg(test)] notify, } } - async fn sender_denylist_reload( + async fn sender_denylist_reload_v1( pgpool: PgPool, denylist_rwlock: Arc>>, ) -> anyhow::Result<()> { @@ -86,11 +113,33 @@ impl DenyListCheck { Ok(()) } + async fn sender_denylist_reload_v2( + pgpool: PgPool, + denylist_rwlock: Arc>>, + ) -> anyhow::Result<()> { + // Fetch the denylist from the DB + let sender_denylist = sqlx::query!( + r#" + SELECT sender_address FROM tap_horizon_denylist + "# + ) + .fetch_all(&pgpool) + .await? + .iter() + .map(|row| Address::from_str(&row.sender_address)) + .collect::, _>>()?; + + *(denylist_rwlock.write().unwrap()) = sender_denylist; + + Ok(()) + } + async fn sender_denylist_watcher( pgpool: PgPool, mut pglistener: PgListener, denylist: Arc>>, cancel_token: tokio_util::sync::CancellationToken, + version: DenyListVersion, #[cfg(test)] notify: std::sync::Arc, ) { #[derive(serde::Deserialize)] @@ -137,10 +186,14 @@ impl DenyListCheck { denylist.", denylist_notification.tg_op ); - - Self::sender_denylist_reload(pgpool.clone(), denylist.clone()) - .await - .expect("should be able to reload the sender denylist") + match version { + DenyListVersion::V1 => Self::sender_denylist_reload_v1(pgpool.clone(), denylist.clone()) + .await + .expect("should be able to reload the sender denylist"), + DenyListVersion::V2 => Self::sender_denylist_reload_v2(pgpool.clone(), denylist.clone()) + .await + .expect("should be able to reload the sender denylist"), + } } } #[cfg(test)] @@ -153,18 +206,30 @@ impl DenyListCheck { #[async_trait::async_trait] impl Check for DenyListCheck { - async fn check(&self, ctx: &tap_core::receipt::Context, _: &CheckingReceipt) -> CheckResult { + async fn check( + &self, + ctx: &tap_core::receipt::Context, + receipt: &CheckingReceipt, + ) -> CheckResult { let Sender(receipt_sender) = ctx .get::() .ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?; + let denied = match receipt.signed_receipt() { + TapReceipt::V1(_) => self + .sender_denylist_v1 + .read() + .unwrap() + .contains(receipt_sender), + TapReceipt::V2(_) => self + .sender_denylist_v2 + .read() + .unwrap() + .contains(receipt_sender), + }; + // Check that the sender is not denylisted - if self - .sender_denylist - .read() - .unwrap() - .contains(receipt_sender) - { + if denied { return Err(CheckError::Failed(anyhow::anyhow!( "Received a receipt from a denylisted sender: {}", receipt_sender diff --git a/crates/service/src/tap/checks/sender_balance_check.rs b/crates/service/src/tap/checks/sender_balance_check.rs index 876c2da7..9f09340d 100644 --- a/crates/service/src/tap/checks/sender_balance_check.rs +++ b/crates/service/src/tap/checks/sender_balance_check.rs @@ -13,30 +13,45 @@ use crate::{ }; pub struct SenderBalanceCheck { - escrow_accounts: Receiver, + escrow_accounts_v1: Receiver, + escrow_accounts_v2: Receiver, } impl SenderBalanceCheck { - pub fn new(escrow_accounts: Receiver) -> Self { - Self { escrow_accounts } + pub fn new( + escrow_accounts_v1: Receiver, + escrow_accounts_v2: Receiver, + ) -> Self { + Self { + escrow_accounts_v1, + escrow_accounts_v2, + } } } #[async_trait::async_trait] impl Check for SenderBalanceCheck { - async fn check(&self, ctx: &tap_core::receipt::Context, _: &CheckingReceipt) -> CheckResult { - let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + async fn check( + &self, + ctx: &tap_core::receipt::Context, + receipt: &CheckingReceipt, + ) -> CheckResult { + let escrow_accounts_snapshot_v1 = self.escrow_accounts_v1.borrow(); + let escrow_accounts_snapshot_v2 = self.escrow_accounts_v2.borrow(); let Sender(receipt_sender) = ctx .get::() .ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?; + // get balance for escrow account given receipt type + let balance_result = match receipt.signed_receipt() { + TapReceipt::V1(_) => escrow_accounts_snapshot_v1.get_balance_for_sender(receipt_sender), + TapReceipt::V2(_) => escrow_accounts_snapshot_v2.get_balance_for_sender(receipt_sender), + }; + // Check that the sender has a non-zero balance -- more advanced accounting is done in // `tap-agent`. - if !escrow_accounts_snapshot - .get_balance_for_sender(receipt_sender) - .is_ok_and(|balance| balance > U256::ZERO) - { + if !balance_result.is_ok_and(|balance| balance > U256::ZERO) { return Err(CheckError::Failed(anyhow!( "Receipt sender `{}` does not have a sufficient balance", receipt_sender, diff --git a/crates/service/tests/router_test.rs b/crates/service/tests/router_test.rs index 105d3cd7..452d6ff7 100644 --- a/crates/service/tests/router_test.rs +++ b/crates/service/tests/router_test.rs @@ -90,7 +90,8 @@ async fn full_integration_test(database: PgPool) { receipts_verifier_address: test_assets::VERIFIER_ADDRESS, }) .timestamp_buffer_secs(Duration::from_secs(10)) - .escrow_accounts(escrow_accounts) + .escrow_accounts_v1(escrow_accounts.clone()) + .escrow_accounts_v2(escrow_accounts) .dispute_manager(dispute_manager) .allocations(allocations) .build(); diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 48859968..9c31f724 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -49,7 +49,7 @@ ractor = { version = "0.14", features = [ ], default-features = false } tap_aggregator.workspace = true futures = { version = "0.3.30", default-features = false } -bon = "3.3" +bon.workspace = true test-assets = { path = "../test-assets", optional = true } rand = { version = "0.8", optional = true } itertools = "0.14.0" @@ -64,5 +64,4 @@ wiremock.workspace = true wiremock-grpc = "0.0.3-alpha3" test-assets = { path = "../test-assets" } test-log = { version = "0.2.12", features = ["trace"] } -bon = "3.3" rstest = "0.24.0" diff --git a/crates/tap-agent/src/agent.rs b/crates/tap-agent/src/agent.rs index d903a7ea..9fa6143f 100644 --- a/crates/tap-agent/src/agent.rs +++ b/crates/tap-agent/src/agent.rs @@ -40,7 +40,7 @@ use indexer_config::{ SubgraphConfig, SubgraphsConfig, TapConfig, }; use indexer_monitor::{ - escrow_accounts, escrow_accounts_v2, indexer_allocations, DeploymentDetails, SubgraphClient, + escrow_accounts_v1, escrow_accounts_v2, indexer_allocations, DeploymentDetails, SubgraphClient, }; use ractor::{concurrency::JoinHandle, Actor, ActorRef}; use sender_account::SenderAccountConfig; @@ -156,7 +156,7 @@ pub async fn start_agent() -> (ActorRef, JoinHandl .await, )); - let escrow_accounts_v1 = escrow_accounts( + let escrow_accounts_v1 = escrow_accounts_v1( escrow_subgraph, *indexer_address, *escrow_sync_interval, diff --git a/crates/test-assets/Cargo.toml b/crates/test-assets/Cargo.toml index 0585bd13..c873b5e8 100644 --- a/crates/test-assets/Cargo.toml +++ b/crates/test-assets/Cargo.toml @@ -10,5 +10,5 @@ lazy_static.workspace = true tap_core.workspace = true tap_graph.workspace = true thegraph-core.workspace = true -typed-builder.workspace = true +bon.workspace = true tokio.workspace = true diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index 70482e1e..3a9edb4b 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -21,7 +21,6 @@ use thegraph_core::{ deployment_id, DeploymentId, }; use tokio::sync::Notify; -use typed_builder::TypedBuilder; /// Assert something is true while sleeping and retrying /// @@ -310,16 +309,16 @@ lazy_static! { ); } -#[derive(TypedBuilder)] +#[derive(bon::Builder)] pub struct SignedReceiptRequest { #[builder(default = Address::ZERO)] allocation_id: Address, #[builder(default)] nonce: u64, - #[builder(default_code = r#"SystemTime::now() + #[builder(default = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_nanos() as u64"#)] + .as_nanos() as u64)] timestamp_ns: u64, #[builder(default = 1)] value: u128, diff --git a/migrations/20250212211337_tap_horizon_sender_denylist.down.sql b/migrations/20250212211337_tap_horizon_sender_denylist.down.sql index cf1166a3..11698071 100644 --- a/migrations/20250212211337_tap_horizon_sender_denylist.down.sql +++ b/migrations/20250212211337_tap_horizon_sender_denylist.down.sql @@ -1,2 +1,6 @@ -- Add down migration script here +DROP TRIGGER IF EXISTS deny_update ON tap_horizon_deny CASCADE; + +DROP FUNCTION IF EXISTS tap_horizon_deny_notify() CASCADE; + DROP TABLE IF EXISTS tap_horizon_denylist CASCADE; diff --git a/migrations/20250212211337_tap_horizon_sender_denylist.up.sql b/migrations/20250212211337_tap_horizon_sender_denylist.up.sql index fff69af0..9df4ddaf 100644 --- a/migrations/20250212211337_tap_horizon_sender_denylist.up.sql +++ b/migrations/20250212211337_tap_horizon_sender_denylist.up.sql @@ -2,3 +2,25 @@ CREATE TABLE IF NOT EXISTS tap_horizon_denylist ( sender_address CHAR(40) PRIMARY KEY ); + + +CREATE FUNCTION tap_horizon_deny_notify() +RETURNS trigger AS +$$ +BEGIN + IF TG_OP = 'DELETE' THEN + PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "DELETE", "sender_address": "%s"}', OLD.sender_address)); + RETURN OLD; + ELSIF TG_OP = 'INSERT' THEN + PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "INSERT", "sender_address": "%s"}', NEW.sender_address)); + RETURN NEW; + ELSE -- UPDATE OR TRUNCATE, should never happen + PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "%s", "sender_address": null}', TG_OP, NEW.sender_address)); + RETURN NEW; + END IF; +END; +$$ LANGUAGE 'plpgsql'; + +CREATE TRIGGER deny_update AFTER INSERT OR UPDATE OR DELETE + ON tap_horizon_denylist + FOR EACH ROW EXECUTE PROCEDURE tap_horizon_deny_notify();