Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add escape hatch to trusted senders #621

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/config/maximal-config-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ max_receipt_value_grt = "0.001" # 0.001 GRT. We use strings to prevent rounding
# max_amount_willing_to_lose_grt = "0.1"
max_amount_willing_to_lose_grt = 20

# List of Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
# over the escrow balance
trusted_senders = ["0xdeadbeefcafebabedeadbeefcafebabedeadbeef"]


# Receipts query timeout
sender_timeout_secs = 30

Expand Down
13 changes: 10 additions & 3 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
env,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
Expand Down Expand Up @@ -382,6 +382,11 @@ pub struct TapConfig {
pub sender_timeout_secs: Duration,

pub sender_aggregator_endpoints: HashMap<Address, Url>,

/// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
/// over the escrow balance
#[serde(default)]
pub trusted_senders: HashSet<Address>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -431,11 +436,11 @@ pub struct RavRequestConfig {

#[cfg(test)]
mod tests {
use std::{env, fs, path::PathBuf, str::FromStr};
use std::{collections::HashSet, env, fs, path::PathBuf, str::FromStr};

use figment::value::Uncased;
use sealed_test::prelude::*;
use thegraph_core::alloy::primitives::{Address, FixedBytes};
use thegraph_core::alloy::primitives::{address, Address, FixedBytes};
use tracing_test::traced_test;

use super::{DatabaseConfig, SHARED_PREFIX};
Expand All @@ -458,6 +463,8 @@ mod tests {
Some(PathBuf::from("minimal-config-example.toml")).as_ref(),
)
.unwrap();
max_config.tap.trusted_senders =
HashSet::from([address!("deadbeefcafebabedeadbeefcafebabedeadbeef")]);
max_config.dips = Some(crate::DipsConfig {
allowed_payers: vec![Address(
FixedBytes::<20>::from_str("0x3333333333333333333333333333333333333333").unwrap(),
Expand Down
3 changes: 1 addition & 2 deletions crates/dips/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ mod test {
time::{Duration, SystemTime, UNIX_EPOCH},
};

use rand::{distr::Alphanumeric, Rng};
use thegraph_core::alloy::{
primitives::{Address, FixedBytes, U256},
signers::local::PrivateKeySigner,
Expand All @@ -386,8 +387,6 @@ mod test {
price::PriceCalculator, CancellationRequest, DipsError, IndexingAgreementVoucher,
SignedIndexingAgreementVoucher, SubgraphIndexingVoucherMetadata,
};
use rand::distr::Alphanumeric;
use rand::Rng;

#[tokio::test]
async fn test_validate_and_create_agreement() -> anyhow::Result<()> {
Expand Down
3 changes: 1 addition & 2 deletions crates/dips/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use thegraph_core::alloy::primitives::Address;
use thegraph_core::alloy::sol_types::Eip712Domain;
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use tonic::{Request, Response, Status};

use crate::{
Expand Down
97 changes: 92 additions & 5 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ pub struct State {
// reset in case of a successful response
backoff_info: BackoffInfo,

/// Allows the sender to go over escrow balance
/// limited to `max_amount_willing_to_lose_grt`
trusted_sender: bool,

// Config forwarded to [SenderAllocation]
config: &'static SenderAccountConfig,
}
Expand All @@ -343,6 +347,9 @@ pub struct SenderAccountConfig {
///
/// This is reached if the database is too slow
pub tap_sender_timeout: Duration,
/// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
/// over the escrow balance
pub trusted_senders: HashSet<Address>,
}

impl SenderAccountConfig {
Expand All @@ -357,6 +364,7 @@ impl SenderAccountConfig {
trigger_value: config.tap.get_trigger_value(),
rav_request_timeout: config.tap.rav_request.request_timeout_secs,
tap_sender_timeout: config.tap.sender_timeout_secs,
trusted_senders: config.tap.trusted_senders.clone(),
}
}
}
Expand Down Expand Up @@ -531,14 +539,22 @@ impl State {
fn deny_condition_reached(&self) -> bool {
let pending_ravs = self.rav_tracker.get_total_fee();
let unaggregated_fees = self.sender_fee_tracker.get_total_fee();
let pending_fees_over_balance =
U256::from(pending_ravs + unaggregated_fees) >= self.sender_balance;
let max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt;

// if it's a trusted sender, allow to spend up to max_amount_willing_to_lose
let balance = if self.trusted_sender {
self.sender_balance + U256::from(max_amount_willing_to_lose)
} else {
self.sender_balance
};

let pending_fees_over_balance = U256::from(pending_ravs + unaggregated_fees) >= balance;
let invalid_receipt_fees = self.invalid_receipts_tracker.get_total_fee();
let total_fee_over_max_value =
unaggregated_fees + invalid_receipt_fees >= max_amount_willing_to_lose;

tracing::trace!(
trusted_sender = %self.trusted_sender,
%pending_fees_over_balance,
%total_fee_over_max_value,
"Verifying if deny condition was reached.",
Expand All @@ -550,6 +566,7 @@ impl State {
/// Will update [`State::denied`], as well as the denylist table in the database.
async fn add_to_denylist(&mut self) {
tracing::warn!(
trusted_sender = %self.trusted_sender,
fee_tracker = self.sender_fee_tracker.get_total_fee(),
rav_tracker = self.rav_tracker.get_total_fee(),
max_amount_willing_to_lose = self.config.max_amount_willing_to_lose_grt,
Expand Down Expand Up @@ -841,6 +858,7 @@ impl Actor for SenderAccount {
aggregator_v1,
aggregator_v2,
backoff_info: BackoffInfo::default(),
trusted_sender: config.trusted_senders.contains(&sender_id),
config,
};

Expand Down Expand Up @@ -1284,7 +1302,7 @@ pub mod tests {
Mock, MockServer, ResponseTemplate,
};

use super::SenderAccountMessage;
use super::{RavInformation, SenderAccountMessage};
use crate::{
agent::{
sender_account::ReceiptFees, sender_accounts_manager::AllocationId,
Expand All @@ -1294,7 +1312,7 @@ pub mod tests {
assert_not_triggered, assert_triggered,
test::{
actors::{create_mock_sender_allocation, MockSenderAllocation},
create_rav, create_sender_account, store_rav_with_options, TRIGGER_VALUE,
create_rav, create_sender_account, store_rav_with_options, ESCROW_VALUE, TRIGGER_VALUE,
},
};

Expand Down Expand Up @@ -1343,7 +1361,6 @@ pub mod tests {
}

/// Prefix shared between tests so we don't have conflicts in the global registry
const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);

Expand Down Expand Up @@ -1986,6 +2003,76 @@ pub mod tests {
sender_account.stop_and_wait(None, None).await.unwrap();
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_trusted_sender(pgpool: PgPool) {
let max_amount_willing_to_lose_grt = ESCROW_VALUE / 10;
// initialize with no trigger value and no max receipt deny
let (sender_account, notify, prefix, _) = create_sender_account()
.pgpool(pgpool)
.trusted_sender(true)
.rav_request_trigger_value(u128::MAX)
.max_amount_willing_to_lose_grt(max_amount_willing_to_lose_grt)
.call()
.await;

let (mock_sender_allocation, _) =
MockSenderAllocation::new_with_next_rav_value(sender_account.clone());

let name = format!("{}:{}:{}", prefix, SENDER.1, ALLOCATION_ID_0);
let (allocation, _) = MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())
.await
.unwrap();

async fn get_deny_status(sender_account: &ActorRef<SenderAccountMessage>) -> bool {
call!(sender_account, SenderAccountMessage::GetDeny).unwrap()
}

macro_rules! update_receipt_fees {
($value:expr) => {
sender_account
.cast(SenderAccountMessage::UpdateRav(RavInformation {
allocation_id: ALLOCATION_ID_0,
value_aggregate: $value,
}))
.unwrap();

flush_messages(&notify).await;
};
}

let deny = call!(sender_account, SenderAccountMessage::GetDeny).unwrap();
assert!(!deny);

update_receipt_fees!(ESCROW_VALUE - 1);
let deny = get_deny_status(&sender_account).await;
assert!(!deny, "it shouldn't deny a sender below escrow balance");

update_receipt_fees!(ESCROW_VALUE);
let deny = get_deny_status(&sender_account).await;
assert!(
!deny,
"it shouldn't deny a trusted sender below escrow balance + max willing to lose"
);

update_receipt_fees!(ESCROW_VALUE + max_amount_willing_to_lose_grt - 1);
let deny = get_deny_status(&sender_account).await;
assert!(
!deny,
"it shouldn't deny a trusted sender below escrow balance + max willing to lose"
);

update_receipt_fees!(ESCROW_VALUE + max_amount_willing_to_lose_grt);
let deny = get_deny_status(&sender_account).await;
assert!(
deny,
"it should deny a trusted sender over escrow balance + max willing to lose"
);

allocation.stop_and_wait(None, None).await.unwrap();

sender_account.stop_and_wait(None, None).await.unwrap();
}

#[sqlx::test(migrations = "../../migrations")]
async fn test_pending_rav_already_redeemed_and_redeem(pgpool: PgPool) {
// Start a mock graphql server using wiremock
Expand Down
10 changes: 9 additions & 1 deletion crates/tap-agent/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ lazy_static! {
pub const TRIGGER_VALUE: u128 = 500;
pub const RECEIPT_LIMIT: u64 = 10000;
pub const DUMMY_URL: &str = "http://localhost:1234";
const ESCROW_VALUE: u128 = 1000;
pub const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);
const RAV_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down Expand Up @@ -93,6 +93,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
indexer_address: INDEXER.1,
escrow_polling_interval: ESCROW_POLLING_INTERVAL,
tap_sender_timeout: Duration::from_secs(63),
trusted_senders: HashSet::new(),
}))
}

Expand All @@ -107,12 +108,18 @@ pub async fn create_sender_account(
network_subgraph_endpoint: Option<&str>,
#[builder(default = RECEIPT_LIMIT)] rav_request_receipt_limit: u64,
aggregator_endpoint: Option<Url>,
#[builder(default = false)] trusted_sender: bool,
) -> (
ActorRef<SenderAccountMessage>,
Arc<Notify>,
String,
Sender<EscrowAccounts>,
) {
let trusted_senders = if trusted_sender {
HashSet::from([SENDER.1])
} else {
HashSet::new()
};
let config = Box::leak(Box::new(SenderAccountConfig {
rav_request_buffer: BUFFER_DURATION,
max_amount_willing_to_lose_grt,
Expand All @@ -122,6 +129,7 @@ pub async fn create_sender_account(
indexer_address: INDEXER.1,
escrow_polling_interval: Duration::default(),
tap_sender_timeout: TAP_SENDER_TIMEOUT,
trusted_senders,
}));

let network_subgraph = Box::leak(Box::new(
Expand Down
8 changes: 7 additions & 1 deletion crates/tap-agent/tests/tap_agent_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};

use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
use indexer_tap_agent::{
Expand Down Expand Up @@ -87,6 +92,7 @@ pub async fn start_agent(
indexer_address: INDEXER_ADDRESS,
escrow_polling_interval: Duration::from_secs(10),
tap_sender_timeout: Duration::from_secs(30),
trusted_senders: HashSet::new(),
}));

let args = SenderAccountsManagerArgs {
Expand Down
Loading