Skip to content

Commit

Permalink
refactor: changes to dips to better support payment collection in ind…
Browse files Browse the repository at this point in the history
…exer-agent (#608)

* refactor: changes to dips to better support payment collection in indexer-agent

* chore: rename dipper to gateway dips, remove dipper-rpc, publish js 0.2.1

* chore: fmt
  • Loading branch information
pcarranzav authored Feb 12, 2025
1 parent d436023 commit f9453c4
Show file tree
Hide file tree
Showing 20 changed files with 561 additions and 109 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ typed-builder = "0.20.0"
tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
prost = "0.13.4"
prost-types = "0.13.3"
dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false }
tonic-build = "0.12.3"
serde_yaml = "0.9.21"

Expand Down
3 changes: 3 additions & 0 deletions crates/dips/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
axum.workspace = true
build-info.workspace = true
thiserror.workspace = true
anyhow.workspace = true
alloy-rlp = "0.3.10"
Expand All @@ -15,6 +17,7 @@ prost-types.workspace = true
uuid.workspace = true
base64.workspace = true
tokio.workspace = true
sqlx.workspace = true
futures = "0.3"

http = "0.2"
Expand Down
2 changes: 0 additions & 2 deletions crates/dips/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ fn main() {
tonic_build::configure()
.out_dir("src/proto")
.include_file("indexer.rs")
.build_client(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/indexer.proto"], &["proto/"])
.expect("Failed to compile DIPs indexer RPC proto(s)");

tonic_build::configure()
.out_dir("src/proto")
.include_file("gateway.rs")
.build_server(false)
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(&["proto/gateway.proto"], &["proto"])
.expect("Failed to compile DIPs gateway RPC proto(s)");
Expand Down
4 changes: 2 additions & 2 deletions crates/dips/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/dips/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@graphprotocol/dips-proto",
"version": "0.1.0",
"version": "0.2.1",
"main": "generated/index.js",
"types": "generated/index.d.ts",
"files": [
Expand Down
2 changes: 1 addition & 1 deletion crates/dips/proto/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

package graphprotocol.gateway.dips;

service DipsService {
service GatewayDipsService {
/**
* Cancel an _indexing agreement_.
*
Expand Down
2 changes: 1 addition & 1 deletion crates/dips/proto/indexer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

package graphprotocol.indexer.dips;

service DipsService {
service IndexerDipsService {
/**
* Propose a new _indexing agreement_ to an _indexer_.
*
Expand Down
70 changes: 39 additions & 31 deletions crates/service/src/database/dips.rs → crates/dips/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::str::FromStr;

use axum::async_trait;
use build_info::chrono::{DateTime, Utc};
use indexer_dips::{
store::AgreementStore, DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};
use sqlx::{types::BigDecimal, PgPool};
use thegraph_core::alloy::{core::primitives::U256 as uint256, hex::ToHexExt, sol_types::SolType};
use uuid::Uuid;

use crate::{
store::{AgreementStore, StoredIndexingAgreement},
DipsError, SignedCancellationRequest, SignedIndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};

#[derive(Debug)]
pub struct PsqlAgreementStore {
pub pool: PgPool,
Expand All @@ -25,10 +27,7 @@ fn uint256_to_bigdecimal(value: &uint256, field: &str) -> Result<BigDecimal, Dip

#[async_trait]
impl AgreementStore for PsqlAgreementStore {
async fn get_by_id(
&self,
id: Uuid,
) -> Result<Option<(SignedIndexingAgreementVoucher, bool)>, DipsError> {
async fn get_by_id(&self, id: Uuid) -> Result<Option<StoredIndexingAgreement>, DipsError> {
let item = sqlx::query!("SELECT * FROM indexing_agreements WHERE id=$1", id,)
.fetch_one(&self.pool)
.await;
Expand All @@ -41,8 +40,18 @@ impl AgreementStore for PsqlAgreementStore {

let signed = SignedIndexingAgreementVoucher::abi_decode(item.signed_payload.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let metadata =
SubgraphIndexingVoucherMetadata::abi_decode(signed.voucher.metadata.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
let cancelled = item.cancelled_at.is_some();
Ok(Some((signed, cancelled)))
Ok(Some(StoredIndexingAgreement {
voucher: signed,
metadata,
cancelled,
current_allocation_id: item.current_allocation_id,
last_allocation_id: item.last_allocation_id,
last_payment_collected_at: item.last_payment_collected_at,
}))
}
async fn create_agreement(
&self,
Expand Down Expand Up @@ -72,7 +81,7 @@ impl AgreementStore for PsqlAgreementStore {
let min_epochs_per_collection: i64 = agreement.voucher.minEpochsPerCollection.into();
let max_epochs_per_collection: i64 = agreement.voucher.maxEpochsPerCollection.into();
sqlx::query!(
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null)",
"INSERT INTO indexing_agreements VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,null,null,null,null,null)",
id,
agreement.signature.as_ref(),
bs,
Expand Down Expand Up @@ -126,15 +135,15 @@ pub(crate) mod test {
use std::sync::Arc;

use build_info::chrono::Duration;
use indexer_dips::{CancellationRequest, IndexingAgreementVoucher};
use sqlx::PgPool;
use thegraph_core::alloy::{
primitives::{ruint::aliases::U256, Address},
sol_types::{SolType, SolValue},
sol_types::SolValue,
};
use uuid::Uuid;

use super::*;
use crate::{CancellationRequest, IndexingAgreementVoucher};

#[sqlx::test(migrations = "../../migrations")]
async fn test_store_agreement(pool: PgPool) {
Expand Down Expand Up @@ -226,19 +235,15 @@ pub(crate) mod test {
.unwrap();

// Retrieve agreement
let (retrieved_signed_voucher, cancelled) = store.get_by_id(id).await.unwrap().unwrap();

let retrieved_voucher = &retrieved_signed_voucher.voucher;
let retrieved_metadata =
<indexer_dips::SubgraphIndexingVoucherMetadata as SolType>::abi_decode(
retrieved_voucher.metadata.as_ref(),
true,
)
.unwrap();
let stored_agreement = store.get_by_id(id).await.unwrap().unwrap();

let retrieved_voucher = &stored_agreement.voucher;
let retrieved_metadata = stored_agreement.metadata;

// Verify retrieved agreement matches original
assert_eq!(retrieved_signed_voucher.signature, agreement.signature);
assert_eq!(retrieved_voucher.signature, agreement.signature);
assert_eq!(
retrieved_voucher.durationEpochs,
retrieved_voucher.voucher.durationEpochs,
agreement.voucher.durationEpochs
);
assert_eq!(retrieved_metadata.protocolNetwork, metadata.protocolNetwork);
Expand All @@ -247,26 +252,29 @@ pub(crate) mod test {
retrieved_metadata.subgraphDeploymentId,
metadata.subgraphDeploymentId
);
assert_eq!(retrieved_voucher.payer, agreement.voucher.payer);
assert_eq!(retrieved_voucher.recipient, agreement.voucher.recipient);
assert_eq!(retrieved_voucher.service, agreement.voucher.service);
assert_eq!(retrieved_voucher.voucher.payer, agreement.voucher.payer);
assert_eq!(
retrieved_voucher.voucher.recipient,
agreement.voucher.recipient
);
assert_eq!(retrieved_voucher.voucher.service, agreement.voucher.service);
assert_eq!(
retrieved_voucher.maxInitialAmount,
retrieved_voucher.voucher.maxInitialAmount,
agreement.voucher.maxInitialAmount
);
assert_eq!(
retrieved_voucher.maxOngoingAmountPerEpoch,
retrieved_voucher.voucher.maxOngoingAmountPerEpoch,
agreement.voucher.maxOngoingAmountPerEpoch
);
assert_eq!(
retrieved_voucher.maxEpochsPerCollection,
retrieved_voucher.voucher.maxEpochsPerCollection,
agreement.voucher.maxEpochsPerCollection
);
assert_eq!(
retrieved_voucher.minEpochsPerCollection,
retrieved_voucher.voucher.minEpochsPerCollection,
agreement.voucher.minEpochsPerCollection
);
assert!(!cancelled);
assert!(!stored_agreement.cancelled);
}

#[sqlx::test(migrations = "../../migrations")]
Expand Down
19 changes: 9 additions & 10 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use thegraph_core::alloy::{
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
};

pub mod database;
pub mod ipfs;
pub mod price;
pub mod proto;
Expand Down Expand Up @@ -353,11 +354,11 @@ pub async fn validate_and_cancel_agreement(
decoded_request.request.agreement_id.into(),
))
.await?;
let (agreement, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
if cancelled {
let stored_agreement = result.ok_or(DipsError::AgreementNotFound)?;
if stored_agreement.cancelled {
return Err(DipsError::AgreementCancelled);
}
let expected_signer = agreement.voucher.payer;
let expected_signer = stored_agreement.voucher.voucher.payer;
let id = Uuid::from_bytes(decoded_request.request.agreement_id.into());
decoded_request.validate(domain, &expected_signer)?;

Expand Down Expand Up @@ -438,11 +439,10 @@ mod test {
.unwrap();
assert_eq!(actual_id, id);

let actual = store.get_by_id(actual_id).await.unwrap();
let stored_agreement = store.get_by_id(actual_id).await.unwrap().unwrap();

let (actual_voucher, actual_cancelled) = actual.unwrap();
assert_eq!(voucher, actual_voucher);
assert!(!actual_cancelled);
assert_eq!(voucher, stored_agreement.voucher);
assert!(!stored_agreement.cancelled);
Ok(())
}

Expand Down Expand Up @@ -673,9 +673,8 @@ mod test {
assert_eq!(agreement_id, cancelled_id);

// Verify agreement is cancelled
let result = store.get_by_id(agreement_id).await?;
let (_, cancelled) = result.ok_or(DipsError::AgreementNotFound)?;
assert!(cancelled);
let stored_agreement = store.get_by_id(agreement_id).await?.unwrap();
assert!(stored_agreement.cancelled);

Ok(())
}
Expand Down
Loading

0 comments on commit f9453c4

Please sign in to comment.