diff --git a/Cargo.lock b/Cargo.lock index ebeb733a..479f96be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3911,14 +3911,17 @@ dependencies = [ "lazy_static", "pin-project 1.1.9", "prometheus", + "prost", "reqwest 0.12.9", "rstest 0.23.0", "serde", "serde_json", "sqlx", + "tap_aggregator", "tap_core", "tap_graph", "test-assets", + "test-log", "thegraph-core", "thegraph-graphql-http", "thiserror 1.0.69", diff --git a/Cargo.toml b/Cargo.toml index 084d58b2..7502a4e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,12 +76,13 @@ graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] } bip39 = "2.0.0" rstest = "0.23.0" wiremock = "0.6.1" -bon = "3.3" tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] } prost = "0.13.4" prost-types = "0.13.3" tonic-build = "0.12.3" serde_yaml = "0.9.21" +bon = "3.3" +test-log = { version = "0.2.12", features = ["trace"] } [patch.crates-io.tap_core] git = "https://github.com/semiotic-ai/timeline-aggregation-protocol" diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index cd072279..81ca62ae 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -32,6 +32,7 @@ tracing-subscriber = { workspace = true, features = ["fmt"] } clap = { workspace = true, features = ["derive"] } build-info.workspace = true lazy_static.workspace = true +prost.workspace = true async-trait.workspace = true async-graphql = { version = "7.0.11", default-features = false } async-graphql-axum = "7.0.11" @@ -39,6 +40,7 @@ base64.workspace = true graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" } tap_core.workspace = true tap_graph.workspace = true +tap_aggregator.workspace = true uuid.workspace = true bon.workspace = true tower_governor = { version = "0.5.0", features = ["axum"] } @@ -71,6 +73,7 @@ tower-service = "0.3.3" tokio-test = "0.4.4" wiremock.workspace = true insta = "1.41.1" +test-log.workspace = true [build-dependencies] build-info-build = { version = "0.0.39", default-features = false } diff --git a/crates/service/src/service/tap_receipt_header.rs b/crates/service/src/service/tap_receipt_header.rs index c198c635..2bb38c78 100644 --- a/crates/service/src/service/tap_receipt_header.rs +++ b/crates/service/src/service/tap_receipt_header.rs @@ -2,8 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 use axum_extra::headers::{self, Header, HeaderName, HeaderValue}; +use base64::prelude::*; use lazy_static::lazy_static; use prometheus::{register_counter, Counter}; +use prost::Message; +use tap_aggregator::grpc; use tap_graph::SignedReceipt; use crate::tap::TapReceipt; @@ -26,17 +29,28 @@ impl Header for TapHeader { where I: Iterator, { - let mut execute = || { - let value = values.next(); - let raw_receipt = value.ok_or(headers::Error::invalid())?; - let raw_receipt = raw_receipt - .to_str() - .map_err(|_| headers::Error::invalid())?; - let parsed_receipt: SignedReceipt = - serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?; - Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt))) + let mut execute = || -> anyhow::Result { + let raw_receipt = values.next().ok_or(headers::Error::invalid())?; + + // we first try to decode a v2 receipt since it's cheaper and fail earlier than using + // serde + match BASE64_STANDARD.decode(raw_receipt) { + Ok(raw_receipt) => { + tracing::debug!("Decoded v2"); + let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?; + Ok(TapHeader(TapReceipt::V2(receipt.try_into()?))) + } + Err(_) => { + tracing::debug!("Could not decode v2, trying v1"); + let parsed_receipt: SignedReceipt = + serde_json::from_slice(raw_receipt.as_ref())?; + Ok(TapHeader(TapReceipt::V1(parsed_receipt))) + } + } }; - execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc()) + execute() + .map_err(|_| headers::Error::invalid()) + .inspect_err(|_| TAP_RECEIPT_INVALID.inc()) } fn encode(&self, _values: &mut E) @@ -51,13 +65,16 @@ impl Header for TapHeader { mod test { use axum::http::HeaderValue; use axum_extra::headers::Header; - use test_assets::{create_signed_receipt, SignedReceiptRequest}; + use base64::prelude::*; + use prost::Message; + use tap_aggregator::grpc::v2::SignedReceipt; + use test_assets::{create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest}; use super::TapHeader; use crate::tap::TapReceipt; #[tokio::test] - async fn test_decode_valid_tap_receipt_header() { + async fn test_decode_valid_tap_v1_receipt_header() { let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await; let serialized_receipt = serde_json::to_string(&original_receipt).unwrap(); let header_value = HeaderValue::from_str(&serialized_receipt).unwrap(); @@ -68,6 +85,20 @@ mod test { assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt))); } + #[test_log::test(tokio::test)] + async fn test_decode_valid_tap_v2_receipt_header() { + let original_receipt = create_signed_receipt_v2().call().await; + let protobuf_receipt = SignedReceipt::from(original_receipt.clone()); + let encoded = protobuf_receipt.encode_to_vec(); + let base64_encoded = BASE64_STANDARD.encode(encoded); + let header_value = HeaderValue::from_str(&base64_encoded).unwrap(); + let header_values = vec![&header_value]; + let decoded_receipt = TapHeader::decode(&mut header_values.into_iter()) + .expect("tap receipt header value should be valid"); + + assert_eq!(decoded_receipt, TapHeader(TapReceipt::V2(original_receipt))); + } + #[test] fn test_decode_non_string_tap_receipt_header() { let header_value = HeaderValue::from_static("123"); diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 9c31f724..54f4cdf0 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -63,5 +63,5 @@ tempfile = "3.8.0" wiremock.workspace = true wiremock-grpc = "0.0.3-alpha3" test-assets = { path = "../test-assets" } -test-log = { version = "0.2.12", features = ["trace"] } +test-log.workspace = true rstest = "0.24.0" diff --git a/crates/test-assets/src/lib.rs b/crates/test-assets/src/lib.rs index 3a9edb4b..6efde0d7 100644 --- a/crates/test-assets/src/lib.rs +++ b/crates/test-assets/src/lib.rs @@ -348,6 +348,36 @@ pub async fn create_signed_receipt( .unwrap() } +/// Function to generate a signed receipt using the TAP_SIGNER wallet. +#[bon::builder] +pub async fn create_signed_receipt_v2( + #[builder(default = ALLOCATION_ID_0)] allocation_id: Address, + #[builder(default)] nonce: u64, + #[builder(default = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64)] + timestamp_ns: u64, + #[builder(default = 1)] value: u128, +) -> tap_graph::v2::SignedReceipt { + let (wallet, _) = &*self::TAP_SIGNER; + + Eip712SignedMessage::new( + &self::TAP_EIP712_DOMAIN, + tap_graph::v2::Receipt { + payer: TAP_SENDER.1, + service_provider: INDEXER_ADDRESS, + data_service: Address::ZERO, + allocation_id, + nonce, + timestamp_ns, + value, + }, + wallet, + ) + .unwrap() +} + pub async fn flush_messages(notify: &Notify) { loop { if tokio::time::timeout(Duration::from_millis(10), notify.notified())