Skip to content

Commit

Permalink
refactor: parse v2 receipts (#631)
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored Feb 13, 2025
1 parent 0817632 commit 77dc51c
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
bip39 = "2.0.0"
rstest = "0.23.0"
wiremock = "0.6.1"
bon = "3.3"
tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
prost = "0.13.4"
prost-types = "0.13.3"
tonic-build = "0.12.3"
serde_yaml = "0.9.21"
bon = "3.3"
test-log = { version = "0.2.12", features = ["trace"] }

[patch.crates-io.tap_core]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
Expand Down
3 changes: 3 additions & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ tracing-subscriber = { workspace = true, features = ["fmt"] }
clap = { workspace = true, features = ["derive"] }
build-info.workspace = true
lazy_static.workspace = true
prost.workspace = true
async-trait.workspace = true
async-graphql = { version = "7.0.11", default-features = false }
async-graphql-axum = "7.0.11"
base64.workspace = true
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" }
tap_core.workspace = true
tap_graph.workspace = true
tap_aggregator.workspace = true
uuid.workspace = true
bon.workspace = true
tower_governor = { version = "0.5.0", features = ["axum"] }
Expand Down Expand Up @@ -71,6 +73,7 @@ tower-service = "0.3.3"
tokio-test = "0.4.4"
wiremock.workspace = true
insta = "1.41.1"
test-log.workspace = true

[build-dependencies]
build-info-build = { version = "0.0.39", default-features = false }
55 changes: 43 additions & 12 deletions crates/service/src/service/tap_receipt_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
use base64::prelude::*;
use lazy_static::lazy_static;
use prometheus::{register_counter, Counter};
use prost::Message;
use tap_aggregator::grpc;
use tap_graph::SignedReceipt;

use crate::tap::TapReceipt;
Expand All @@ -26,17 +29,28 @@ impl Header for TapHeader {
where
I: Iterator<Item = &'i HeaderValue>,
{
let mut execute = || {
let value = values.next();
let raw_receipt = value.ok_or(headers::Error::invalid())?;
let raw_receipt = raw_receipt
.to_str()
.map_err(|_| headers::Error::invalid())?;
let parsed_receipt: SignedReceipt =
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt)))
let mut execute = || -> anyhow::Result<TapHeader> {
let raw_receipt = values.next().ok_or(headers::Error::invalid())?;

// we first try to decode a v2 receipt since it's cheaper and fail earlier than using
// serde
match BASE64_STANDARD.decode(raw_receipt) {
Ok(raw_receipt) => {
tracing::debug!("Decoded v2");
let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?;
Ok(TapHeader(TapReceipt::V2(receipt.try_into()?)))
}
Err(_) => {
tracing::debug!("Could not decode v2, trying v1");
let parsed_receipt: SignedReceipt =
serde_json::from_slice(raw_receipt.as_ref())?;
Ok(TapHeader(TapReceipt::V1(parsed_receipt)))
}
}
};
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
execute()
.map_err(|_| headers::Error::invalid())
.inspect_err(|_| TAP_RECEIPT_INVALID.inc())
}

fn encode<E>(&self, _values: &mut E)
Expand All @@ -51,13 +65,16 @@ impl Header for TapHeader {
mod test {
use axum::http::HeaderValue;
use axum_extra::headers::Header;
use test_assets::{create_signed_receipt, SignedReceiptRequest};
use base64::prelude::*;
use prost::Message;
use tap_aggregator::grpc::v2::SignedReceipt;
use test_assets::{create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest};

use super::TapHeader;
use crate::tap::TapReceipt;

#[tokio::test]
async fn test_decode_valid_tap_receipt_header() {
async fn test_decode_valid_tap_v1_receipt_header() {
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
Expand All @@ -68,6 +85,20 @@ mod test {
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt)));
}

#[test_log::test(tokio::test)]
async fn test_decode_valid_tap_v2_receipt_header() {
let original_receipt = create_signed_receipt_v2().call().await;
let protobuf_receipt = SignedReceipt::from(original_receipt.clone());
let encoded = protobuf_receipt.encode_to_vec();
let base64_encoded = BASE64_STANDARD.encode(encoded);
let header_value = HeaderValue::from_str(&base64_encoded).unwrap();
let header_values = vec![&header_value];
let decoded_receipt = TapHeader::decode(&mut header_values.into_iter())
.expect("tap receipt header value should be valid");

assert_eq!(decoded_receipt, TapHeader(TapReceipt::V2(original_receipt)));
}

#[test]
fn test_decode_non_string_tap_receipt_header() {
let header_value = HeaderValue::from_static("123");
Expand Down
2 changes: 1 addition & 1 deletion crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ tempfile = "3.8.0"
wiremock.workspace = true
wiremock-grpc = "0.0.3-alpha3"
test-assets = { path = "../test-assets" }
test-log = { version = "0.2.12", features = ["trace"] }
test-log.workspace = true
rstest = "0.24.0"
30 changes: 30 additions & 0 deletions crates/test-assets/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,36 @@ pub async fn create_signed_receipt(
.unwrap()
}

/// Function to generate a signed receipt using the TAP_SIGNER wallet.
#[bon::builder]
pub async fn create_signed_receipt_v2(
#[builder(default = ALLOCATION_ID_0)] allocation_id: Address,
#[builder(default)] nonce: u64,
#[builder(default = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64)]
timestamp_ns: u64,
#[builder(default = 1)] value: u128,
) -> tap_graph::v2::SignedReceipt {
let (wallet, _) = &*self::TAP_SIGNER;

Eip712SignedMessage::new(
&self::TAP_EIP712_DOMAIN,
tap_graph::v2::Receipt {
payer: TAP_SENDER.1,
service_provider: INDEXER_ADDRESS,
data_service: Address::ZERO,
allocation_id,
nonce,
timestamp_ns,
value,
},
wallet,
)
.unwrap()
}

pub async fn flush_messages(notify: &Notify) {
loop {
if tokio::time::timeout(Duration::from_millis(10), notify.notified())
Expand Down

0 comments on commit 77dc51c

Please sign in to comment.