diff --git a/.sqlx/query-10bd83671f30f7bc2096e9158be60023577bbbdbab7a83788204d066bdd9fec5.json b/.sqlx/query-10bd83671f30f7bc2096e9158be60023577bbbdbab7a83788204d066bdd9fec5.json new file mode 100644 index 00000000..58f05261 --- /dev/null +++ b/.sqlx/query-10bd83671f30f7bc2096e9158be60023577bbbdbab7a83788204d066bdd9fec5.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM tap_horizon_receipts\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "10bd83671f30f7bc2096e9158be60023577bbbdbab7a83788204d066bdd9fec5" +} diff --git a/.sqlx/query-5f0c42c9a92a446d37b2971175df6ed0cd31da6b57918a2d600ef90adce1345d.json b/.sqlx/query-5f0c42c9a92a446d37b2971175df6ed0cd31da6b57918a2d600ef90adce1345d.json new file mode 100644 index 00000000..a38315f9 --- /dev/null +++ b/.sqlx/query-5f0c42c9a92a446d37b2971175df6ed0cd31da6b57918a2d600ef90adce1345d.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT count(*)\n FROM tap_horizon_receipts\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "5f0c42c9a92a446d37b2971175df6ed0cd31da6b57918a2d600ef90adce1345d" +} diff --git a/.sqlx/query-6ce2133a20924d5098fa2e27f897e871955e9eb8a0a6f8b9f23a3f38597793f6.json b/.sqlx/query-6ce2133a20924d5098fa2e27f897e871955e9eb8a0a6f8b9f23a3f38597793f6.json new file mode 100644 index 00000000..d3b8f818 --- /dev/null +++ b/.sqlx/query-6ce2133a20924d5098fa2e27f897e871955e9eb8a0a6f8b9f23a3f38597793f6.json @@ -0,0 +1,62 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT \n signature,\n allocation_id,\n payer,\n data_service,\n service_provider,\n timestamp_ns,\n nonce,\n value\n FROM tap_horizon_receipts\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 2, + "name": "payer", + "type_info": "Bpchar" + }, + { + "ordinal": 3, + "name": "data_service", + "type_info": "Bpchar" + }, + { + "ordinal": 4, + "name": "service_provider", + "type_info": "Bpchar" + }, + { + "ordinal": 5, + "name": "timestamp_ns", + "type_info": "Numeric" + }, + { + "ordinal": 6, + "name": "nonce", + "type_info": "Numeric" + }, + { + "ordinal": 7, + "name": "value", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "6ce2133a20924d5098fa2e27f897e871955e9eb8a0a6f8b9f23a3f38597793f6" +} diff --git a/.sqlx/query-8b904cecdd8e9c5dd0cd8a0dbc2e8a19c2197f08d576e5cbf978e39432bb3d5a.json b/.sqlx/query-8b904cecdd8e9c5dd0cd8a0dbc2e8a19c2197f08d576e5cbf978e39432bb3d5a.json new file mode 100644 index 00000000..bca31dc4 --- /dev/null +++ b/.sqlx/query-8b904cecdd8e9c5dd0cd8a0dbc2e8a19c2197f08d576e5cbf978e39432bb3d5a.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO tap_horizon_receipts (\n signer_address,\n signature,\n allocation_id,\n payer,\n data_service,\n service_provider,\n timestamp_ns,\n nonce,\n value\n ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bytea", + "Bpchar", + "Bpchar", + "Bpchar", + "Bpchar", + "Numeric", + "Numeric", + "Numeric" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b904cecdd8e9c5dd0cd8a0dbc2e8a19c2197f08d576e5cbf978e39432bb3d5a" +} diff --git a/.sqlx/query-b94514ce9abc8be15ba3b5a67f33ead0d83e409210ce5e711932bcb32888f2bf.json b/.sqlx/query-b94514ce9abc8be15ba3b5a67f33ead0d83e409210ce5e711932bcb32888f2bf.json new file mode 100644 index 00000000..08847c56 --- /dev/null +++ b/.sqlx/query-b94514ce9abc8be15ba3b5a67f33ead0d83e409210ce5e711932bcb32888f2bf.json @@ -0,0 +1,75 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT \n id,\n signature,\n allocation_id,\n payer,\n data_service,\n service_provider,\n timestamp_ns,\n nonce,\n value\n FROM tap_horizon_receipts\n WHERE\n allocation_id = $1\n AND payer = $2\n AND service_provider = $3\n AND signer_address IN (SELECT unnest($4::text[]))\n AND $5::numrange @> timestamp_ns\n ORDER BY timestamp_ns ASC\n LIMIT $6\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 2, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 3, + "name": "payer", + "type_info": "Bpchar" + }, + { + "ordinal": 4, + "name": "data_service", + "type_info": "Bpchar" + }, + { + "ordinal": 5, + "name": "service_provider", + "type_info": "Bpchar" + }, + { + "ordinal": 6, + "name": "timestamp_ns", + "type_info": "Numeric" + }, + { + "ordinal": 7, + "name": "nonce", + "type_info": "Numeric" + }, + { + "ordinal": 8, + "name": "value", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Bpchar", + "TextArray", + "NumRange", + "Int8" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "b94514ce9abc8be15ba3b5a67f33ead0d83e409210ce5e711932bcb32888f2bf" +} diff --git a/.sqlx/query-cd279b9b74e3efdb79ece086b1e6713d2fc766e6553568df8c6ab1d39a5282f6.json b/.sqlx/query-cd279b9b74e3efdb79ece086b1e6713d2fc766e6553568df8c6ab1d39a5282f6.json new file mode 100644 index 00000000..6a50789b --- /dev/null +++ b/.sqlx/query-cd279b9b74e3efdb79ece086b1e6713d2fc766e6553568df8c6ab1d39a5282f6.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM tap_horizon_receipts\n WHERE\n allocation_id = $1\n AND signer_address IN (SELECT unnest($2::text[]))\n AND $3::numrange @> timestamp_ns\n AND payer = $4\n AND service_provider = $5\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "TextArray", + "NumRange", + "Bpchar", + "Bpchar" + ] + }, + "nullable": [] + }, + "hash": "cd279b9b74e3efdb79ece086b1e6713d2fc766e6553568df8c6ab1d39a5282f6" +} diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index b2aba966..4a4e2e14 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -466,6 +466,7 @@ where let context = TapAgentContext::new( pgpool.clone(), allocation_id, + config.indexer_address, sender, escrow_accounts.clone(), ); diff --git a/crates/tap-agent/src/tap/context.rs b/crates/tap-agent/src/tap/context.rs index fba52f59..e1f44e99 100644 --- a/crates/tap-agent/src/tap/context.rs +++ b/crates/tap-agent/src/tap/context.rs @@ -155,6 +155,7 @@ pub struct TapAgentContext { pgpool: PgPool, allocation_id: Address, sender: Address, + indexer_address: Address, escrow_accounts: Receiver, /// We use phantom data as a marker since it's /// only used to define what methods are available @@ -168,12 +169,14 @@ impl TapAgentContext { pub fn new( pgpool: PgPool, allocation_id: Address, + indexer_address: Address, sender: Address, escrow_accounts: Receiver, ) -> Self { Self { pgpool, allocation_id, + indexer_address, sender, escrow_accounts, _phantom: PhantomData, diff --git a/crates/tap-agent/src/tap/context/rav.rs b/crates/tap-agent/src/tap/context/rav.rs index 6260d565..e10d7601 100644 --- a/crates/tap-agent/src/tap/context/rav.rs +++ b/crates/tap-agent/src/tap/context/rav.rs @@ -178,7 +178,7 @@ mod test { use tokio::sync::watch; use super::*; - use crate::test::{create_rav, ALLOCATION_ID_0}; + use crate::test::{create_rav, ALLOCATION_ID_0, INDEXER}; #[derive(Debug)] struct TestableRav(SignedRav); @@ -199,6 +199,7 @@ mod test { let context = TapAgentContext::new( pool.clone(), ALLOCATION_ID_0, + INDEXER.1, SENDER.1, watch::channel(EscrowAccounts::default()).1, ); diff --git a/crates/tap-agent/src/tap/context/receipt.rs b/crates/tap-agent/src/tap/context/receipt.rs index 29de8e8d..bc760c71 100644 --- a/crates/tap-agent/src/tap/context/receipt.rs +++ b/crates/tap-agent/src/tap/context/receipt.rs @@ -208,10 +208,132 @@ impl ReceiptRead for TapAgentContext { async fn retrieve_receipts_in_timestamp_range + Send>( &self, - _timestamp_range_ns: R, - _receipts_limit: Option, + timestamp_range_ns: R, + receipts_limit: Option, ) -> Result, Self::AdapterError> { - unimplemented!() + let receipts_limit = receipts_limit.map_or(1000, |limit| limit); + + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender) + .await + .map_err(|e| AdapterError::ReceiptRead { + error: format!("{:?}.", e), + })?; + + // TODO filter by data_service when we have multiple data services + + let records = sqlx::query!( + r#" + SELECT + id, + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value + FROM tap_horizon_receipts + WHERE + allocation_id = $1 + AND payer = $2 + AND service_provider = $3 + AND signer_address IN (SELECT unnest($4::text[])) + AND $5::numrange @> timestamp_ns + ORDER BY timestamp_ns ASC + LIMIT $6 + "#, + self.allocation_id.encode_hex(), + self.sender.encode_hex(), + self.indexer_address.encode_hex(), + &signers, + rangebounds_to_pgrange(timestamp_range_ns), + (receipts_limit + 1) as i64, + ) + .fetch_all(&self.pgpool) + .await?; + let mut receipts = records + .into_iter() + .map(|record| { + let signature = record.signature.as_slice().try_into() + .map_err(|e| AdapterError::ReceiptRead { + error: format!( + "Error decoding signature while retrieving receipt from database: {}", + e + ), + })?; + let allocation_id = Address::from_str(&record.allocation_id).map_err(|e| { + AdapterError::ReceiptRead { + error: format!( + "Error decoding allocation_id while retrieving receipt from database: {}", + e + ), + } + })?; + let payer = Address::from_str(&record.payer).map_err(|e| { + AdapterError::ReceiptRead { + error: format!( + "Error decoding payer while retrieving receipt from database: {}", + e + ), + } + })?; + + let data_service = Address::from_str(&record.data_service).map_err(|e| { + AdapterError::ReceiptRead { + error: format!( + "Error decoding data_service while retrieving receipt from database: {}", + e + ), + } + })?; + + let service_provider = Address::from_str(&record.service_provider).map_err(|e| { + AdapterError::ReceiptRead { + error: format!( + "Error decoding service_provider while retrieving receipt from database: {}", + e + ), + } + })?; + + let timestamp_ns = record + .timestamp_ns + .to_u64() + .ok_or(AdapterError::ReceiptRead { + error: "Error decoding timestamp_ns while retrieving receipt from database" + .to_string(), + })?; + let nonce = record.nonce.to_u64().ok_or(AdapterError::ReceiptRead { + error: "Error decoding nonce while retrieving receipt from database".to_string(), + })?; + // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood... + // So we're converting to BigInt to get a proper implementation of to_u128(). + let value = record.value.to_bigint().and_then(|v| v.to_u128()).ok_or(AdapterError::ReceiptRead { + error: "Error decoding value while retrieving receipt from database".to_string(), + })?; + + let signed_receipt = tap_graph::v2::SignedReceipt { + message: tap_graph::v2::Receipt { + payer, + data_service, + service_provider, + allocation_id, + timestamp_ns, + nonce, + value, + }, + signature, + }; + + Ok(CheckingReceipt::new(TapReceipt::V2(signed_receipt))) + + }) + .collect::, AdapterError>>()?; + + safe_truncate_receipts(&mut receipts, receipts_limit); + + Ok(receipts) } } @@ -226,9 +348,33 @@ impl ReceiptDelete for TapAgentContext { async fn remove_receipts_in_timestamp_range + Send>( &self, - _timestamp_ns: R, + timestamp_ns: R, ) -> Result<(), Self::AdapterError> { - unimplemented!() + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender) + .await + .map_err(|e| AdapterError::ReceiptDelete { + error: format!("{:?}.", e), + })?; + + sqlx::query!( + r#" + DELETE FROM tap_horizon_receipts + WHERE + allocation_id = $1 + AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns + AND payer = $4 + AND service_provider = $5 + "#, + self.allocation_id.encode_hex(), + &signers, + rangebounds_to_pgrange(timestamp_ns), + self.sender.encode_hex(), + self.indexer_address.encode_hex(), + ) + .execute(&self.pgpool) + .await?; + Ok(()) } } @@ -243,6 +389,7 @@ mod test { use bigdecimal::{num_bigint::ToBigInt, ToPrimitive}; use indexer_monitor::EscrowAccounts; use lazy_static::lazy_static; + use rstest::{fixture, rstest}; use sqlx::PgPool; use tap_core::{ manager::adapters::{ReceiptDelete, ReceiptRead}, @@ -259,7 +406,7 @@ mod test { use tokio::sync::watch::{self, Receiver}; use super::*; - use crate::test::{create_received_receipt, store_receipt, SENDER_2}; + use crate::test::{store_receipt, CreateReceipt, INDEXER, SENDER_2}; const ALLOCATION_ID_IRRELEVANT: Address = ALLOCATION_ID_1; @@ -267,32 +414,66 @@ mod test { static ref SENDER_IRRELEVANT: (PrivateKeySigner, Address) = SENDER_2.clone(); } - /// Insert a single receipt and retrieve it from the database using the adapter. - /// The point here it to test the deserialization of large numbers. - #[sqlx::test(migrations = "../../migrations")] - async fn insert_and_retrieve_single_receipt(pgpool: PgPool) { - let escrow_accounts = watch::channel(EscrowAccounts::new( + #[fixture] + fn escrow_accounts() -> Receiver { + watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), )) - .1; + .1 + } - let storage_adapter = TapAgentContext::::new( + async fn legacy_adapter( + pgpool: PgPool, + escrow_accounts: Receiver, + ) -> TapAgentContext { + TapAgentContext::new( + pgpool.clone(), + ALLOCATION_ID_0, + INDEXER.1, + SENDER.1, + escrow_accounts, + ) + } + + async fn horizon_adapter( + pgpool: PgPool, + escrow_accounts: Receiver, + ) -> TapAgentContext { + TapAgentContext::new( pgpool, ALLOCATION_ID_0, + INDEXER.1, SENDER.1, - escrow_accounts.clone(), - ); + escrow_accounts, + ) + } + /// Insert a single receipt and retrieve it from the database using the adapter. + /// The point here it to test the deserialization of large numbers. + #[rstest] + #[case(legacy_adapter(_pgpool.clone(), _escrow.clone()))] + #[case(horizon_adapter(_pgpool.clone(), _escrow.clone()))] + #[sqlx::test(migrations = "../../migrations")] + async fn insert_and_retrieve_single_receipt( + #[ignore] _pgpool: PgPool, + #[from(escrow_accounts)] _escrow: Receiver, + #[case] + #[future(awt)] + context: TapAgentContext, + ) where + T: CreateReceipt, + TapAgentContext: ReceiptRead + ReceiptDelete, + { let received_receipt = - create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, u64::MAX, u64::MAX, u128::MAX); + T::create_received_receipt(ALLOCATION_ID_0, &SIGNER.0, u64::MAX, u64::MAX, u128::MAX); // Storing the receipt - store_receipt(&storage_adapter.pgpool, received_receipt.signed_receipt()) + store_receipt(&context.pgpool, received_receipt.signed_receipt()) .await .unwrap(); - let retrieved_receipt = storage_adapter + let retrieved_receipt = context .retrieve_receipts_in_timestamp_range(.., None) .await .unwrap()[0] @@ -359,171 +540,322 @@ mod test { Ok(()) } - async fn remove_range_and_check + Send, T>( - storage_adapter: &TapAgentContext, - escrow_accounts: Receiver, - received_receipt_vec: &[CheckingReceipt], - range: R, - ) -> anyhow::Result<()> - where - TapAgentContext: ReceiptDelete, - { - let escrow_accounts_snapshot = escrow_accounts.borrow(); - - // Storing the receipts - let mut received_receipt_id_vec = Vec::new(); - for received_receipt in received_receipt_vec.iter() { - received_receipt_id_vec.push( - store_receipt(&storage_adapter.pgpool, received_receipt.signed_receipt()) - .await - .unwrap(), - ); - } - - // zip the 2 vectors together - let received_receipt_vec = received_receipt_id_vec - .into_iter() - .zip(received_receipt_vec.iter()) - .collect::>(); + trait RemoveRange: Sized { + async fn remove_range_and_check + Send>( + storage_adapter: &TapAgentContext, + escrow_accounts: Receiver, + received_receipt_vec: &[CheckingReceipt], + range: R, + ) -> anyhow::Result<()>; + } - // Remove the received receipts by timestamp range for the correct (allocation_id, - // sender) - let received_receipt_vec: Vec<_> = received_receipt_vec - .iter() - .filter(|(_, received_receipt)| { - if (received_receipt.signed_receipt().allocation_id() - == storage_adapter.allocation_id) - && escrow_accounts_snapshot - .get_sender_for_signer( - &received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap(), - ) - .is_ok_and(|v| v == storage_adapter.sender) - { - !range.contains(&received_receipt.signed_receipt().timestamp_ns()) - } else { - true - } - // !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) - }) - .cloned() - .collect(); + impl RemoveRange for Horizon { + async fn remove_range_and_check + Send>( + storage_adapter: &TapAgentContext, + escrow_accounts: Receiver, + received_receipt_vec: &[CheckingReceipt], + range: R, + ) -> anyhow::Result<()> { + let escrow_accounts_snapshot = escrow_accounts.borrow(); + + // Storing the receipts + let mut received_receipt_id_vec = Vec::new(); + for received_receipt in received_receipt_vec.iter() { + received_receipt_id_vec.push( + store_receipt(&storage_adapter.pgpool, received_receipt.signed_receipt()) + .await + .unwrap(), + ); + } + + // zip the 2 vectors together + let received_receipt_vec = received_receipt_id_vec + .into_iter() + .zip(received_receipt_vec.iter()) + .collect::>(); + + // Remove the received receipts by timestamp range for the correct (allocation_id, + // sender) + let received_receipt_vec: Vec<_> = received_receipt_vec + .iter() + .filter(|(_, received_receipt)| { + if (received_receipt.signed_receipt().allocation_id() + == storage_adapter.allocation_id) + && escrow_accounts_snapshot + .get_sender_for_signer( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .is_ok_and(|v| v == storage_adapter.sender) + { + !range.contains(&received_receipt.signed_receipt().timestamp_ns()) + } else { + true + } + // !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) + }) + .cloned() + .collect(); + + // Removing the received receipts in timestamp range from the database + storage_adapter + .remove_receipts_in_timestamp_range(range) + .await?; + + // Retrieving all receipts in DB (including irrelevant ones) + let records = sqlx::query!( + r#" + SELECT + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value + FROM tap_horizon_receipts + "# + ) + .fetch_all(&storage_adapter.pgpool) + .await?; - // Removing the received receipts in timestamp range from the database - storage_adapter - .remove_receipts_in_timestamp_range(range) + // Check length + assert_eq!(records.len(), received_receipt_vec.len()); + + // Retrieving all receipts in DB (including irrelevant ones) + let recovered_received_receipt_set: Vec<_> = records + .into_iter() + .map(|record| { + let signature = record.signature.as_slice().try_into().unwrap(); + let allocation_id = Address::from_str(&record.allocation_id).unwrap(); + let payer = Address::from_str(&record.payer).unwrap(); + let data_service = Address::from_str(&record.data_service).unwrap(); + let service_provider = Address::from_str(&record.service_provider).unwrap(); + let timestamp_ns = record.timestamp_ns.to_u64().unwrap(); + let nonce = record.nonce.to_u64().unwrap(); + // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood... + // So we're converting to BigInt to get a proper implementation of to_u128(). + let value = record + .value + .to_bigint() + .map(|v| v.to_u128()) + .unwrap() + .unwrap(); + + let signed_receipt = tap_graph::v2::SignedReceipt { + message: tap_graph::v2::Receipt { + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value, + }, + signature, + }; + signed_receipt.unique_id() + }) + .collect(); + + // Check values recovered_received_receipt_set contains values received_receipt_vec + assert!(received_receipt_vec.iter().all(|(_, received_receipt)| { + recovered_received_receipt_set + .contains(&received_receipt.signed_receipt().unique_id()) + })); + + // Removing all the receipts in the DB + sqlx::query!( + r#" + DELETE FROM tap_horizon_receipts + "# + ) + .execute(&storage_adapter.pgpool) .await?; - // Retrieving all receipts in DB (including irrelevant ones) - let records = sqlx::query!( - r#" + // Checking that there are no receipts left + let scalar_tap_receipts_db_count: i64 = sqlx::query!( + r#" + SELECT count(*) + FROM tap_horizon_receipts + "# + ) + .fetch_one(&storage_adapter.pgpool) + .await? + .count + .unwrap(); + assert_eq!(scalar_tap_receipts_db_count, 0); + Ok(()) + } + } + + impl RemoveRange for Legacy { + async fn remove_range_and_check + Send>( + storage_adapter: &TapAgentContext, + escrow_accounts: Receiver, + received_receipt_vec: &[CheckingReceipt], + range: R, + ) -> anyhow::Result<()> { + let escrow_accounts_snapshot = escrow_accounts.borrow(); + + // Storing the receipts + let mut received_receipt_id_vec = Vec::new(); + for received_receipt in received_receipt_vec.iter() { + received_receipt_id_vec.push( + store_receipt(&storage_adapter.pgpool, received_receipt.signed_receipt()) + .await + .unwrap(), + ); + } + + // zip the 2 vectors together + let received_receipt_vec = received_receipt_id_vec + .into_iter() + .zip(received_receipt_vec.iter()) + .collect::>(); + + // Remove the received receipts by timestamp range for the correct (allocation_id, + // sender) + let received_receipt_vec: Vec<_> = received_receipt_vec + .iter() + .filter(|(_, received_receipt)| { + if (received_receipt.signed_receipt().allocation_id() + == storage_adapter.allocation_id) + && escrow_accounts_snapshot + .get_sender_for_signer( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .is_ok_and(|v| v == storage_adapter.sender) + { + !range.contains(&received_receipt.signed_receipt().timestamp_ns()) + } else { + true + } + // !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) + }) + .cloned() + .collect(); + + // Removing the received receipts in timestamp range from the database + storage_adapter + .remove_receipts_in_timestamp_range(range) + .await?; + + // Retrieving all receipts in DB (including irrelevant ones) + let records = sqlx::query!( + r#" SELECT signature, allocation_id, timestamp_ns, nonce, value FROM scalar_tap_receipts "# - ) - .fetch_all(&storage_adapter.pgpool) - .await?; - - // Check length - assert_eq!(records.len(), received_receipt_vec.len()); - - // Retrieving all receipts in DB (including irrelevant ones) - let recovered_received_receipt_set: Vec<_> = records - .into_iter() - .map(|record| { - let signature = record.signature.as_slice().try_into().unwrap(); - let allocation_id = Address::from_str(&record.allocation_id).unwrap(); - let timestamp_ns = record.timestamp_ns.to_u64().unwrap(); - let nonce = record.nonce.to_u64().unwrap(); - // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood... - // So we're converting to BigInt to get a proper implementation of to_u128(). - let value = record - .value - .to_bigint() - .map(|v| v.to_u128()) - .unwrap() - .unwrap(); - - let signed_receipt = SignedReceipt { - message: Receipt { - allocation_id, - timestamp_ns, - nonce, - value, - }, - signature, - }; - signed_receipt.unique_id() - }) - .collect(); - - // Check values recovered_received_receipt_set contains values received_receipt_vec - assert!(received_receipt_vec.iter().all(|(_, received_receipt)| { - recovered_received_receipt_set.contains(&received_receipt.signed_receipt().unique_id()) - })); + ) + .fetch_all(&storage_adapter.pgpool) + .await?; - // Removing all the receipts in the DB - sqlx::query!( - r#" + // Check length + assert_eq!(records.len(), received_receipt_vec.len()); + + // Retrieving all receipts in DB (including irrelevant ones) + let recovered_received_receipt_set: Vec<_> = records + .into_iter() + .map(|record| { + let signature = record.signature.as_slice().try_into().unwrap(); + let allocation_id = Address::from_str(&record.allocation_id).unwrap(); + let timestamp_ns = record.timestamp_ns.to_u64().unwrap(); + let nonce = record.nonce.to_u64().unwrap(); + // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood... + // So we're converting to BigInt to get a proper implementation of to_u128(). + let value = record + .value + .to_bigint() + .map(|v| v.to_u128()) + .unwrap() + .unwrap(); + + let signed_receipt = SignedReceipt { + message: Receipt { + allocation_id, + timestamp_ns, + nonce, + value, + }, + signature, + }; + signed_receipt.unique_id() + }) + .collect(); + + // Check values recovered_received_receipt_set contains values received_receipt_vec + assert!(received_receipt_vec.iter().all(|(_, received_receipt)| { + recovered_received_receipt_set + .contains(&received_receipt.signed_receipt().unique_id()) + })); + + // Removing all the receipts in the DB + sqlx::query!( + r#" DELETE FROM scalar_tap_receipts "# - ) - .execute(&storage_adapter.pgpool) - .await?; + ) + .execute(&storage_adapter.pgpool) + .await?; - // Checking that there are no receipts left - let scalar_tap_receipts_db_count: i64 = sqlx::query!( - r#" + // Checking that there are no receipts left + let scalar_tap_receipts_db_count: i64 = sqlx::query!( + r#" SELECT count(*) FROM scalar_tap_receipts "# - ) - .fetch_one(&storage_adapter.pgpool) - .await? - .count - .unwrap(); - assert_eq!(scalar_tap_receipts_db_count, 0); - Ok(()) + ) + .fetch_one(&storage_adapter.pgpool) + .await? + .count + .unwrap(); + assert_eq!(scalar_tap_receipts_db_count, 0); + Ok(()) + } } + #[rstest] + #[case(legacy_adapter(_pgpool.clone(), _escrow.clone()))] + #[case(horizon_adapter(_pgpool.clone(), _escrow.clone()))] #[sqlx::test(migrations = "../../migrations")] - async fn retrieve_receipts_with_limit(pgpool: PgPool) { - let escrow_accounts = watch::channel(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(1000))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )) - .1; - - let storage_adapter = TapAgentContext::::new( - pgpool.clone(), - ALLOCATION_ID_0, - SENDER.1, - escrow_accounts.clone(), - ); - + async fn retrieve_receipts_with_limit( + #[ignore] _pgpool: PgPool, + #[from(escrow_accounts)] _escrow: Receiver, + #[case] + #[future(awt)] + context: TapAgentContext, + ) where + T: CreateReceipt, + TapAgentContext: ReceiptRead + ReceiptDelete, + { // Creating 100 receipts with timestamps 42 to 141 for i in 0..100 { - let receipt = create_received_receipt( - &ALLOCATION_ID_0, + let receipt = T::create_received_receipt( + ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, (i + 124).into(), ); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&context.pgpool, receipt.signed_receipt()) .await .unwrap(); } - let recovered_received_receipt_vec = storage_adapter + let recovered_received_receipt_vec = context .retrieve_receipts_in_timestamp_range(0..141, Some(10)) .await .unwrap(); assert_eq!(recovered_received_receipt_vec.len(), 10); - let recovered_received_receipt_vec = storage_adapter + let recovered_received_receipt_vec = context .retrieve_receipts_in_timestamp_range(0..141, Some(50)) .await .unwrap(); @@ -531,51 +863,50 @@ mod test { // add a copy in the same timestamp for i in 0..100 { - let receipt = create_received_receipt( - &ALLOCATION_ID_0, + let receipt = T::create_received_receipt( + ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 43, (i + 124).into(), ); - store_receipt(&pgpool, receipt.signed_receipt()) + store_receipt(&context.pgpool, receipt.signed_receipt()) .await .unwrap(); } - let recovered_received_receipt_vec = storage_adapter + let recovered_received_receipt_vec = context .retrieve_receipts_in_timestamp_range(0..141, Some(10)) .await .unwrap(); assert_eq!(recovered_received_receipt_vec.len(), 9); - let recovered_received_receipt_vec = storage_adapter + let recovered_received_receipt_vec = context .retrieve_receipts_in_timestamp_range(0..141, Some(50)) .await .unwrap(); assert_eq!(recovered_received_receipt_vec.len(), 49); } + #[rstest] + #[case(legacy_adapter(pgpool.clone(), escrow_accounts.clone()))] + #[case(horizon_adapter(pgpool.clone(), escrow_accounts.clone()))] #[sqlx::test(migrations = "../../migrations")] - async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts = watch::channel(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(1000))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )) - .1; - - let storage_adapter = TapAgentContext::::new( - pgpool.clone(), - ALLOCATION_ID_0, - SENDER.1, - escrow_accounts.clone(), - ); - + async fn retrieve_receipts_in_timestamp_range( + #[ignore] pgpool: PgPool, + #[from(escrow_accounts)] escrow_accounts: Receiver, + #[case] + #[future(awt)] + context: TapAgentContext, + ) where + T: CreateReceipt, + TapAgentContext: ReceiptRead + ReceiptDelete, + { // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); for i in 0..10 { - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_0, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -583,15 +914,15 @@ mod test { )); // Adding irrelevant receipts to make sure they are not retrieved - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_IRRELEVANT, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_IRRELEVANT, &SIGNER.0, i + 684, i + 42, (i + 124).into(), )); - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_0, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, @@ -616,7 +947,7 @@ mod test { { $( assert!( - retrieve_range_and_check(&storage_adapter, escrow_accounts.clone(), &received_receipt_vec, $arg) + retrieve_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) .await .is_ok()); )+ @@ -684,26 +1015,25 @@ mod test { } } + #[rstest] + #[case(legacy_adapter(_pgpool.clone(), escrow_accounts.clone()))] + #[case(horizon_adapter(_pgpool.clone(), escrow_accounts.clone()))] #[sqlx::test(migrations = "../../migrations")] - async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts = watch::channel(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(1000))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )) - .1; - - let storage_adapter = TapAgentContext::::new( - pgpool, - ALLOCATION_ID_0, - SENDER.1, - escrow_accounts.clone(), - ); - + async fn remove_receipts_in_timestamp_range( + #[ignore] _pgpool: PgPool, + #[from(escrow_accounts)] escrow_accounts: Receiver, + #[case] + #[future(awt)] + context: TapAgentContext, + ) where + T: CreateReceipt + RemoveRange, + TapAgentContext: ReceiptRead + ReceiptDelete, + { // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); for i in 0..10 { - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_0, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_0, &SIGNER.0, i + 684, i + 42, @@ -711,15 +1041,15 @@ mod test { )); // Adding irrelevant receipts to make sure they are not retrieved - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_IRRELEVANT, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_IRRELEVANT, &SIGNER.0, i + 684, i + 42, (i + 124).into(), )); - received_receipt_vec.push(create_received_receipt( - &ALLOCATION_ID_0, + received_receipt_vec.push(T::create_received_receipt( + ALLOCATION_ID_0, &SENDER_IRRELEVANT.0, i + 684, i + 42, @@ -732,7 +1062,7 @@ mod test { { $( assert!( - remove_range_and_check(&storage_adapter, escrow_accounts.clone(), &received_receipt_vec, $arg) + T::remove_range_and_check(&context, escrow_accounts.clone(), &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index e24b997e..2a301224 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -23,13 +23,13 @@ use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedRav, SignedReceipt}; use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::alloy::{ - primitives::{address, hex::ToHexExt, Address, U256}, + primitives::{hex::ToHexExt, Address, U256}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, sol_types::Eip712Domain, }; -pub const ALLOCATION_ID_0: Address = address!("abababababababababababababababababababab"); -pub const ALLOCATION_ID_1: Address = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); +pub const ALLOCATION_ID_0: Address = test_assets::ALLOCATION_ID_0; +pub const ALLOCATION_ID_1: Address = test_assets::ALLOCATION_ID_1; use tokio::sync::{ watch::{self, Sender}, Notify, @@ -46,7 +46,10 @@ use crate::{ SenderAccountsManagerMessage, }, }, - tap::{context::AdapterError, CheckingReceipt}, + tap::{ + context::{AdapterError, Horizon, Legacy}, + CheckingReceipt, + }, }; lazy_static! { @@ -260,6 +263,69 @@ pub fn create_rav( .unwrap() } +/// Generic implementation of create_received_receipt +pub trait CreateReceipt { + /// This might seem weird at first glance since [Horizon] and [Legacy] implementation have the same + /// function signature and don't require &self. The reason is that we can not match over T to get + /// all variants because T is a trait and not an enum. + fn create_received_receipt( + allocation_id: Address, + signer_wallet: &PrivateKeySigner, + nonce: u64, + timestamp_ns: u64, + value: u128, + ) -> CheckingReceipt; +} + +impl CreateReceipt for Horizon { + fn create_received_receipt( + allocation_id: Address, + signer_wallet: &PrivateKeySigner, + nonce: u64, + timestamp_ns: u64, + value: u128, + ) -> CheckingReceipt { + let receipt = Eip712SignedMessage::new( + &TAP_EIP712_DOMAIN_SEPARATOR, + tap_graph::v2::Receipt { + allocation_id, + payer: SENDER.1, + service_provider: INDEXER.1, + data_service: Address::ZERO, + nonce, + timestamp_ns, + value, + }, + signer_wallet, + ) + .unwrap(); + CheckingReceipt::new(indexer_receipt::TapReceipt::V2(receipt)) + } +} + +impl CreateReceipt for Legacy { + fn create_received_receipt( + allocation_id: Address, + signer_wallet: &PrivateKeySigner, + nonce: u64, + timestamp_ns: u64, + value: u128, + ) -> CheckingReceipt { + let receipt = Eip712SignedMessage::new( + &TAP_EIP712_DOMAIN_SEPARATOR, + Receipt { + allocation_id, + nonce, + timestamp_ns, + value, + }, + signer_wallet, + ) + .unwrap(); + CheckingReceipt::new(indexer_receipt::TapReceipt::V1(receipt)) + } +} + /// Fixture to generate a signed receipt using the wallet from `keys()` and the /// given `query_id` and `value` pub fn create_received_receipt( @@ -286,7 +352,7 @@ pub fn create_received_receipt( pub async fn store_receipt(pgpool: &PgPool, signed_receipt: &TapReceipt) -> anyhow::Result { match signed_receipt { TapReceipt::V1(signed_receipt) => store_receipt_v1(pgpool, signed_receipt).await, - TapReceipt::V2(_) => unimplemented!("V2 not supported"), + TapReceipt::V2(signed_receipt) => store_receipt_v2(pgpool, signed_receipt).await, } } @@ -296,18 +362,64 @@ pub async fn store_receipt_v1( ) -> anyhow::Result { let encoded_signature = signed_receipt.signature.as_bytes().to_vec(); + let signer = signed_receipt + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap() + .encode_hex(); + let record = sqlx::query!( r#" INSERT INTO scalar_tap_receipts (signer_address, signature, allocation_id, timestamp_ns, nonce, value) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id "#, - signed_receipt - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - .encode_hex(), + signer, + encoded_signature, + signed_receipt.message.allocation_id.encode_hex(), + BigDecimal::from(signed_receipt.message.timestamp_ns), + BigDecimal::from(signed_receipt.message.nonce), + BigDecimal::from(BigInt::from(signed_receipt.message.value)), + ) + .fetch_one(pgpool) + .await?; + + // id is BIGSERIAL, so it should be safe to cast to u64. + let id: u64 = record.id.try_into()?; + Ok(id) +} + +pub async fn store_receipt_v2( + pgpool: &PgPool, + signed_receipt: &tap_graph::v2::SignedReceipt, +) -> anyhow::Result { + let encoded_signature = signed_receipt.signature.as_bytes().to_vec(); + + let signer = signed_receipt + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap() + .encode_hex(); + + let record = sqlx::query!( + r#" + INSERT INTO tap_horizon_receipts ( + signer_address, + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING id + "#, + signer, encoded_signature, signed_receipt.message.allocation_id.encode_hex(), + signed_receipt.message.payer.encode_hex(), + signed_receipt.message.data_service.encode_hex(), + signed_receipt.message.service_provider.encode_hex(), BigDecimal::from(signed_receipt.message.timestamp_ns), BigDecimal::from(signed_receipt.message.nonce), BigDecimal::from(BigInt::from(signed_receipt.message.value)),