Skip to content

Commit

Permalink
feat: add escape hatch to trusted senders (#621)
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored Feb 11, 2025
1 parent bce155e commit bdc40ef
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 14 deletions.
5 changes: 5 additions & 0 deletions crates/config/maximal-config-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ max_receipt_value_grt = "0.001" # 0.001 GRT. We use strings to prevent rounding
# max_amount_willing_to_lose_grt = "0.1"
max_amount_willing_to_lose_grt = 20

# List of Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
# over the escrow balance
trusted_senders = ["0xdeadbeefcafebabedeadbeefcafebabedeadbeef"]


# Receipts query timeout
sender_timeout_secs = 30

Expand Down
13 changes: 10 additions & 3 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
env,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
Expand Down Expand Up @@ -382,6 +382,11 @@ pub struct TapConfig {
pub sender_timeout_secs: Duration,

pub sender_aggregator_endpoints: HashMap<Address, Url>,

/// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
/// over the escrow balance
#[serde(default)]
pub trusted_senders: HashSet<Address>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -431,11 +436,11 @@ pub struct RavRequestConfig {

#[cfg(test)]
mod tests {
use std::{env, fs, path::PathBuf, str::FromStr};
use std::{collections::HashSet, env, fs, path::PathBuf, str::FromStr};

use figment::value::Uncased;
use sealed_test::prelude::*;
use thegraph_core::alloy::primitives::{Address, FixedBytes};
use thegraph_core::alloy::primitives::{address, Address, FixedBytes};
use tracing_test::traced_test;

use super::{DatabaseConfig, SHARED_PREFIX};
Expand All @@ -458,6 +463,8 @@ mod tests {
Some(PathBuf::from("minimal-config-example.toml")).as_ref(),
)
.unwrap();
max_config.tap.trusted_senders =
HashSet::from([address!("deadbeefcafebabedeadbeefcafebabedeadbeef")]);
max_config.dips = Some(crate::DipsConfig {
allowed_payers: vec![Address(
FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(),
Expand Down
3 changes: 1 addition & 2 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ mod test {
time::{Duration, SystemTime, UNIX_EPOCH},
};

use rand::{distr::Alphanumeric, Rng};
use thegraph_core::alloy::{
primitives::{Address, FixedBytes, U256},
signers::local::PrivateKeySigner,
Expand All @@ -386,8 +387,6 @@ mod test {
price::PriceCalculator, CancellationRequest, DipsError, IndexingAgreementVoucher,
SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};
use rand::distr::Alphanumeric;
use rand::Rng;

#[tokio::test]
async fn test_validate_and_create_agreement() -> anyhow::Result<()> {
Expand Down
3 changes: 1 addition & 2 deletions crates/dips/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use thegraph_core::alloy::primitives::Address;
use thegraph_core::alloy::sol_types::Eip712Domain;
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use tonic::{Request, Response, Status};

use crate::{
Expand Down
97 changes: 92 additions & 5 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ pub struct State {
// reset in case of a successful response
backoff_info: BackoffInfo,

/// Allows the sender to go over escrow balance
/// limited to `max_amount_willing_to_lose_grt`
trusted_sender: bool,

// Config forwarded to [SenderAllocation]
config: &'static SenderAccountConfig,
}
Expand All @@ -343,6 +347,9 @@ pub struct SenderAccountConfig {
///
/// This is reached if the database is too slow
pub tap_sender_timeout: Duration,
/// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
/// over the escrow balance
pub trusted_senders: HashSet<Address>,
}

impl SenderAccountConfig {
Expand All @@ -357,6 +364,7 @@ impl SenderAccountConfig {
trigger_value: config.tap.get_trigger_value(),
rav_request_timeout: config.tap.rav_request.request_timeout_secs,
tap_sender_timeout: config.tap.sender_timeout_secs,
trusted_senders: config.tap.trusted_senders.clone(),
}
}
}
Expand Down Expand Up @@ -531,14 +539,22 @@ impl State {
fn deny_condition_reached(&self) -> bool {
let pending_ravs = self.rav_tracker.get_total_fee();
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
let pending_fees_over_balance =
U256::from(pending_ravs + unaggregated_fees) >= self.sender_balance;
let max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt;

// if it's a trusted sender, allow to spend up to max_amount_willing_to_lose
let balance = if self.trusted_sender {
self.sender_balance + U256::from(max_amount_willing_to_lose)
} else {
self.sender_balance
};

let pending_fees_over_balance = U256::from(pending_ravs + unaggregated_fees) >= balance;
let invalid_receipt_fees = self.invalid_receipts_tracker.get_total_fee();
let total_fee_over_max_value =
unaggregated_fees + invalid_receipt_fees >= max_amount_willing_to_lose;

tracing::trace!(
trusted_sender = %self.trusted_sender,
%pending_fees_over_balance,
%total_fee_over_max_value,
"Verifying if deny condition was reached.",
Expand All @@ -550,6 +566,7 @@ impl State {
/// Will update [`State::denied`], as well as the denylist table in the database.
async fn add_to_denylist(&mut self) {
tracing::warn!(
trusted_sender = %self.trusted_sender,
fee_tracker = self.sender_fee_tracker.get_total_fee(),
rav_tracker = self.rav_tracker.get_total_fee(),
max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt,
Expand Down Expand Up @@ -841,6 +858,7 @@ impl Actor for SenderAccount {
aggregator_v1,
aggregator_v2,
backoff_info: BackoffInfo::default(),
trusted_sender: config.trusted_senders.contains(&sender_id),
config,
};

Expand Down Expand Up @@ -1284,7 +1302,7 @@ pub mod tests {
Mock, MockServer, ResponseTemplate,
};

use super::SenderAccountMessage;
use super::{RavInformation, SenderAccountMessage};
use crate::{
agent::{
sender_account::ReceiptFees, sender_accounts_manager::AllocationId,
Expand All @@ -1294,7 +1312,7 @@ pub mod tests {
assert_not_triggered, assert_triggered,
test::{
actors::{create_mock_sender_allocation, MockSenderAllocation},
create_rav, create_sender_account, store_rav_with_options, TRIGGER_VALUE,
create_rav, create_sender_account, store_rav_with_options, ESCROW_VALUE, TRIGGER_VALUE,
},
};

Expand Down Expand Up @@ -1343,7 +1361,6 @@ pub mod tests {
}

/// Prefix shared between tests so we don't have conflicts in the global registry
const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);

Expand Down Expand Up @@ -1986,6 +2003,76 @@ pub mod tests {
sender_account.stop_and_wait(None, None).await.unwrap();
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_trusted_sender(pgpool: PgPool) {
let max_amount_willing_to_lose_grt = ESCROW_VALUE / 10;
// initialize with no trigger value and no max receipt deny
let (sender_account, notify, prefix, _) = create_sender_account()
.pgpool(pgpool)
.trusted_sender(true)
.rav_request_trigger_value(u128::MAX)
.max_amount_willing_to_lose_grt(max_amount_willing_to_lose_grt)
.call()
.await;

let (mock_sender_allocation, _) =
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());

let name = format!("{}:{}:{}", prefix, SENDER.1, ALLOCATION_ID_0);
let (allocation, _) = MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())
.await
.unwrap();

async fn get_deny_status(sender_account: &ActorRef<SenderAccountMessage>) -> bool {
call!(sender_account, SenderAccountMessage::GetDeny).unwrap()
}

macro_rules! update_receipt_fees {
($value:expr) => {
sender_account
.cast(SenderAccountMessage::UpdateRav(RavInformation {
allocation_id: ALLOCATION_ID_0,
value_aggregate: $value,
}))
.unwrap();

flush_messages(&notify).await;
};
}

let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap();
assert!(!deny);

update_receipt_fees!(ESCROW_VALUE - 1);
let deny = get_deny_status(&sender_account).await;
assert!(!deny, "it shouldn't deny a sender below escrow balance");

update_receipt_fees!(ESCROW_VALUE);
let deny = get_deny_status(&sender_account).await;
assert!(
!deny,
"it shouldn't deny a trusted sender below escrow balance + max willing to lose"
);

update_receipt_fees!(ESCROW_VALUE + max_amount_willing_to_lose_grt - 1);
let deny = get_deny_status(&sender_account).await;
assert!(
!deny,
"it shouldn't deny a trusted sender below escrow balance + max willing to lose"
);

update_receipt_fees!(ESCROW_VALUE + max_amount_willing_to_lose_grt);
let deny = get_deny_status(&sender_account).await;
assert!(
deny,
"it should deny a trusted sender over escrow balance + max willing to lose"
);

allocation.stop_and_wait(None, None).await.unwrap();

sender_account.stop_and_wait(None, None).await.unwrap();
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_pending_rav_already_redeemed_and_redeem(pgpool: PgPool) {
// Start a mock graphql server using wiremock
Expand Down
10 changes: 9 additions & 1 deletion crates/tap-agent/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ lazy_static! {
pub const TRIGGER_VALUE: u128 = 500;
pub const RECEIPT_LIMIT: u64 = 10000;
pub const DUMMY_URL: &str = "http://localhost:1234";
const ESCROW_VALUE: u128 = 1000;
pub const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);
const RAV_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -93,6 +93,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
indexer_address: INDEXER.1,
escrow_polling_interval: ESCROW_POLLING_INTERVAL,
tap_sender_timeout: Duration::from_secs(63),
trusted_senders: HashSet::new(),
}))
}

Expand All @@ -107,12 +108,18 @@ pub async fn create_sender_account(
network_subgraph_endpoint: Option<&str>,
#[builder(default = RECEIPT_LIMIT)] rav_request_receipt_limit: u64,
aggregator_endpoint: Option<Url>,
#[builder(default = false)] trusted_sender: bool,
) -> (
ActorRef<SenderAccountMessage>,
Arc<Notify>,
String,
Sender<EscrowAccounts>,
) {
let trusted_senders = if trusted_sender {
HashSet::from([SENDER.1])
} else {
HashSet::new()
};
let config = Box::leak(Box::new(SenderAccountConfig {
rav_request_buffer: BUFFER_DURATION,
max_amount_willing_to_lose_grt,
Expand All @@ -122,6 +129,7 @@ pub async fn create_sender_account(
indexer_address: INDEXER.1,
escrow_polling_interval: Duration::default(),
tap_sender_timeout: TAP_SENDER_TIMEOUT,
trusted_senders,
}));

let network_subgraph = Box::leak(Box::new(
Expand Down
8 changes: 7 additions & 1 deletion crates/tap-agent/tests/tap_agent_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};

use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
use indexer_tap_agent::{
Expand Down Expand Up @@ -87,6 +92,7 @@ pub async fn start_agent(
indexer_address: INDEXER_ADDRESS,
escrow_polling_interval: Duration::from_secs(10),
tap_sender_timeout: Duration::from_secs(30),
trusted_senders: HashSet::new(),
}));

let args = SenderAccountsManagerArgs {
Expand Down

0 comments on commit bdc40ef

Please sign in to comment.