Skip to content

Commit

Permalink
refactor: add actor recovery and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
gusinacio committed Apr 12, 2024
1 parent b85c1f0 commit 11f0d97
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 8 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
32 changes: 29 additions & 3 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,30 +246,56 @@ impl Actor for SenderAccount {
// is shutdown the supervisor on actor termination events
async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(cell, _, _)
| SupervisionEvent::ActorPanicked(cell, _) => {
SupervisionEvent::ActorTerminated(cell, _, _) => {
// what to do in case of termination or panic?
let sender_allocation = cell.get_name();
tracing::warn!(?sender_allocation, "Actor SenderAllocation was terminated");

let Some(allocation_id) = cell.get_name() else {
tracing::error!("SenderAllocation doesn't have a name");
return Ok(());
};
let Some(allocation_id) = allocation_id.split(':').last() else {
tracing::error!(%allocation_id, "Could not extract allocation_id from name");
return Ok(());
};
let Ok(allocation_id) = Address::parse_checksummed(allocation_id, None) else {
tracing::error!(%allocation_id, "Could not convert allocation_id to Address");
return Ok(());
};

let tracker = &mut state.allocation_id_tracker;
tracker.add_or_update(allocation_id, 0);
}
SupervisionEvent::ActorPanicked(cell, error) => {
let sender_allocation = cell.get_name();
tracing::warn!(
?sender_allocation,
?error,
"Actor SenderAllocation panicked. Restarting..."
);
let Some(allocation_id) = cell.get_name() else {
tracing::error!("SenderAllocation doesn't have a name");
return Ok(());
};
let Some(allocation_id) = allocation_id.split(':').last() else {
tracing::error!(%allocation_id, "Could not extract allocation_id from name");
return Ok(());
};
let Ok(allocation_id) = Address::parse_checksummed(allocation_id, None) else {
tracing::error!(%allocation_id, "Could not convert allocation_id to Address");
return Ok(());
};

state
.create_sender_allocation(myself.clone(), allocation_id)
.await?;
}
_ => {}
}
Ok(())
Expand Down
45 changes: 40 additions & 5 deletions tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,50 @@ impl Actor for SenderAccountsManager {
// is shutdown the supervisor on actor termination events
async fn handle_supervisor_evt(
&self,
_myself: ActorRef<Self::Msg>,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
_state: &mut Self::State,
state: &mut Self::State,
) -> std::result::Result<(), ActorProcessingErr> {
match message {
SupervisionEvent::ActorTerminated(cell, _, _)
| SupervisionEvent::ActorPanicked(cell, _) => {
SupervisionEvent::ActorTerminated(cell, _, reason) => {
let sender_id = cell.get_name();
tracing::warn!(?sender_id, ?reason, "Actor SenderAccount was terminated")
}
SupervisionEvent::ActorPanicked(cell, error) => {
let sender_id = cell.get_name();
tracing::warn!(?sender_id, "Actor SenderAccount was terminated")
tracing::warn!(
?sender_id,
?error,
"Actor SenderAccount panicked. Restarting..."
);
let Some(sender_id) = cell.get_name() else {
tracing::error!("SenderAllocation doesn't have a name");
return Ok(());
};
let Some(sender_id) = sender_id.split(':').last() else {
tracing::error!(%sender_id, "Could not extract sender_id from name");
return Ok(());
};
let Ok(sender_id) = Address::parse_checksummed(sender_id, None) else {
tracing::error!(%sender_id, "Could not convert sender_id to Address");
return Ok(());
};

let mut sender_allocation = select! {
sender_allocation = state.get_pending_sender_allocation_id() => sender_allocation,
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
tracing::error!("Timeout while getting pending sender allocation ids");
return Ok(());
}
};

let allocations = sender_allocation
.remove(&sender_id)
.unwrap_or(HashSet::new());

state
.create_sender_account(myself.get_cell(), sender_id, allocations)
.await?;
}
_ => {}
}
Expand Down

0 comments on commit 11f0d97

Please sign in to comment.