Skip to content

Commit

Permalink
refactor: update deny list check for v2
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Feb 13, 2025
1 parent 7d912af commit 3d5bbd6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 24 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 89 additions & 24 deletions crates/service/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ use crate::{
tap::{CheckingReceipt, TapReceipt},
};

enum DenyListVersion {
V1,
V2,
}

pub struct DenyListCheck {
sender_denylist: Arc<RwLock<HashSet<Address>>>,
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
sender_denylist_v1: Arc<RwLock<HashSet<Address>>>,
sender_denylist_v2: Arc<RwLock<HashSet<Address>>>,
sender_denylist_watcher_cancel_token: tokio_util::sync::CancellationToken,

#[cfg(test)]
Expand All @@ -29,43 +34,65 @@ impl DenyListCheck {
pub async fn new(pgpool: PgPool) -> Self {
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
// don't miss any updates. PG will buffer the notifications until we start consuming them.
let mut pglistener = PgListener::connect_with(&pgpool.clone()).await.unwrap();
pglistener
let mut pglistener_v1 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
let mut pglistener_v2 = PgListener::connect_with(&pgpool.clone()).await.unwrap();
pglistener_v1
.listen("scalar_tap_deny_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'scalar_tap_deny_notification'",
);

pglistener_v2
.listen("tap_horizon_deny_notification")
.await
.expect(
"should be able to subscribe to Postgres Notify events on the channel \
'tap_horizon_deny_notification'",
);

// Fetch the denylist from the DB
let sender_denylist = Arc::new(RwLock::new(HashSet::new()));
Self::sender_denylist_reload(pgpool.clone(), sender_denylist.clone())
let sender_denylist_v1 = Arc::new(RwLock::new(HashSet::new()));
let sender_denylist_v2 = Arc::new(RwLock::new(HashSet::new()));
Self::sender_denylist_reload_v1(pgpool.clone(), sender_denylist_v1.clone())
.await
.expect("should be able to fetch the sender_denylist from the DB on startup");

#[cfg(test)]
let notify = std::sync::Arc::new(tokio::sync::Notify::new());

let sender_denylist_watcher_cancel_token = tokio_util::sync::CancellationToken::new();
let sender_denylist_watcher_handle = Arc::new(tokio::spawn(Self::sender_denylist_watcher(
tokio::spawn(Self::sender_denylist_watcher(
pgpool.clone(),
pglistener,
sender_denylist.clone(),
pglistener_v1,
sender_denylist_v1.clone(),
sender_denylist_watcher_cancel_token.clone(),
DenyListVersion::V1,
#[cfg(test)]
notify.clone(),
)));
));

tokio::spawn(Self::sender_denylist_watcher(
pgpool.clone(),
pglistener_v2,
sender_denylist_v1.clone(),
sender_denylist_watcher_cancel_token.clone(),
DenyListVersion::V2,
#[cfg(test)]
notify.clone(),
));

Self {
sender_denylist,
_sender_denylist_watcher_handle: sender_denylist_watcher_handle,
sender_denylist_v1,
sender_denylist_v2,
sender_denylist_watcher_cancel_token,
#[cfg(test)]
notify,
}
}

async fn sender_denylist_reload(
async fn sender_denylist_reload_v1(
pgpool: PgPool,
denylist_rwlock: Arc<RwLock<HashSet<Address>>>,
) -> anyhow::Result<()> {
Expand All @@ -86,11 +113,33 @@ impl DenyListCheck {
Ok(())
}

async fn sender_denylist_reload_v2(
pgpool: PgPool,
denylist_rwlock: Arc<RwLock<HashSet<Address>>>,
) -> anyhow::Result<()> {
// Fetch the denylist from the DB
let sender_denylist = sqlx::query!(
r#"
SELECT sender_address FROM tap_horizon_denylist
"#
)
.fetch_all(&pgpool)
.await?
.iter()
.map(|row| Address::from_str(&row.sender_address))
.collect::<Result<HashSet<_>, _>>()?;

*(denylist_rwlock.write().unwrap()) = sender_denylist;

Ok(())
}

async fn sender_denylist_watcher(
pgpool: PgPool,
mut pglistener: PgListener,
denylist: Arc<RwLock<HashSet<Address>>>,
cancel_token: tokio_util::sync::CancellationToken,
version: DenyListVersion,
#[cfg(test)] notify: std::sync::Arc<tokio::sync::Notify>,
) {
#[derive(serde::Deserialize)]
Expand Down Expand Up @@ -137,10 +186,14 @@ impl DenyListCheck {
denylist.",
denylist_notification.tg_op
);

Self::sender_denylist_reload(pgpool.clone(), denylist.clone())
.await
.expect("should be able to reload the sender denylist")
match version {
DenyListVersion::V1 => Self::sender_denylist_reload_v1(pgpool.clone(), denylist.clone())
.await
.expect("should be able to reload the sender denylist"),
DenyListVersion::V2 => Self::sender_denylist_reload_v2(pgpool.clone(), denylist.clone())
.await
.expect("should be able to reload the sender denylist"),
}
}
}
#[cfg(test)]
Expand All @@ -153,18 +206,30 @@ impl DenyListCheck {

#[async_trait::async_trait]
impl Check<TapReceipt> for DenyListCheck {
async fn check(&self, ctx: &tap_core::receipt::Context, _: &CheckingReceipt) -> CheckResult {
async fn check(
&self,
ctx: &tap_core::receipt::Context,
receipt: &CheckingReceipt,
) -> CheckResult {
let Sender(receipt_sender) = ctx
.get::<Sender>()
.ok_or(CheckError::Failed(anyhow::anyhow!("Could not find sender")))?;

let denied = match receipt.signed_receipt() {
TapReceipt::V1(_) => self
.sender_denylist_v1
.read()
.unwrap()
.contains(receipt_sender),
TapReceipt::V2(_) => self
.sender_denylist_v2
.read()
.unwrap()
.contains(receipt_sender),
};

// Check that the sender is not denylisted
if self
.sender_denylist
.read()
.unwrap()
.contains(receipt_sender)
{
if denied {
return Err(CheckError::Failed(anyhow::anyhow!(
"Received a receipt from a denylisted sender: {}",
receipt_sender
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
-- Add down migration script here
DROP TRIGGER IF EXISTS deny_update ON tap_horizon_deny CASCADE;

DROP FUNCTION IF EXISTS tap_horizon_deny_notify() CASCADE;

DROP TABLE IF EXISTS tap_horizon_denylist CASCADE;
22 changes: 22 additions & 0 deletions migrations/20250212211337_tap_horizon_sender_denylist.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,25 @@
CREATE TABLE IF NOT EXISTS tap_horizon_denylist (
sender_address CHAR(40) PRIMARY KEY
);


CREATE FUNCTION tap_horizon_deny_notify()
RETURNS trigger AS
$$
BEGIN
IF TG_OP = 'DELETE' THEN
PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "DELETE", "sender_address": "%s"}', OLD.sender_address));
RETURN OLD;
ELSIF TG_OP = 'INSERT' THEN
PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "INSERT", "sender_address": "%s"}', NEW.sender_address));
RETURN NEW;
ELSE -- UPDATE OR TRUNCATE, should never happen
PERFORM pg_notify('tap_horizon_deny_notification', format('{"tg_op": "%s", "sender_address": null}', TG_OP, NEW.sender_address));
RETURN NEW;
END IF;
END;
$$ LANGUAGE 'plpgsql';

CREATE TRIGGER deny_update AFTER INSERT OR UPDATE OR DELETE
ON tap_horizon_denylist
FOR EACH ROW EXECUTE PROCEDURE tap_horizon_deny_notify();

0 comments on commit 3d5bbd6

Please sign in to comment.