Skip to content

Commit

Permalink
refactor: implement get_pending_sender_allocation_id_v2
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Feb 11, 2025
1 parent e5c9c04 commit 1c753ef
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 7 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 104 additions & 4 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,104 @@ impl State {
///
/// This loads horizon allocations
async fn get_pending_sender_allocation_id_v2(&self) -> HashMap<Address, HashSet<AllocationId>> {
unimplemented!()
// First we accumulate all allocations for each sender. This is because we may have more
// than one signer per sender in DB.
let mut unfinalized_sender_allocations_map: HashMap<Address, HashSet<AllocationId>> =
HashMap::new();

let receipts_signer_allocations_in_db = sqlx::query!(
r#"
WITH grouped AS (
SELECT signer_address, allocation_id
FROM tap_horizon_receipts
GROUP BY signer_address, allocation_id
)
SELECT DISTINCT
signer_address,
(
SELECT ARRAY
(
SELECT DISTINCT allocation_id
FROM grouped
WHERE signer_address = top.signer_address
)
) AS allocation_ids
FROM grouped AS top
"#
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch pending receipts from the database");

for row in receipts_signer_allocations_in_db {
let allocation_ids = row
.allocation_ids
.expect("all receipts should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();
let signer_id = Address::from_str(&row.signer_address)
.expect("signer_address should be a valid address");
let sender_id = self
.escrow_accounts_v1
.borrow()
.get_sender_for_signer(&signer_id)
.expect("should be able to get sender from signer");

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}

let nonfinal_ravs_sender_allocations_in_db = sqlx::query!(
r#"
SELECT DISTINCT
sender_address,
(
SELECT ARRAY
(
SELECT DISTINCT allocation_id
FROM tap_horizon_ravs
WHERE sender_address = top.sender_address
AND NOT last
)
) AS allocation_id
FROM scalar_tap_ravs AS top
"#
)
.fetch_all(&self.pgpool)
.await
.expect("should be able to fetch unfinalized RAVs from the database");

for row in nonfinal_ravs_sender_allocations_in_db {
let allocation_ids = row
.allocation_id
.expect("all RAVs should have an allocation_id")
.iter()
.map(|allocation_id| {
AllocationId::Legacy(
Address::from_str(allocation_id)
.expect("allocation_id should be a valid address"),
)
})
.collect::<HashSet<_>>();
let sender_id = Address::from_str(&row.sender_address)
.expect("sender_address should be a valid address");

// Accumulate allocations for the sender
unfinalized_sender_allocations_map
.entry(sender_id)
.or_default()
.extend(allocation_ids);
}
unfinalized_sender_allocations_map
}

/// Helper function to create [SenderAccountArgs]
Expand Down Expand Up @@ -922,8 +1019,11 @@ mod tests {
flush_messages(&notify).await;

// verify if create sender account
let actor_ref =
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
"{}:legacy:{}",
prefix.clone(),
SENDER.1
));
assert!(actor_ref.is_some());

actor
Expand Down Expand Up @@ -959,7 +1059,7 @@ mod tests {
.unwrap();

let actor_ref =
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix, SENDER_2.1));
ActorRef::<SenderAccountMessage>::where_is(format!("{}:legacy:{}", prefix, SENDER_2.1));
assert!(actor_ref.is_some());
}

Expand Down
7 changes: 5 additions & 2 deletions crates/tap-agent/tests/sender_account_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ async fn sender_account_manager_layer_test(pgpool: PgPool) {
flush_messages(&notify).await;

// verify if create sender account
let sender_account_ref =
ActorRef::<SenderAccountMessage>::where_is(format!("{}:{}", prefix.clone(), SENDER.1));
let sender_account_ref = ActorRef::<SenderAccountMessage>::where_is(format!(
"{}:legacy:{}",
prefix.clone(),
SENDER.1
));
assert!(sender_account_ref.is_some());

let receipt = create_received_receipt(&ALLOCATION_ID_0, &SIGNER.0, 1, 1, TRIGGER_VALUE - 10);
Expand Down
2 changes: 1 addition & 1 deletion crates/tap-agent/tests/tap_agent_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn test_start_tap_agent(pgpool: PgPool) {
flush_messages(&notify).await;

// verify if create sender account
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("{}", TAP_SENDER.1));
let actor_ref = ActorRef::<SenderAccountMessage>::where_is(format!("legacy:{}", TAP_SENDER.1));

assert!(actor_ref.is_some());

Expand Down

0 comments on commit 1c753ef

Please sign in to comment.