From e3bebc246801fd931c18b621e8a7046846e54658 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 14 Mar 2024 16:50:13 -0300 Subject: [PATCH] refactor: use notify wrapper for rav trigger (#135) Signed-off-by: Gustavo Inacio --- tap-agent/src/tap/sender_account.rs | 65 ++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 19 deletions(-) diff --git a/tap-agent/src/tap/sender_account.rs b/tap-agent/src/tap/sender_account.rs index 2e8af53d..faf82057 100644 --- a/tap-agent/src/tap/sender_account.rs +++ b/tap-agent/src/tap/sender_account.rs @@ -44,10 +44,39 @@ pub struct Inner { unaggregated_receipts_guard: Arc>, } +/// Wrapper around a `Notify` to trigger RAV requests. +/// This gives a better understanding of what is happening +/// because the methods names are more explicit. +#[derive(Clone)] +pub struct RavTrigger(Arc); + +impl RavTrigger { + pub fn new() -> Self { + Self(Arc::new(Notify::new())) + } + + /// Trigger a RAV request if there are any waiters. + /// In case there are no waiters, nothing happens. + pub fn trigger_rav(&self) { + self.0.notify_waiters(); + } + + /// Wait for a RAV trigger. + pub async fn wait_for_rav_request(&self) { + self.0.notified().await; + } + + /// Trigger a RAV request. In case there are no waiters, the request is queued + /// and is executed on the next call to wait_for_rav_trigger(). + pub fn trigger_next_rav(&self) { + self.0.notify_one(); + } +} + impl Inner { - async fn rav_requester(&self, notif_value_trigger: Arc) { + async fn rav_requester(&self, trigger: RavTrigger) { loop { - notif_value_trigger.notified().await; + trigger.wait_for_rav_request().await; if let Err(error) = self.rav_requester_single().await { // If an error occoured, we shouldn't retry right away, so we wait for a bit. @@ -64,7 +93,7 @@ impl Inner { let unaggregated_fees = self.unaggregated_fees.lock().unwrap().clone(); if unaggregated_fees.value >= self.config.tap.rav_request_trigger_value.into() { // If so, "self-notify" to trigger another RAV request. - notif_value_trigger.notify_one(); + trigger.trigger_next_rav(); warn!( "Sender {} has {} unaggregated fees immediately after a RAV request, which is @@ -75,13 +104,13 @@ impl Inner { } } - async fn rav_requester_finalize(&self, notif_finalize_allocations: Arc) { + async fn rav_requester_finalize(&self, finalize_trigger: RavTrigger) { loop { // Wait for either 5 minutes or a notification that we need to try to finalize // allocation receipts. select! { _ = time::sleep(Duration::from_secs(300)) => (), - _ = notif_finalize_allocations.notified() => () + _ = finalize_trigger.wait_for_rav_request() => () } // Get a quick snapshot of the current finalizing allocations. They are @@ -229,9 +258,9 @@ pub struct SenderAccount { escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, rav_requester_task: tokio::task::JoinHandle<()>, - rav_requester_notify: Arc, + rav_trigger: RavTrigger, rav_requester_finalize_task: tokio::task::JoinHandle<()>, - rav_requester_finalize_notify: Arc, + rav_finalize_trigger: RavTrigger, unaggregated_receipts_guard: Arc>, } @@ -260,23 +289,21 @@ impl SenderAccount { unaggregated_receipts_guard: unaggregated_receipts_guard.clone(), }); - let rav_requester_notify = Arc::new(Notify::new()); + let rav_trigger = RavTrigger::new(); let rav_requester_task = tokio::spawn({ let inner = inner.clone(); - let rav_requester_notify = rav_requester_notify.clone(); + let rav_trigger = rav_trigger.clone(); async move { - inner.rav_requester(rav_requester_notify).await; + inner.rav_requester(rav_trigger).await; } }); - let rav_requester_finalize_notify = Arc::new(Notify::new()); + let rav_finalize_trigger = RavTrigger::new(); let rav_requester_finalize_task = tokio::spawn({ let inner = inner.clone(); - let rav_requester_finalize_notify = rav_requester_finalize_notify.clone(); + let rav_finalize_trigger = rav_finalize_trigger.clone(); async move { - inner - .rav_requester_finalize(rav_requester_finalize_notify) - .await; + inner.rav_requester_finalize(rav_finalize_trigger).await; } }); @@ -287,9 +314,9 @@ impl SenderAccount { escrow_adapter, tap_eip712_domain_separator, rav_requester_task, - rav_requester_notify, + rav_trigger, rav_requester_finalize_task, - rav_requester_finalize_notify, + rav_finalize_trigger, unaggregated_receipts_guard, } } @@ -316,7 +343,7 @@ impl SenderAccount { } if allocations_to_finalize { - self.rav_requester_finalize_notify.notify_waiters(); + self.rav_finalize_trigger.trigger_rav(); } } @@ -384,7 +411,7 @@ impl SenderAccount { // Check if we need to trigger a RAV request. if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into() { - self.rav_requester_notify.notify_waiters(); + self.rav_trigger.trigger_rav(); } } } else {