Skip to content

Commit

Permalink
feat: Add DIPS ipfs validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 10, 2025
1 parent 704c49c commit 065670e
Show file tree
Hide file tree
Showing 8 changed files with 663 additions and 26 deletions.
340 changes: 321 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [
"crates/attestation",
"crates/config",
"crates/dips",
"crates/indexer-receipt",
"crates/indexer-receipt",
"crates/monitor",
"crates/query",
"crates/service",
Expand Down Expand Up @@ -82,6 +82,7 @@ prost = "0.13.4"
prost-types = "0.13.3"
dipper-rpc = { git = "https://github.com/edgeandnode/dipper/", rev = "c8700e2", default-features = false }
tonic-build = "0.12.3"
serde_yaml = "0.9.21"

[patch.crates-io.tap_core]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
Expand Down
9 changes: 9 additions & 0 deletions crates/dips/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ prost-types.workspace = true
uuid.workspace = true
base64.workspace = true
tokio.workspace = true
futures = "0.3"

http = "0.2"
derivative = "2.2.0"
ipfs-api-backend-hyper = {version = "0.6.0", features = ["with-send-sync"] }
ipfs-api-prelude = {version = "0.6.0", features = ["with-send-sync"] }
bytes = "1.10.0"
serde_yaml.workspace = true
serde.workspace = true

[build-dependencies]
tonic-build = { workspace = true }
234 changes: 234 additions & 0 deletions crates/dips/src/ipfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{str::FromStr, sync::Arc};

use derivative::Derivative;
use futures::TryStreamExt;
use http::Uri;
use ipfs_api_prelude::{IpfsApi, TryFromUri};
use serde::Deserialize;
use tonic::async_trait;

use crate::DipsError;

#[async_trait]
pub trait IpfsFetcher: Send + Sync + std::fmt::Debug {
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError>;
}

#[async_trait]
impl<T: IpfsFetcher> IpfsFetcher for Arc<T> {
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
self.as_ref().fetch(file).await
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct IpfsClient {
#[derivative(Debug = "ignore")]
client: ipfs_api_backend_hyper::IpfsClient,
}

impl IpfsClient {
pub fn new(url: &str) -> anyhow::Result<Self> {
Ok(Self {
client: ipfs_api_backend_hyper::IpfsClient::build_with_base_uri(
Uri::from_str(url).map_err(|err| anyhow::Error::from(err))?,
),
})
}
}

#[async_trait]
impl IpfsFetcher for IpfsClient {
async fn fetch(&self, file: &str) -> Result<GraphManifest, DipsError> {
let content = self
.client
.get(file.as_ref())
.map_ok(|chunk| chunk.to_vec())
.try_concat()
.await
.unwrap();

let manifest: GraphManifest = serde_yaml::from_slice(&content)
.map_err(|_| DipsError::InvalidSubgraphManifest(file.to_string()))?;

Ok(manifest)
}
}

#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DataSource {
network: String,
}

#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GraphManifest {
data_sources: Vec<DataSource>,
}

impl GraphManifest {
pub fn network(&self) -> Option<String> {
self.data_sources.first().map(|ds| ds.network.clone())
}
}

#[cfg(test)]
#[derive(Debug)]
pub struct TestIpfsClient {
manifest: GraphManifest,
}

#[cfg(test)]
impl TestIpfsClient {
pub fn mainnet() -> Self {
Self {
manifest: GraphManifest {
data_sources: vec![DataSource {
network: "mainnet".to_string(),
}],
},
}
}
}

#[cfg(test)]
#[async_trait]
impl IpfsFetcher for TestIpfsClient {
async fn fetch(&self, _file: &str) -> Result<GraphManifest, DipsError> {
Ok(self.manifest.clone())
}
}

#[cfg(test)]
mod test {
use crate::ipfs::{DataSource, GraphManifest};

#[test]
fn test_deserialize_manifest() {
let manifest: GraphManifest = serde_yaml::from_str(&MANIFEST).unwrap();
assert_eq!(
manifest,
GraphManifest {
data_sources: vec![
DataSource {
network: "scroll".to_string()
},
DataSource {
network: "scroll".to_string()
}
],
}
)
}

const MANIFEST: &'static str = "
dataSources:
- kind: ethereum/contract
mapping:
abis:
- file:
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
name: Factory
apiVersion: 0.0.6
entities: []
eventHandlers:
- event: >-
PoolCreated(indexed address,indexed address,indexed
uint24,int24,address)
handler: handlePoolCreated
file:
/: /ipfs/Qmbj3ituUaFRnTuahJ8yCG9GPiPqsRYq2T7umucZzPpLFn
kind: ethereum/events
language: wasm/assemblyscript
name: Factory
network: scroll
source:
abi: Factory
address: '0x46B3fDF7b5CDe91Ac049936bF0bDb12c5d22202e'
startBlock: 82522
- kind: ethereum/contract
mapping:
abis:
- file:
/: /ipfs/QmaxxqQ7xzbGDPWu184uoq2g5sofazB9B9tEDrpPjmRZ8q
name: NonfungiblePositionManager
apiVersion: 0.0.6
entities: []
eventHandlers:
- event: 'IncreaseLiquidity(indexed uint256,uint128,uint256,uint256)'
handler: handleIncreaseLiquidity
file:
/: /ipfs/QmcWrYawVufpST4u2Ed8Jz6jxFFaYXxERGwqstrpniY8C5
kind: ethereum/events
language: wasm/assemblyscript
name: NonfungiblePositionManager
network: scroll
source:
abi: NonfungiblePositionManager
address: '0x0389879e0156033202C44BF784ac18fC02edeE4f'
startBlock: 82597
features:
- nonFatalErrors
schema:
file:
/: /ipfs/QmSCM39NPLAjNQXsnkqq6H8z8KBi5YkfYyApPYLQbbC2kb
specVersion: 0.0.4
templates:
- kind: ethereum/contract
mapping:
abis:
- file:
/: /ipfs/QmULRc8Ac1J6YFy11z7JRpyThb6f7nmL5mMTQvN7LKj2Vy
name: Pool
- file:
/: /ipfs/QmTU8eKx6pCgtff6Uvc7srAwR8BPiM3jTMBw9ahrXBjRzY
name: Factory
- file:
/: /ipfs/QmXuTbDkNrN27VydxbS2huvKRk62PMgUTdPDWkxcr2w7j2
name: ERC20
apiVersion: 0.0.6
entities: []
eventHandlers:
- event: 'Initialize(uint160,int24)'
handler: handleInitialize
- event: >-
Swap(indexed address,indexed
address,int256,int256,uint160,uint128,int24)
handler: handleSwap
- event: >-
Mint(address,indexed address,indexed int24,indexed
int24,uint128,uint256,uint256)
handler: handleMint
- event: >-
Burn(indexed address,indexed int24,indexed
int24,uint128,uint256,uint256)
handler: handleBurn
- event: >-
Flash(indexed address,indexed
address,uint256,uint256,uint256,uint256)
handler: handleFlash
- event: >-
Collect(indexed address,address,indexed int24,indexed
int24,uint128,uint128)
handler: handlePoolCollect
- event: 'CollectProtocol(indexed address,indexed address,uint128,uint128)'
handler: handleProtocolCollect
- event: 'SetFeeProtocol(uint8,uint8,uint8,uint8)'
handler: handleSetProtocolFee
file:
/: /ipfs/QmPtcuzBcWWBGXFKGdfUgqZLJov4c4Crt85ANbER2eHdCb
kind: ethereum/events
language: wasm/assemblyscript
name: Pool
network: scroll
source:
abi: Pool
";
}
58 changes: 53 additions & 5 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@

use std::{str::FromStr, sync::Arc};

use ipfs::IpfsFetcher;
use price::PriceCalculator;
use thegraph_core::alloy::{
core::primitives::Address,
primitives::{b256, ChainId, PrimitiveSignature as Signature, B256},
primitives::{b256, ChainId, PrimitiveSignature as Signature, Uint, B256},
signers::SignerSync,
sol,
sol_types::{eip712_domain, Eip712Domain, SolStruct, SolValue},
};

pub mod ipfs;
pub mod price;
pub mod proto;
pub mod server;
pub mod store;
Expand Down Expand Up @@ -131,6 +135,14 @@ pub enum DipsError {
PayerNotAuthorised(Address),
#[error("voucher payee {actual} does not match the expected address {expected}")]
UnexpectedPayee { expected: Address, actual: Address },
#[error("invalid subgraph id {0}")]
InvalidSubgraphManifest(String),
#[error("voucher for chain id {0}, subgraph manifest has network {1}")]
SubgraphChainIdMistmatch(String, String),
#[error("chainId {0} is not supported")]
UnsupportedChainId(String),
#[error("price per block is below configured price for chain {0}, minimum: {1}, offered: {2}")]
PricePerBlockTooLow(String, u64, String),
// cancellation
#[error("cancelled_by is expected to match the signer")]
UnexpectedSigner,
Expand Down Expand Up @@ -276,6 +288,8 @@ pub async fn validate_and_create_agreement(
expected_payee: &Address,
allowed_payers: impl AsRef<[Address]>,
voucher: Vec<u8>,
price_calculator: &PriceCalculator,
ipfs_client: Arc<dyn IpfsFetcher>,
) -> Result<Uuid, DipsError> {
let decoded_voucher = SignedIndexingAgreementVoucher::abi_decode(voucher.as_ref(), true)
.map_err(|e| DipsError::AbiDecoding(e.to_string()))?;
Expand All @@ -287,6 +301,35 @@ pub async fn validate_and_create_agreement(

decoded_voucher.validate(domain, expected_payee, allowed_payers)?;

let manifest = ipfs_client.fetch(&metadata.subgraphDeploymentId).await?;
match manifest.network() {
Some(chain_id) if chain_id == metadata.chainId => {}
Some(chain_id) => {
return Err(DipsError::SubgraphChainIdMistmatch(
metadata.chainId,
chain_id,
))
}
None => return Err(DipsError::UnsupportedChainId("".to_string())),
}

let chain_id = manifest
.network()
.ok_or_else(|| DipsError::UnsupportedChainId("".to_string()))?;

let offered_price = decoded_voucher.voucher.maxOngoingAmountPerEpoch;
match price_calculator.get_minimum_price(&chain_id) {
Some(price) if offered_price.lt(&Uint::from(price)) => {
return Err(DipsError::PricePerBlockTooLow(
chain_id,
price,
offered_price.to_string(),
))
}
Some(_) => {}
None => return Err(DipsError::UnsupportedChainId(chain_id)),
}

store
.create_agreement(decoded_voucher.clone(), metadata)
.await?;
Expand Down Expand Up @@ -339,8 +382,9 @@ mod test {

pub use crate::store::{AgreementStore, InMemoryAgreementStore};
use crate::{
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, CancellationRequest,
DipsError, IndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
dips_agreement_eip712_domain, dips_cancellation_eip712_domain, ipfs::TestIpfsClient,
price::PriceCalculator, CancellationRequest, DipsError, IndexingAgreementVoucher,
SubgraphIndexingVoucherMetadata,
};

#[tokio::test]
Expand All @@ -355,7 +399,7 @@ mod test {
basePricePerEpoch: U256::from(10000_u64),
pricePerEntity: U256::from(100_u64),
protocolNetwork: "eip155:42161".to_string(),
chainId: "eip155:1".to_string(),
chainId: "mainnet".to_string(),
subgraphDeploymentId: deployment_id,
};

Expand Down Expand Up @@ -386,6 +430,8 @@ mod test {
&payee_addr,
vec![payer_addr],
abi_voucher,
&PriceCalculator::for_testing(),
Arc::new(TestIpfsClient::mainnet()),
)
.await
.unwrap();
Expand Down Expand Up @@ -544,7 +590,7 @@ mod test {
basePricePerEpoch: U256::from(10000_u64),
pricePerEntity: U256::from(100_u64),
protocolNetwork: "eip155:42161".to_string(),
chainId: "eip155:1".to_string(),
chainId: "mainnet".to_string(),
subgraphDeploymentId: deployment_id,
};

Expand Down Expand Up @@ -575,6 +621,8 @@ mod test {
&payee_addr,
vec![payer_addr],
signed_voucher.encode_vec(),
&PriceCalculator::for_testing(),
Arc::new(TestIpfsClient::mainnet()),
)
.await?;

Expand Down
Loading

0 comments on commit 065670e

Please sign in to comment.