Skip to content

Commit

Permalink
refactor: use notify wrapper for rav trigger (#135)
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio authored Mar 14, 2024
1 parent c03c5b3 commit e3bebc2
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions tap-agent/src/tap/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,39 @@ pub struct Inner {
unaggregated_receipts_guard: Arc<TokioMutex<()>>,
}

/// Wrapper around a `Notify` to trigger RAV requests.
/// This gives a better understanding of what is happening
/// because the methods names are more explicit.
#[derive(Clone)]
pub struct RavTrigger(Arc<Notify>);

impl RavTrigger {
pub fn new() -> Self {
Self(Arc::new(Notify::new()))
}

/// Trigger a RAV request if there are any waiters.
/// In case there are no waiters, nothing happens.
pub fn trigger_rav(&self) {
self.0.notify_waiters();
}

/// Wait for a RAV trigger.
pub async fn wait_for_rav_request(&self) {
self.0.notified().await;
}

/// Trigger a RAV request. In case there are no waiters, the request is queued
/// and is executed on the next call to wait_for_rav_trigger().
pub fn trigger_next_rav(&self) {
self.0.notify_one();
}
}

impl Inner {
async fn rav_requester(&self, notif_value_trigger: Arc<Notify>) {
async fn rav_requester(&self, trigger: RavTrigger) {
loop {
notif_value_trigger.notified().await;
trigger.wait_for_rav_request().await;

if let Err(error) = self.rav_requester_single().await {
// If an error occoured, we shouldn't retry right away, so we wait for a bit.
Expand All @@ -64,7 +93,7 @@ impl Inner {
let unaggregated_fees = self.unaggregated_fees.lock().unwrap().clone();
if unaggregated_fees.value >= self.config.tap.rav_request_trigger_value.into() {
// If so, "self-notify" to trigger another RAV request.
notif_value_trigger.notify_one();
trigger.trigger_next_rav();

warn!(
"Sender {} has {} unaggregated fees immediately after a RAV request, which is
Expand All @@ -75,13 +104,13 @@ impl Inner {
}
}

async fn rav_requester_finalize(&self, notif_finalize_allocations: Arc<Notify>) {
async fn rav_requester_finalize(&self, finalize_trigger: RavTrigger) {
loop {
// Wait for either 5 minutes or a notification that we need to try to finalize
// allocation receipts.
select! {
_ = time::sleep(Duration::from_secs(300)) => (),
_ = notif_finalize_allocations.notified() => ()
_ = finalize_trigger.wait_for_rav_request() => ()
}

// Get a quick snapshot of the current finalizing allocations. They are
Expand Down Expand Up @@ -229,9 +258,9 @@ pub struct SenderAccount {
escrow_adapter: EscrowAdapter,
tap_eip712_domain_separator: Eip712Domain,
rav_requester_task: tokio::task::JoinHandle<()>,
rav_requester_notify: Arc<Notify>,
rav_trigger: RavTrigger,
rav_requester_finalize_task: tokio::task::JoinHandle<()>,
rav_requester_finalize_notify: Arc<Notify>,
rav_finalize_trigger: RavTrigger,
unaggregated_receipts_guard: Arc<TokioMutex<()>>,
}

Expand Down Expand Up @@ -260,23 +289,21 @@ impl SenderAccount {
unaggregated_receipts_guard: unaggregated_receipts_guard.clone(),
});

let rav_requester_notify = Arc::new(Notify::new());
let rav_trigger = RavTrigger::new();
let rav_requester_task = tokio::spawn({
let inner = inner.clone();
let rav_requester_notify = rav_requester_notify.clone();
let rav_trigger = rav_trigger.clone();
async move {
inner.rav_requester(rav_requester_notify).await;
inner.rav_requester(rav_trigger).await;
}
});

let rav_requester_finalize_notify = Arc::new(Notify::new());
let rav_finalize_trigger = RavTrigger::new();
let rav_requester_finalize_task = tokio::spawn({
let inner = inner.clone();
let rav_requester_finalize_notify = rav_requester_finalize_notify.clone();
let rav_finalize_trigger = rav_finalize_trigger.clone();
async move {
inner
.rav_requester_finalize(rav_requester_finalize_notify)
.await;
inner.rav_requester_finalize(rav_finalize_trigger).await;
}
});

Expand All @@ -287,9 +314,9 @@ impl SenderAccount {
escrow_adapter,
tap_eip712_domain_separator,
rav_requester_task,
rav_requester_notify,
rav_trigger,
rav_requester_finalize_task,
rav_requester_finalize_notify,
rav_finalize_trigger,
unaggregated_receipts_guard,
}
}
Expand All @@ -316,7 +343,7 @@ impl SenderAccount {
}

if allocations_to_finalize {
self.rav_requester_finalize_notify.notify_waiters();
self.rav_finalize_trigger.trigger_rav();
}
}

Expand Down Expand Up @@ -384,7 +411,7 @@ impl SenderAccount {
// Check if we need to trigger a RAV request.
if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into()
{
self.rav_requester_notify.notify_waiters();
self.rav_trigger.trigger_rav();
}
}
} else {
Expand Down

0 comments on commit e3bebc2

Please sign in to comment.