Skip to content

Commit

Permalink
refactor: update messages and add receipt notification test
Browse files Browse the repository at this point in the history
  • Loading branch information
gusinacio committed Apr 11, 2024
1 parent 81ee48d commit 6927b73
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 160 deletions.
176 changes: 110 additions & 66 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use crate::{

#[derive(Debug)]
pub enum SenderAccountMessage {
CreateSenderAllocation(Address),
UpdateAllocationIds(HashSet<Address>),
RemoveSenderAccount,
UpdateReceiptFees(Address, UnaggregatedReceipts),
#[cfg(test)]
GetAllocationTracker(ractor::RpcReplyPort<AllocationIdTracker>),
Expand Down Expand Up @@ -73,6 +71,33 @@ pub struct State {
}

impl State {
async fn create_sender_allocation(
&self,
sender_account_ref: ActorRef<SenderAccountMessage>,
allocation_id: Address,
) -> Result<()> {
let args = SenderAllocationArgs {
config: self.config,
pgpool: self.pgpool.clone(),
allocation_id,
sender: self.sender,
escrow_accounts: self.escrow_accounts.clone(),
escrow_subgraph: self.escrow_subgraph,
escrow_adapter: self.escrow_adapter.clone(),
domain_separator: self.domain_separator.clone(),
sender_aggregator_endpoint: self.sender_aggregator_endpoint.clone(),
sender_account_ref: sender_account_ref.clone(),
};

SenderAllocation::spawn_linked(
Some(self.format_sender_allocation(&allocation_id)),
SenderAllocation,
args,
sender_account_ref.get_cell(),
)
.await?;
Ok(())
}
fn format_sender_allocation(&self, allocation_id: &Address) -> String {
let mut sender_allocation_id = String::new();
if let Some(prefix) = &self.prefix {
Expand All @@ -85,13 +110,13 @@ impl State {

async fn rav_requester_single(&mut self) -> Result<()> {
let Some(allocation_id) = self.allocation_id_tracker.get_heaviest_allocation_id() else {
anyhow::bail!("Error while getting allocation with most unaggregated fees");
anyhow::bail!("Error while getting the heaviest allocation because none has unaggregated fees tracked");
};
let sender_allocation_id = self.format_sender_allocation(&allocation_id);
let allocation = ActorRef::<SenderAllocationMessage>::where_is(sender_allocation_id);

let Some(allocation) = allocation else {
anyhow::bail!("Error while getting allocation with most unaggregated fees");
anyhow::bail!("Error while getting allocation actor with most unaggregated fees");
};
// we call and wait for the response so we don't process anymore update
let result = call!(allocation, SenderAllocationMessage::TriggerRAVRequest)?;
Expand Down Expand Up @@ -140,16 +165,11 @@ impl Actor for SenderAccount {
}
});

for allocation_id in &allocation_ids {
// Create a sender allocation for each allocation
myself.cast(SenderAccountMessage::CreateSenderAllocation(*allocation_id))?;
}

let escrow_adapter = EscrowAdapter::new(escrow_accounts.clone(), sender_id);

Ok(State {
let state = State {
allocation_id_tracker: AllocationIdTracker::default(),
allocation_ids,
allocation_ids: allocation_ids.clone(),
_indexer_allocations_handle,
prefix,
escrow_accounts,
Expand All @@ -160,7 +180,16 @@ impl Actor for SenderAccount {
config,
pgpool,
sender: sender_id,
})
};

for allocation_id in &allocation_ids {
// Create a sender allocation for each allocation
state
.create_sender_allocation(myself.clone(), *allocation_id)
.await?;
}

Ok(state)
}

async fn handle(
Expand All @@ -170,9 +199,6 @@ impl Actor for SenderAccount {
state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SenderAccountMessage::RemoveSenderAccount => {
myself.stop(Some("Received Remove Sender Account message".into()))
}
SenderAccountMessage::UpdateReceiptFees(allocation_id, unaggregated_fees) => {
let tracker = &mut state.allocation_id_tracker;
tracker.add_or_update(allocation_id, unaggregated_fees.value);
Expand All @@ -181,40 +207,20 @@ impl Actor for SenderAccount {
state.rav_requester_single().await?;
}
}
SenderAccountMessage::CreateSenderAllocation(allocation_id) => {
let args = SenderAllocationArgs {
config: state.config,
pgpool: state.pgpool.clone(),
allocation_id,
sender: state.sender,
escrow_accounts: state.escrow_accounts.clone(),
escrow_subgraph: state.escrow_subgraph,
escrow_adapter: state.escrow_adapter.clone(),
domain_separator: state.domain_separator.clone(),
sender_aggregator_endpoint: state.sender_aggregator_endpoint.clone(),
sender_account_ref: myself.clone(),
};

SenderAllocation::spawn_linked(
Some(state.format_sender_allocation(&allocation_id)),
SenderAllocation,
args,
myself.get_cell(),
)
.await?;
}
SenderAccountMessage::UpdateAllocationIds(allocation_ids) => {
// Create new sender allocations
for allocation_id in allocation_ids.difference(&state.allocation_ids) {
myself.cast(SenderAccountMessage::CreateSenderAllocation(*allocation_id))?;
state
.create_sender_allocation(myself.clone(), *allocation_id)
.await?;
}

// Remove sender allocations
for allocation_id in state.allocation_ids.difference(&allocation_ids) {
if let Some(sender_handle) = ActorRef::<SenderAllocationMessage>::where_is(
state.format_sender_allocation(allocation_id),
) {
sender_handle.cast(SenderAllocationMessage::CloseAllocation)?;
sender_handle.stop(None);
}
}

Expand All @@ -236,11 +242,25 @@ impl Actor for SenderAccount {
&self,
_myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(_, _, _) | SupervisionEvent::ActorPanicked(_, _) => {
SupervisionEvent::ActorTerminated(cell, _, _)
| SupervisionEvent::ActorPanicked(cell, _) => {
// what to do in case of termination or panic?

let Some(allocation_id) = cell.get_name() else {
return Ok(());
};
let Some(allocation_id) = allocation_id.split(':').last() else {
return Ok(());
};
let Ok(allocation_id) = Address::parse_checksummed(allocation_id, None) else {
return Ok(());
};

let tracker = &mut state.allocation_id_tracker;
tracker.add_or_update(allocation_id, 0);
}
_ => {}
}
Expand All @@ -249,8 +269,9 @@ impl Actor for SenderAccount {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
use crate::agent::sender_accounts_manager::NewReceiptNotification;
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
use crate::config;
Expand All @@ -266,7 +287,7 @@ mod tests {
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

// we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare
Expand All @@ -275,7 +296,6 @@ mod tests {
impl PartialEq for SenderAccountMessage {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::CreateSenderAllocation(l0), Self::CreateSenderAllocation(r0)) => l0 == r0,
(Self::UpdateAllocationIds(l0), Self::UpdateAllocationIds(r0)) => l0 == r0,
(Self::UpdateReceiptFees(l0, l1), Self::UpdateReceiptFees(r0, r1)) => {
l0 == r0 && l1 == r1
Expand All @@ -287,7 +307,6 @@ mod tests {

static PREFIX_ID: AtomicU32 = AtomicU32::new(0);
const DUMMY_URL: &str = "http://localhost:1234";
const VALUE_PER_RECEIPT: u128 = 100;
const TRIGGER_VALUE: u128 = 500;

async fn create_sender_account(
Expand Down Expand Up @@ -381,8 +400,33 @@ mod tests {
handle.await.unwrap();
}

struct MockSenderAllocation {
pub struct MockSenderAllocation {
triggered_rav_request: Arc<AtomicBool>,
receipts: Arc<Mutex<Vec<NewReceiptNotification>>>,
}

impl MockSenderAllocation {
pub fn new_with_triggered_rav_request() -> (Self, Arc<AtomicBool>) {
let triggered_rav_request = Arc::new(AtomicBool::new(false));
(
Self {
triggered_rav_request: triggered_rav_request.clone(),
receipts: Arc::new(Mutex::new(Vec::new())),
},
triggered_rav_request,
)
}

pub fn new_with_receipts() -> (Self, Arc<Mutex<Vec<NewReceiptNotification>>>) {
let receipts = Arc::new(Mutex::new(Vec::new()));
(
Self {
triggered_rav_request: Arc::new(AtomicBool::new(false)),
receipts: receipts.clone(),
},
receipts,
)
}
}

#[async_trait::async_trait]
Expand All @@ -405,10 +449,16 @@ mod tests {
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if let SenderAllocationMessage::TriggerRAVRequest(reply) = message {
self.triggered_rav_request
.store(true, std::sync::atomic::Ordering::SeqCst);
reply.send(UnaggregatedReceipts::default())?;
match message {
SenderAllocationMessage::TriggerRAVRequest(reply) => {
self.triggered_rav_request
.store(true, std::sync::atomic::Ordering::SeqCst);
reply.send(UnaggregatedReceipts::default())?;
}
SenderAllocationMessage::NewReceipt(receipt) => {
self.receipts.lock().unwrap().push(receipt);
}
_ => {}
}
Ok(())
}
Expand All @@ -423,18 +473,14 @@ mod tests {
ActorRef<SenderAllocationMessage>,
JoinHandle<()>,
) {
let triggered_rav_request = Arc::new(AtomicBool::new(false));
let (mock_sender_allocation, triggered_rav_request) =
MockSenderAllocation::new_with_triggered_rav_request();

let name = format!("{}:{}:{}", prefix, sender, allocation);
let (sender_account, join_handle) = MockSenderAllocation::spawn(
Some(name),
MockSenderAllocation {
triggered_rav_request: triggered_rav_request.clone(),
},
(),
)
.await
.unwrap();
let (sender_account, join_handle) =
MockSenderAllocation::spawn(Some(name), mock_sender_allocation, ())
.await
.unwrap();
(triggered_rav_request, sender_account, join_handle)
}

Expand All @@ -450,7 +496,7 @@ mod tests {
.cast(SenderAccountMessage::UpdateReceiptFees(
*ALLOCATION_ID_0,
UnaggregatedReceipts {
value: VALUE_PER_RECEIPT,
value: TRIGGER_VALUE - 1,
last_id: 10,
},
))
Expand Down Expand Up @@ -510,15 +556,13 @@ mod tests {
};

// stop
sender_account
.cast(SenderAccountMessage::RemoveSenderAccount)
.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
sender_account.stop_and_wait(None, None).await.unwrap();

// check if sender_account is stopped

assert_eq!(sender_account.get_status(), ActorStatus::Stopped);

tokio::time::sleep(Duration::from_millis(10)).await;

// check if sender_allocation is also stopped
assert_eq!(sender_allocation.get_status(), ActorStatus::Stopped);

Expand Down
Loading

0 comments on commit 6927b73

Please sign in to comment.