Skip to content

Commit

Permalink
test: add tests to sender accounts manager
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Apr 9, 2024
1 parent 37ac6ec commit 6500a78
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 173 deletions.
1 change: 1 addition & 0 deletions tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub async fn start_agent() -> ActorRef<SenderAccountsManagerMessage> {
escrow_accounts,
escrow_subgraph,
sender_aggregator_endpoints,
prefix: None,
};

let (manager, _) = SenderAccountsManager::spawn(None, SenderAccountsManager, args)
Expand Down
21 changes: 19 additions & 2 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy_sol_types::Eip712Domain;
use anyhow::Result;
use eventuals::{Eventual, EventualExt, PipeHandle};
use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient};
use ractor::{call, Actor, ActorProcessingErr, ActorRef};
use ractor::{call, Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use sqlx::PgPool;
use thegraph::types::Address;
use tracing::error;
Expand Down Expand Up @@ -162,14 +162,31 @@ impl Actor for SenderAccount {
})
}

async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(_, _, _) | SupervisionEvent::ActorPanicked(_, _) => {
// what to do in case of termination or panic?
}
_ => {}
}
Ok(())
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SenderAccountMessage::RemoveSenderAccount => myself.stop(None),
SenderAccountMessage::RemoveSenderAccount => {
myself.stop(Some("Received Remove Sender Account message".into()))
}
SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => {
let tracker = &mut state.allocation_id_tracker;
tracker.add_or_update(allocation_id, unaggregated_fees.value);
Expand Down
261 changes: 246 additions & 15 deletions tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use anyhow::Result;
use eventuals::{Eventual, EventualExt, PipeHandle};
use indexer_common::escrow_accounts::EscrowAccounts;
use indexer_common::prelude::{Allocation, SubgraphClient};
use ractor::{Actor, ActorProcessingErr, ActorRef};
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use serde::Deserialize;
use sqlx::{postgres::PgListener, PgPool};
use thegraph::types::Address;
use tokio::select;
use tracing::{error, warn};

use super::sender_account::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
Expand Down Expand Up @@ -45,6 +46,8 @@ pub struct SenderAccountsManagerArgs {
pub escrow_accounts: Eventual<EscrowAccounts>,
pub escrow_subgraph: &'static SubgraphClient,
pub sender_aggregator_endpoints: HashMap<Address, String>,

pub prefix: Option<String>,
}

pub struct State {
Expand All @@ -59,6 +62,7 @@ pub struct State {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_subgraph: &'static SubgraphClient,
sender_aggregator_endpoints: HashMap<Address, String>,
prefix: Option<String>,
}

#[async_trait::async_trait]
Expand All @@ -78,6 +82,7 @@ impl Actor for SenderAccountsManager {
escrow_accounts,
escrow_subgraph,
sender_aggregator_endpoints,
prefix,
}: Self::Arguments,
) -> std::result::Result<Self::State, ActorProcessingErr> {
let indexer_allocations = indexer_allocations.map(|allocations| async move {
Expand Down Expand Up @@ -121,8 +126,15 @@ impl Actor for SenderAccountsManager {
escrow_accounts,
escrow_subgraph,
sender_aggregator_endpoints,
prefix,
};
let sender_allocation = select! {
sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation,
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
panic!("Timeout while getting pending sender allocation ids");
}
};
let sender_allocation = state.get_pending_sender_allocation_id().await;

for (sender_id, allocation_ids) in sender_allocation {
myself.cast(SenderAccountsManagerMessage::CreateSenderAccount(
sender_id,
Expand All @@ -143,6 +155,21 @@ impl Actor for SenderAccountsManager {
Ok(())
}

async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(_, _, _) | SupervisionEvent::ActorPanicked(_, _) => {
// what to do in case of termination or panic
}
_ => {}
}
Ok(())
}

async fn handle(
&self,
myself: ActorRef<Self::Msg>,
Expand All @@ -161,9 +188,9 @@ impl Actor for SenderAccountsManager {

// Remove sender accounts
for sender in state.sender_ids.difference(&target_senders) {
if let Some(sender_handle) =
ActorRef::<SenderAccountMessage>::where_is(sender.to_string())
{
if let Some(sender_handle) = ActorRef::<SenderAccountMessage>::where_is(
state.format_sender_account(sender),
) {
sender_handle.cast(SenderAccountMessage::RemoveSenderAccount)?;
}
}
Expand All @@ -173,7 +200,7 @@ impl Actor for SenderAccountsManager {
SenderAccountsManagerMessage::CreateSenderAccount(sender_id, allocation_ids) => {
let args = state.new_sender_account_args(&sender_id, allocation_ids)?;
SenderAccount::spawn_linked(
Some(sender_id.to_string()),
Some(state.format_sender_account(&sender_id)),
SenderAccount,
args,
myself.get_cell(),
Expand All @@ -186,6 +213,16 @@ impl Actor for SenderAccountsManager {
}

impl State {
fn format_sender_account(&self, sender: &Address) -> String {
let mut sender_allocation_id = String::new();
if let Some(prefix) = &self.prefix {
sender_allocation_id.push_str(prefix);
sender_allocation_id.push(':');
}
sender_allocation_id.push_str(&format!("{}", sender));
sender_allocation_id
}

async fn get_pending_sender_allocation_id(&self) -> HashMap<Address, HashSet<Address>> {
let escrow_accounts_snapshot = self
.escrow_accounts
Expand Down Expand Up @@ -308,7 +345,7 @@ impl State {
})?
.clone(),
allocation_ids,
prefix: None,
prefix: self.prefix.clone(),
})
}
}
Expand Down Expand Up @@ -374,15 +411,209 @@ async fn new_receipts_watcher(

#[cfg(test)]
mod tests {
#[test]
fn test_create_sender_accounts_manager() {}
use super::{
SenderAccountsManager, SenderAccountsManagerArgs, SenderAccountsManagerMessage, State,
};
use crate::agent::sender_account::SenderAccountMessage;
use crate::config;
use crate::tap::test_utils::{
create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0,
ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR,
};
use alloy_primitives::Address;
use eventuals::{Eventual, EventualExt};
use indexer_common::allocations::Allocation;
use indexer_common::escrow_accounts::EscrowAccounts;
use indexer_common::prelude::{DeploymentDetails, SubgraphClient};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorRef};
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;

const DUMMY_URL: &str = "http://localhost:1234";
static PREFIX_ID: AtomicU32 = AtomicU32::new(0);

fn get_subgraph_client() -> &'static SubgraphClient {
Box::leak(Box::new(SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(DUMMY_URL).unwrap(),
)))
}

fn get_config() -> &'static config::Cli {
Box::leak(Box::new(config::Cli {
config: None,
ethereum: config::Ethereum {
indexer_address: INDEXER.1,
},
tap: config::Tap {
rav_request_trigger_value: 100,
rav_request_timestamp_buffer_ms: 1,
..Default::default()
},
..Default::default()
}))
}

async fn create_sender_accounts_manager(
pgpool: PgPool,
) -> (
String,
(ActorRef<SenderAccountsManagerMessage>, JoinHandle<()>),
) {
let config = get_config();

let (mut indexer_allocations_writer, indexer_allocations_eventual) =
Eventual::<HashMap<Address, Allocation>>::new();
indexer_allocations_writer.write(HashMap::new());
let escrow_subgraph = get_subgraph_client();

let (mut escrow_accounts_writer, escrow_accounts_eventual) =
Eventual::<EscrowAccounts>::new();
escrow_accounts_writer.write(EscrowAccounts::default());

let prefix = format!(
"test-{}",
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
);
let args = SenderAccountsManagerArgs {
config,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
pgpool,
indexer_allocations: indexer_allocations_eventual,
escrow_accounts: escrow_accounts_eventual,
escrow_subgraph,
sender_aggregator_endpoints: HashMap::from([
(SENDER.1, String::from("http://localhost:8000")),
(SENDER_2.1, String::from("http://localhost:8000")),
]),
prefix: Some(prefix.clone()),
};
(
prefix,
SenderAccountsManager::spawn(None, SenderAccountsManager, args)
.await
.unwrap(),
)
}

#[sqlx::test(migrations = "../migrations")]
async fn test_create_sender_accounts_manager(pgpool: PgPool) {
let (_, (actor, join_handle)) = create_sender_accounts_manager(pgpool).await;
actor
.stop_and_wait(Some("Test".into()), None)
.await
.unwrap();
join_handle.await.unwrap();
}

#[sqlx::test(migrations = "../migrations")]
async fn test_pending_sender_allocations(pgpool: PgPool) {
let config = get_config();
let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();
let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers);
let state = State {
config,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
sender_ids: HashSet::new(),
new_receipts_watcher_handle: tokio::spawn(async {}),
_eligible_allocations_senders_pipe: Eventual::from_value(()).pipe_async(|_| async {}),
pgpool: pgpool.clone(),
indexer_allocations: Eventual::from_value(HashSet::new()),
escrow_accounts: Eventual::from_value(escrow_accounts),
escrow_subgraph: get_subgraph_client(),
sender_aggregator_endpoints: HashMap::new(),
prefix: None,
};

// add receipts to the database
for i in 1..=10 {
let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, i, i, i.into());
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
}

// add non-final ravs
let signed_rav = create_rav(*ALLOCATION_ID_1, SIGNER.0.clone(), 4, 10);
store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap();

#[test]
fn test_create_with_pending_sender_allocations() {}
let pending_allocation_id = state.get_pending_sender_allocation_id().await;

#[test]
fn test_update_sender_allocation() {}
// check if pending allocations are correct
assert_eq!(pending_allocation_id.len(), 1);
assert!(pending_allocation_id.get(&SENDER.1).is_some());
assert_eq!(pending_allocation_id.get(&SENDER.1).unwrap().len(), 2);
}

#[test]
fn test_create_sender_account() {}
#[sqlx::test(migrations = "../migrations")]
async fn test_update_sender_allocation(pgpool: PgPool) {
let (prefix, (actor, join_handle)) = create_sender_accounts_manager(pgpool).await;

actor
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(
vec![SENDER.1].into_iter().collect(),
))
.unwrap();

tokio::time::sleep(std::time::Duration::from_millis(10)).await;

// verify if create sender account
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
"{}:{}",
prefix.clone(),
SENDER.1.to_string()
));
assert!(actor_ref.is_some());

actor
.cast(SenderAccountsManagerMessage::UpdateSenderAccounts(
vec![].into_iter().collect(),
))
.unwrap();

tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// verify if it gets removed
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
"{}:{}",
prefix,
SENDER.1.to_string()
));
assert!(actor_ref.is_none());

// safely stop the manager
actor
.stop_and_wait(Some("Test".into()), None)
.await
.unwrap();
join_handle.await.unwrap();
}

#[sqlx::test(migrations = "../migrations")]
async fn test_create_sender_account(pgpool: PgPool) {
let (prefix, (actor, join_handle)) = create_sender_accounts_manager(pgpool).await;
actor
.cast(SenderAccountsManagerMessage::CreateSenderAccount(
SENDER_2.1,
HashSet::new(),
))
.unwrap();
// we wait to check if the sender is created
tokio::time::sleep(std::time::Duration::from_millis(10)).await;

let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
"{}:{}",
prefix,
SENDER_2.1.to_string()
));
assert!(actor_ref.is_some());

actor
.stop_and_wait(Some("Test".into()), None)
.await
.unwrap();
join_handle.await.unwrap();
}
}
Loading

0 comments on commit 6500a78

Please sign in to comment.