Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: parse v2 receipts #631

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ graphql_client = { version = "0.14.0", features = ["reqwest-rustls"] }
bip39 = "2.0.0"
rstest = "0.23.0"
wiremock = "0.6.1"
bon = "3.3"
tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
prost = "0.13.4"
prost-types = "0.13.3"
tonic-build = "0.12.3"
serde_yaml = "0.9.21"
bon = "3.3"
test-log = { version = "0.2.12", features = ["trace"] }

[patch.crates-io.tap_core]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
Expand Down
3 changes: 3 additions & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ tracing-subscriber = { workspace = true, features = ["fmt"] }
clap = { workspace = true, features = ["derive"] }
build-info.workspace = true
lazy_static.workspace = true
prost.workspace = true
async-trait.workspace = true
async-graphql = { version = "7.0.11", default-features = false }
async-graphql-axum = "7.0.11"
base64.workspace = true
graphql = { git = "https://github.com/edgeandnode/toolshed", tag = "graphql-v0.3.0" }
tap_core.workspace = true
tap_graph.workspace = true
tap_aggregator.workspace = true
uuid.workspace = true
bon.workspace = true
tower_governor = { version = "0.5.0", features = ["axum"] }
Expand Down Expand Up @@ -71,6 +73,7 @@ tower-service = "0.3.3"
tokio-test = "0.4.4"
wiremock.workspace = true
insta = "1.41.1"
test-log.workspace = true

[build-dependencies]
build-info-build = { version = "0.0.39", default-features = false }
55 changes: 43 additions & 12 deletions crates/service/src/service/tap_receipt_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
use base64::prelude::*;
use lazy_static::lazy_static;
use prometheus::{register_counter, Counter};
use prost::Message;
use tap_aggregator::grpc;
use tap_graph::SignedReceipt;

use crate::tap::TapReceipt;
Expand All @@ -26,17 +29,28 @@ impl Header for TapHeader {
where
I: Iterator<Item = &'i HeaderValue>,
{
let mut execute = || {
let value = values.next();
let raw_receipt = value.ok_or(headers::Error::invalid())?;
let raw_receipt = raw_receipt
.to_str()
.map_err(|_| headers::Error::invalid())?;
let parsed_receipt: SignedReceipt =
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt)))
let mut execute = || -> anyhow::Result<TapHeader> {
let raw_receipt = values.next().ok_or(headers::Error::invalid())?;

// we first try to decode a v2 receipt since it's cheaper and fail earlier than using
// serde
match BASE64_STANDARD.decode(raw_receipt) {
Ok(raw_receipt) => {
tracing::debug!("Decoded v2");
let receipt = grpc::v2::SignedReceipt::decode(raw_receipt.as_ref())?;
Ok(TapHeader(TapReceipt::V2(receipt.try_into()?)))
}
Err(_) => {
tracing::debug!("Could not decode v2, trying v1");
let parsed_receipt: SignedReceipt =
serde_json::from_slice(raw_receipt.as_ref())?;
Ok(TapHeader(TapReceipt::V1(parsed_receipt)))
}
}
};
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
execute()
.map_err(|_| headers::Error::invalid())
.inspect_err(|_| TAP_RECEIPT_INVALID.inc())
}

fn encode<E>(&self, _values: &mut E)
Expand All @@ -51,13 +65,16 @@ impl Header for TapHeader {
mod test {
use axum::http::HeaderValue;
use axum_extra::headers::Header;
use test_assets::{create_signed_receipt, SignedReceiptRequest};
use base64::prelude::*;
use prost::Message;
use tap_aggregator::grpc::v2::SignedReceipt;
use test_assets::{create_signed_receipt, create_signed_receipt_v2, SignedReceiptRequest};

use super::TapHeader;
use crate::tap::TapReceipt;

#[tokio::test]
async fn test_decode_valid_tap_receipt_header() {
async fn test_decode_valid_tap_v1_receipt_header() {
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
Expand All @@ -68,6 +85,20 @@ mod test {
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt)));
}

#[test_log::test(tokio::test)]
async fn test_decode_valid_tap_v2_receipt_header() {
let original_receipt = create_signed_receipt_v2().call().await;
let protobuf_receipt = SignedReceipt::from(original_receipt.clone());
let encoded = protobuf_receipt.encode_to_vec();
let base64_encoded = BASE64_STANDARD.encode(encoded);
let header_value = HeaderValue::from_str(&base64_encoded).unwrap();
let header_values = vec![&header_value];
let decoded_receipt = TapHeader::decode(&mut header_values.into_iter())
.expect("tap receipt header value should be valid");

assert_eq!(decoded_receipt, TapHeader(TapReceipt::V2(original_receipt)));
}

#[test]
fn test_decode_non_string_tap_receipt_header() {
let header_value = HeaderValue::from_static("123");
Expand Down
2 changes: 1 addition & 1 deletion crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ tempfile = "3.8.0"
wiremock.workspace = true
wiremock-grpc = "0.0.3-alpha3"
test-assets = { path = "../test-assets" }
test-log = { version = "0.2.12", features = ["trace"] }
test-log.workspace = true
rstest = "0.24.0"
30 changes: 30 additions & 0 deletions crates/test-assets/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,36 @@ pub async fn create_signed_receipt(
.unwrap()
}

/// Function to generate a signed receipt using the TAP_SIGNER wallet.
#[bon::builder]
pub async fn create_signed_receipt_v2(
#[builder(default = ALLOCATION_ID_0)] allocation_id: Address,
#[builder(default)] nonce: u64,
#[builder(default = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos() as u64)]
timestamp_ns: u64,
#[builder(default = 1)] value: u128,
) -> tap_graph::v2::SignedReceipt {
let (wallet, _) = &*self::TAP_SIGNER;

Eip712SignedMessage::new(
&self::TAP_EIP712_DOMAIN,
tap_graph::v2::Receipt {
payer: TAP_SENDER.1,
service_provider: INDEXER_ADDRESS,
data_service: Address::ZERO,
allocation_id,
nonce,
timestamp_ns,
value,
},
wallet,
)
.unwrap()
}

pub async fn flush_messages(notify: &Notify) {
loop {
if tokio::time::timeout(Duration::from_millis(10), notify.notified())
Expand Down
Loading