From 7dbb32f074935e3c98213c895ab294da58ff84ab Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Mon, 10 Feb 2025 17:30:17 +0100 Subject: [PATCH] refactor: implement rav traits for horizon Signed-off-by: Gustavo Inacio --- ...7f0539d2ffab1289130555dba0f6fdc3065ff.json | 22 ++ ...9872832048b338237c786727a6c9173e23b38.json | 66 +++++ crates/tap-agent/src/tap/context.rs | 2 + crates/tap-agent/src/tap/context/rav.rs | 227 ++++++++++++++++-- crates/tap-agent/src/test.rs | 62 ++++- .../20250210152412_tap_horizon_ravs.down.sql | 3 + .../20250210152412_tap_horizon_ravs.up.sql | 33 +++ 7 files changed, 390 insertions(+), 25 deletions(-) create mode 100644 .sqlx/query-5ee76e76f4ace2dbab69fd58eaf7f0539d2ffab1289130555dba0f6fdc3065ff.json create mode 100644 .sqlx/query-f02805d423945f35b93dde58db49872832048b338237c786727a6c9173e23b38.json create mode 100644 migrations/20250210152412_tap_horizon_ravs.down.sql create mode 100644 migrations/20250210152412_tap_horizon_ravs.up.sql diff --git a/.sqlx/query-5ee76e76f4ace2dbab69fd58eaf7f0539d2ffab1289130555dba0f6fdc3065ff.json b/.sqlx/query-5ee76e76f4ace2dbab69fd58eaf7f0539d2ffab1289130555dba0f6fdc3065ff.json new file mode 100644 index 00000000..5b1b624d --- /dev/null +++ b/.sqlx/query-5ee76e76f4ace2dbab69fd58eaf7f0539d2ffab1289130555dba0f6fdc3065ff.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO tap_horizon_ravs (\n payer,\n data_service,\n service_provider,\n metadata,\n signature,\n allocation_id,\n timestamp_ns,\n value_aggregate,\n created_at,\n updated_at\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)\n ON CONFLICT (payer, data_service, service_provider, allocation_id)\n DO UPDATE SET\n signature = $5,\n timestamp_ns = $7,\n value_aggregate = $8,\n updated_at = $9,\n metadata = $4\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Bpchar", + "Bytea", + "Bytea", + "Bpchar", + "Numeric", + "Numeric", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "5ee76e76f4ace2dbab69fd58eaf7f0539d2ffab1289130555dba0f6fdc3065ff" +} diff --git a/.sqlx/query-f02805d423945f35b93dde58db49872832048b338237c786727a6c9173e23b38.json b/.sqlx/query-f02805d423945f35b93dde58db49872832048b338237c786727a6c9173e23b38.json new file mode 100644 index 00000000..b6cf1c1c --- /dev/null +++ b/.sqlx/query-f02805d423945f35b93dde58db49872832048b338237c786727a6c9173e23b38.json @@ -0,0 +1,66 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT \n signature,\n allocation_id,\n payer,\n data_service,\n service_provider,\n timestamp_ns,\n value_aggregate,\n metadata\n FROM tap_horizon_ravs\n WHERE \n allocation_id = $1 \n AND payer = $2\n AND service_provider = $3\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "signature", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "allocation_id", + "type_info": "Bpchar" + }, + { + "ordinal": 2, + "name": "payer", + "type_info": "Bpchar" + }, + { + "ordinal": 3, + "name": "data_service", + "type_info": "Bpchar" + }, + { + "ordinal": 4, + "name": "service_provider", + "type_info": "Bpchar" + }, + { + "ordinal": 5, + "name": "timestamp_ns", + "type_info": "Numeric" + }, + { + "ordinal": 6, + "name": "value_aggregate", + "type_info": "Numeric" + }, + { + "ordinal": 7, + "name": "metadata", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar", + "Bpchar" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "f02805d423945f35b93dde58db49872832048b338237c786727a6c9173e23b38" +} diff --git a/crates/tap-agent/src/tap/context.rs b/crates/tap-agent/src/tap/context.rs index e1f44e99..c747a921 100644 --- a/crates/tap-agent/src/tap/context.rs +++ b/crates/tap-agent/src/tap/context.rs @@ -70,6 +70,7 @@ pub trait NetworkVersion: Send + Sync + 'static { /// /// A simple `struct Legacy;` would be able to instantiate and pass as /// value, while having size 1. +#[derive(Debug)] pub enum Legacy {} /// 0-sized marker for horizon network /// @@ -79,6 +80,7 @@ pub enum Legacy {} /// /// A simple `struct Legacy;` would be able to instantiate and pass as /// value, while having size 1. +#[derive(Debug)] pub enum Horizon {} impl NetworkVersion for Legacy { diff --git a/crates/tap-agent/src/tap/context/rav.rs b/crates/tap-agent/src/tap/context/rav.rs index e10d7601..15dadcb1 100644 --- a/crates/tap-agent/src/tap/context/rav.rs +++ b/crates/tap-agent/src/tap/context/rav.rs @@ -12,7 +12,10 @@ use tap_core::manager::adapters::{RavRead, RavStore}; use tap_graph::{ReceiptAggregateVoucher, SignedRav}; #[allow(deprecated)] use thegraph_core::alloy::signers::Signature; -use thegraph_core::alloy::{hex::ToHexExt, primitives::Address}; +use thegraph_core::alloy::{ + hex::ToHexExt, + primitives::{Address, Bytes}, +}; use super::{error::AdapterError, Horizon, Legacy, TapAgentContext}; @@ -149,7 +152,113 @@ impl RavRead for TapAgentContext Result, Self::AdapterError> { - unimplemented!() + // TODO add data service filter + let row = sqlx::query!( + r#" + SELECT + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + value_aggregate, + metadata + FROM tap_horizon_ravs + WHERE + allocation_id = $1 + AND payer = $2 + AND service_provider = $3 + "#, + self.allocation_id.encode_hex(), + self.sender.encode_hex(), + self.indexer_address.encode_hex() + ) + .fetch_optional(&self.pgpool) + .await + .map_err(|e| AdapterError::RavRead { + error: e.to_string(), + })?; + + match row { + Some(row) => { + #[allow(deprecated)] + let signature: Signature = + row.signature + .as_slice() + .try_into() + .map_err(|e| AdapterError::RavRead { + error: format!( + "Error decoding signature while retrieving RAV from database: {}", + e + ), + })?; + let allocation_id = + Address::from_str(&row.allocation_id).map_err(|e| AdapterError::RavRead { + error: format!( + "Error decoding allocation_id while retrieving RAV from database: {}", + e + ), + })?; + + let payer = Address::from_str(&row.payer).map_err(|e| AdapterError::RavRead { + error: format!( + "Error decoding payer while retrieving receipt from database: {}", + e + ), + })?; + + let data_service = Address::from_str(&row.data_service).map_err(|e| { + AdapterError::RavRead { + error: format!( + "Error decoding data_service while retrieving receipt from database: {}", + e + ), + } + })?; + + let service_provider = Address::from_str(&row.service_provider).map_err(|e| { + AdapterError::RavRead { + error: format!( + "Error decoding service_provider while retrieving receipt from database: {}", + e + ), + } + })?; + + let metadata = Bytes::from(row.metadata); + + let timestamp_ns = row.timestamp_ns.to_u64().ok_or(AdapterError::RavRead { + error: "Error decoding timestamp_ns while retrieving RAV from database" + .to_string(), + })?; + let value_aggregate = row + .value_aggregate + // Beware, BigDecimal::to_u128() actually uses to_u64() under the hood. + // So we're converting to BigInt to get a proper implementation of to_u128(). + .to_bigint() + .and_then(|v| v.to_u128()) + .ok_or(AdapterError::RavRead { + error: "Error decoding value_aggregate while retrieving RAV from database" + .to_string(), + })?; + + let rav = tap_graph::v2::ReceiptAggregateVoucher { + allocationId: allocation_id, + timestampNs: timestamp_ns, + valueAggregate: value_aggregate, + dataService: data_service, + serviceProvider: service_provider, + payer, + metadata, + }; + Ok(Some(tap_graph::v2::SignedRav { + message: rav, + signature, + })) + } + None => Ok(None), + } } } @@ -164,52 +273,124 @@ impl RavStore for TapAgentContext Result<(), Self::AdapterError> { - unimplemented!() + let signature_bytes: Vec = rav.signature.as_bytes().to_vec(); + + let _fut = sqlx::query!( + r#" + INSERT INTO tap_horizon_ravs ( + payer, + data_service, + service_provider, + metadata, + signature, + allocation_id, + timestamp_ns, + value_aggregate, + created_at, + updated_at + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9) + ON CONFLICT (payer, data_service, service_provider, allocation_id) + DO UPDATE SET + signature = $5, + timestamp_ns = $7, + value_aggregate = $8, + updated_at = $9, + metadata = $4 + "#, + rav.message.payer.encode_hex(), + rav.message.dataService.encode_hex(), + rav.message.serviceProvider.encode_hex(), + rav.message.metadata.as_ref(), + signature_bytes, + rav.message.allocationId.encode_hex(), + BigDecimal::from(rav.message.timestampNs), + BigDecimal::from(BigInt::from(rav.message.valueAggregate)), + chrono::Utc::now() + ) + .execute(&self.pgpool) + .await + .map_err(|e| AdapterError::RavStore { + error: e.to_string(), + })?; + Ok(()) } } #[cfg(test)] mod test { + use indexer_monitor::EscrowAccounts; + use rstest::rstest; use sqlx::PgPool; + use tap_core::signed_message::Eip712SignedMessage; use test_assets::{TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use tokio::sync::watch; use super::*; - use crate::test::{create_rav, ALLOCATION_ID_0, INDEXER}; + use crate::{ + tap::context::NetworkVersion, + test::{CreateRav, ALLOCATION_ID_0, INDEXER}, + }; #[derive(Debug)] - struct TestableRav(SignedRav); + struct TestableRav(Eip712SignedMessage); - impl Eq for TestableRav {} + impl Eq for TestableRav {} - impl PartialEq for TestableRav { + impl PartialEq for TestableRav { fn eq(&self, other: &Self) -> bool { self.0.message == other.0.message && self.0.signature.as_bytes() == other.0.signature.as_bytes() } } - #[sqlx::test(migrations = "../../migrations")] - async fn update_and_retrieve_rav(pool: PgPool) { - let timestamp_ns = u64::MAX - 10; - let value_aggregate = u128::MAX; - let context = TapAgentContext::new( - pool.clone(), + const TIMESTAMP_NS: u64 = u64::MAX - 10; + const VALUE_AGGREGATE: u128 = u128::MAX; + + async fn legacy_adapter(pgpool: PgPool) -> TapAgentContext { + TapAgentContext::new( + pgpool, ALLOCATION_ID_0, INDEXER.1, SENDER.1, watch::channel(EscrowAccounts::default()).1, - ); + ) + } + + async fn horizon_adapter(pgpool: PgPool) -> TapAgentContext { + TapAgentContext::new( + pgpool, + ALLOCATION_ID_0, + INDEXER.1, + SENDER.1, + watch::channel(EscrowAccounts::default()).1, + ) + } + /// Insert a single receipt and retrieve it from the database using the adapter. + /// The point here it to test the deserialization of large numbers. + #[rstest] + #[case(legacy_adapter(_pgpool.clone()))] + #[case(horizon_adapter(_pgpool.clone()))] + #[sqlx::test(migrations = "../../migrations")] + async fn update_and_retrieve_rav( + #[ignore] _pgpool: PgPool, + #[case] + #[future(awt)] + context: TapAgentContext, + ) where + T: CreateRav + std::fmt::Debug, + TapAgentContext: RavRead + RavStore, + { // Insert a rav - let mut new_rav = create_rav( + let mut new_rav = T::create_rav( ALLOCATION_ID_0, SIGNER.0.clone(), - timestamp_ns, - value_aggregate, + TIMESTAMP_NS, + VALUE_AGGREGATE, ); context.update_last_rav(new_rav.clone()).await.unwrap(); @@ -217,21 +398,21 @@ mod test { // we inserted let last_rav = context.last_rav().await.unwrap().unwrap(); - assert_eq!(TestableRav(new_rav.clone()), TestableRav(last_rav)); + assert_eq!(TestableRav::(new_rav.clone()), TestableRav(last_rav)); // Update the RAV 3 times in quick succession for i in 0..3 { - new_rav = create_rav( + new_rav = T::create_rav( ALLOCATION_ID_0, SIGNER.0.clone(), - timestamp_ns + i, - value_aggregate - (i as u128), + TIMESTAMP_NS + i, + VALUE_AGGREGATE - (i as u128), ); context.update_last_rav(new_rav.clone()).await.unwrap(); } // Check that the last rav is the last one we inserted let last_rav = context.last_rav().await.unwrap(); - assert_eq!(TestableRav(new_rav), TestableRav(last_rav.unwrap())); + assert_eq!(TestableRav::(new_rav), TestableRav(last_rav.unwrap())); } } diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 2a301224..f7630f85 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -23,7 +23,7 @@ use tap_core::{signed_message::Eip712SignedMessage, tap_eip712_domain}; use tap_graph::{Receipt, ReceiptAggregateVoucher, SignedRav, SignedReceipt}; use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::alloy::{ - primitives::{hex::ToHexExt, Address, U256}, + primitives::{hex::ToHexExt, Address, Bytes, U256}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, sol_types::Eip712Domain, }; @@ -47,7 +47,7 @@ use crate::{ }, }, tap::{ - context::{AdapterError, Horizon, Legacy}, + context::{AdapterError, Horizon, Legacy, NetworkVersion}, CheckingReceipt, }, }; @@ -244,6 +244,41 @@ pub async fn create_sender_accounts_manager( ) } +/// Generic implementation of create_rav +pub trait CreateRav: NetworkVersion { + /// This might seem weird at first glance since [Horizon] and [Legacy] implementation have the same + /// function signature and don't require &self. The reason is that we can not match over T to get + /// all variants because T is a trait and not an enum. + fn create_rav( + allocation_id: Address, + signer_wallet: PrivateKeySigner, + timestamp_ns: u64, + value_aggregate: u128, + ) -> Eip712SignedMessage; +} + +impl CreateRav for Legacy { + fn create_rav( + allocation_id: Address, + signer_wallet: PrivateKeySigner, + timestamp_ns: u64, + value_aggregate: u128, + ) -> Eip712SignedMessage { + create_rav(allocation_id, signer_wallet, timestamp_ns, value_aggregate) + } +} + +impl CreateRav for Horizon { + fn create_rav( + allocation_id: Address, + signer_wallet: PrivateKeySigner, + timestamp_ns: u64, + value_aggregate: u128, + ) -> Eip712SignedMessage { + create_rav_v2(allocation_id, signer_wallet, timestamp_ns, value_aggregate) + } +} + /// Fixture to generate a RAV using the wallet from `keys()` pub fn create_rav( allocation_id: Address, @@ -263,6 +298,29 @@ pub fn create_rav( .unwrap() } +/// Fixture to generate a RAV using the wallet from `keys()` +pub fn create_rav_v2( + allocation_id: Address, + signer_wallet: PrivateKeySigner, + timestamp_ns: u64, + value_aggregate: u128, +) -> tap_graph::v2::SignedRav { + Eip712SignedMessage::new( + &TAP_EIP712_DOMAIN_SEPARATOR, + tap_graph::v2::ReceiptAggregateVoucher { + allocationId: allocation_id, + timestampNs: timestamp_ns, + valueAggregate: value_aggregate, + payer: SENDER.1, + dataService: Address::ZERO, + serviceProvider: INDEXER.1, + metadata: Bytes::new(), + }, + &signer_wallet, + ) + .unwrap() +} + /// Generic implementation of create_received_receipt pub trait CreateReceipt { /// This might seem weird at first glance since [Horizon] and [Legacy] implementation have the same diff --git a/migrations/20250210152412_tap_horizon_ravs.down.sql b/migrations/20250210152412_tap_horizon_ravs.down.sql new file mode 100644 index 00000000..fb47503a --- /dev/null +++ b/migrations/20250210152412_tap_horizon_ravs.down.sql @@ -0,0 +1,3 @@ +-- Add down migration script here +DROP TABLE IF EXISTS tap_horizon_ravs CASCADE; +DROP TABLE IF EXISTS tap_horizon_rav_requests_failed CASCADE; diff --git a/migrations/20250210152412_tap_horizon_ravs.up.sql b/migrations/20250210152412_tap_horizon_ravs.up.sql new file mode 100644 index 00000000..a59418b6 --- /dev/null +++ b/migrations/20250210152412_tap_horizon_ravs.up.sql @@ -0,0 +1,33 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS tap_horizon_ravs ( + -- Values below are the individual fields of the EIP-712 RAV + signature BYTEA NOT NULL, + allocation_id CHAR(40) NOT NULL, + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + timestamp_ns NUMERIC(20) NOT NULL, + value_aggregate NUMERIC(39) NOT NULL, + metadata BYTEA NOT NULL, + + last BOOLEAN DEFAULT FALSE NOT NULL, + final BOOLEAN DEFAULT FALSE NOT NULL, + PRIMARY KEY (payer, data_service, service_provider, allocation_id), + + -- To make indexer-agent's sequelize happy + created_at TIMESTAMP WITH TIME ZONE, + updated_at TIMESTAMP WITH TIME ZONE +); + +-- This table is used to store failed RAV requests. +-- Used for logging and debugging purposes. +CREATE TABLE IF NOT EXISTS tap_horizon_rav_requests_failed ( + id BIGSERIAL PRIMARY KEY, + allocation_id CHAR(40) NOT NULL, + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + expected_rav JSON NOT NULL, + rav_response JSON NOT NULL, + reason TEXT NOT NULL +);