Skip to content

Commit

Permalink
refactor: move metrics to new file and merge imports
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Nov 27, 2024
1 parent 828356f commit b6d6204
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 196 deletions.
1 change: 1 addition & 0 deletions crates/tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::agent::sender_accounts_manager::{
use crate::{database, CONFIG, EIP_712_DOMAIN};
use sender_accounts_manager::SenderAccountsManager;

mod metrics;
pub mod sender_account;
pub mod sender_accounts_manager;
pub mod sender_allocation;
Expand Down
84 changes: 84 additions & 0 deletions crates/tap-agent/src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use prometheus::{
register_counter_vec, register_gauge_vec, register_histogram_vec, register_int_gauge_vec,
CounterVec, GaugeVec, HistogramVec, IntGaugeVec,
};

lazy_static::lazy_static! {
pub static ref SENDER_DENIED: IntGaugeVec =
register_int_gauge_vec!("tap_sender_denied", "Sender is denied", &["sender"]).unwrap();
pub static ref ESCROW_BALANCE: GaugeVec = register_gauge_vec!(
"tap_sender_escrow_balance_grt_total",
"Sender escrow balance",
&["sender"]
)
.unwrap();
pub static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
"tap_unaggregated_fees_grt_total",
"Unggregated Fees value",
&["sender", "allocation"]
)
.unwrap();
pub static ref SENDER_FEE_TRACKER: GaugeVec = register_gauge_vec!(
"tap_sender_fee_tracker_grt_total",
"Sender fee tracker metric",
&["sender"]
)
.unwrap();
pub static ref INVALID_RECEIPT_FEES: GaugeVec = register_gauge_vec!(
"tap_invalid_receipt_fees_grt_total",
"Failed receipt fees",
&["sender", "allocation"]
)
.unwrap();
pub static ref PENDING_RAV: GaugeVec = register_gauge_vec!(
"tap_pending_rav_grt_total",
"Pending ravs values",
&["sender", "allocation"]
)
.unwrap();
pub static ref MAX_FEE_PER_SENDER: GaugeVec = register_gauge_vec!(
"tap_max_fee_per_sender_grt_total",
"Max fee per sender in the config",
&["sender"]
)
.unwrap();
pub static ref RAV_REQUEST_TRIGGER_VALUE: GaugeVec = register_gauge_vec!(
"tap_rav_request_trigger_value",
"RAV request trigger value divisor",
&["sender"]
)
.unwrap();
pub static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
"tap_receipts_received_total",
"Receipts received since start of the program.",
&["sender", "allocation"]
)
.unwrap();

// Allocation metrics
pub static ref CLOSED_SENDER_ALLOCATIONS: CounterVec = register_counter_vec!(
"tap_closed_sender_allocation_total",
"Count of sender-allocation managers closed since the start of the program",
&["sender"]
)
.unwrap();
pub static ref RAVS_CREATED: CounterVec = register_counter_vec!(
"tap_ravs_created_total",
"RAVs updated or created per sender allocation since the start of the program",
&["sender", "allocation"]
)
.unwrap();
pub static ref RAVS_FAILED: CounterVec = register_counter_vec!(
"tap_ravs_failed_total",
"RAV requests failed since the start of the program",
&["sender", "allocation"]
)
.unwrap();
pub static ref RAV_RESPONSE_TIME: HistogramVec = register_histogram_vec!(
"tap_rav_response_time_seconds",
"RAV response time per sender",
&["sender"]
)
.unwrap();

}
103 changes: 29 additions & 74 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
@@ -1,93 +1,48 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use alloy::hex::ToHexExt;
use alloy::primitives::U256;

use bigdecimal::num_bigint::ToBigInt;
use bigdecimal::ToPrimitive;

use alloy::{
dyn_abi::Eip712Domain,
hex::ToHexExt,
primitives::{Address, U256},
};
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
use futures::{stream, StreamExt};
use indexer_query::unfinalized_transactions;
use indexer_query::UnfinalizedTransactions;
use indexer_monitor::{EscrowAccounts, SubgraphClient};
use indexer_query::{unfinalized_transactions, UnfinalizedTransactions};
use indexer_watcher::watch_pipe;
use jsonrpsee::http_client::HttpClientBuilder;
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use reqwest::Url;
use state::State;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch::Receiver;

use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use indexer_monitor::{EscrowAccounts, SubgraphClient};
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use reqwest::Url;
use sqlx::PgPool;
use state::State;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use tap_core::rav::SignedRAV;
use tokio::sync::watch::Receiver;
use tracing::{error, warn, Level};

use crate::adaptative_concurrency::AdaptiveLimiter;
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
use crate::backoff::BackoffInfo;
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};
use lazy_static::lazy_static;
use crate::{
adaptative_concurrency::AdaptiveLimiter,
agent::{
metrics::{
ESCROW_BALANCE, INVALID_RECEIPT_FEES, MAX_FEE_PER_SENDER, PENDING_RAV,
RAV_REQUEST_TRIGGER_VALUE, SENDER_DENIED, SENDER_FEE_TRACKER, UNAGGREGATED_FEES,
},
sender_allocation::SenderAllocationMessage,
unaggregated_receipts::UnaggregatedReceipts,
},
backoff::BackoffInfo,
tracker::{SenderFeeTracker, SimpleFeeTracker},
};

// mod actor;
// mod config;
mod state;
#[cfg(test)]
pub mod tests;

lazy_static! {
static ref SENDER_DENIED: IntGaugeVec =
register_int_gauge_vec!("tap_sender_denied", "Sender is denied", &["sender"]).unwrap();
static ref ESCROW_BALANCE: GaugeVec = register_gauge_vec!(
"tap_sender_escrow_balance_grt_total",
"Sender escrow balance",
&["sender"]
)
.unwrap();
static ref UNAGGREGATED_FEES: GaugeVec = register_gauge_vec!(
"tap_unaggregated_fees_grt_total",
"Unggregated Fees value",
&["sender", "allocation"]
)
.unwrap();
static ref SENDER_FEE_TRACKER: GaugeVec = register_gauge_vec!(
"tap_sender_fee_tracker_grt_total",
"Sender fee tracker metric",
&["sender"]
)
.unwrap();
static ref INVALID_RECEIPT_FEES: GaugeVec = register_gauge_vec!(
"tap_invalid_receipt_fees_grt_total",
"Failed receipt fees",
&["sender", "allocation"]
)
.unwrap();
static ref PENDING_RAV: GaugeVec = register_gauge_vec!(
"tap_pending_rav_grt_total",
"Pending ravs values",
&["sender", "allocation"]
)
.unwrap();
static ref MAX_FEE_PER_SENDER: GaugeVec = register_gauge_vec!(
"tap_max_fee_per_sender_grt_total",
"Max fee per sender in the config",
&["sender"]
)
.unwrap();
static ref RAV_REQUEST_TRIGGER_VALUE: GaugeVec = register_gauge_vec!(
"tap_rav_request_trigger_value",
"RAV request trigger value divisor",
&["sender"]
)
.unwrap();
}

const INITIAL_RAV_REQUEST_CONCURRENT: usize = 1;

type RavMap = HashMap<Address, u128>;
Expand Down
48 changes: 25 additions & 23 deletions crates/tap-agent/src/agent/sender_account/state.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use alloy::hex::ToHexExt;
use alloy::primitives::U256;

use bigdecimal::ToPrimitive;

use indexer_query::closed_allocations::{self, ClosedAllocations};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::task::JoinHandle;

use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use alloy::{
dyn_abi::Eip712Domain,
hex::ToHexExt,
primitives::{Address, U256},
};
use anyhow::Result;
use bigdecimal::ToPrimitive;
use indexer_monitor::{EscrowAccounts, SubgraphClient};
use indexer_query::closed_allocations::{self, ClosedAllocations};
use ractor::{Actor, ActorRef, MessagingErr};
use sqlx::PgPool;
use std::{collections::HashSet, str::FromStr, time::Duration};
use tap_core::rav::SignedRAV;
use tokio::{sync::watch::Receiver, task::JoinHandle};
use tracing::error;

use crate::adaptative_concurrency::AdaptiveLimiter;
use crate::agent::sender_account::{SenderAccount, SENDER_DENIED};
use crate::agent::sender_allocation::{AllocationConfig, SenderAllocationMessage};
use crate::agent::sender_allocation::{SenderAllocation, SenderAllocationArgs};
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
use crate::backoff::BackoffInfo;
use crate::tracker::{SenderFeeTracker, SimpleFeeTracker};

use super::{SenderAccountConfig, SenderAccountMessage, PENDING_RAV, SENDER_FEE_TRACKER, UNAGGREGATED_FEES};
use crate::{
adaptative_concurrency::AdaptiveLimiter,
agent::{
sender_account::{SenderAccount, SENDER_DENIED},
sender_allocation::{
AllocationConfig, SenderAllocation, SenderAllocationArgs, SenderAllocationMessage,
},
unaggregated_receipts::UnaggregatedReceipts,
},
backoff::BackoffInfo,
tracker::{SenderFeeTracker, SimpleFeeTracker},
};

use super::{
SenderAccountConfig, SenderAccountMessage, PENDING_RAV, SENDER_FEE_TRACKER, UNAGGREGATED_FEES,
};

pub struct State {
pub(super) prefix: Option<String>,
Expand Down
41 changes: 23 additions & 18 deletions crates/tap-agent/src/agent/sender_account/tests.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@

use super::{SenderAccount, SenderAccountArgs, SenderAccountMessage};
use crate::agent::sender_account::ReceiptFees;
use crate::agent::sender_accounts_manager::NewReceiptNotification;
use crate::agent::sender_allocation::SenderAllocationMessage;
use crate::agent::unaggregated_receipts::UnaggregatedReceipts;
use crate::tap::test_utils::{
create_rav, store_rav_with_options, ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER, SIGNER,
TAP_EIP712_DOMAIN_SEPARATOR,
use crate::{
agent::{
sender_account::ReceiptFees, sender_accounts_manager::NewReceiptNotification,
sender_allocation::SenderAllocationMessage, unaggregated_receipts::UnaggregatedReceipts,
},
tap::test_utils::{
create_rav, store_rav_with_options, ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER,
SIGNER, TAP_EIP712_DOMAIN_SEPARATOR,
},
};
use alloy::{
hex::ToHexExt,
primitives::{Address, U256},
};
use alloy::hex::ToHexExt;
use alloy::primitives::{Address, U256};
use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient};
use ractor::concurrency::JoinHandle;
use ractor::{call, Actor, ActorProcessingErr, ActorRef, ActorStatus};
use ractor::{call, concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef, ActorStatus};
use reqwest::Url;
use serde_json::json;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{
collections::{HashMap, HashSet},
sync::{atomic::AtomicU32, Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::watch::{self, Sender};
use wiremock::matchers::{body_string_contains, method};
use wiremock::{Mock, MockGuard, MockServer, ResponseTemplate};
use wiremock::{
matchers::{body_string_contains, method},
Mock, MockGuard, MockServer, ResponseTemplate,
};

// we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare
impl Eq for SenderAccountMessage {}
Expand Down
11 changes: 0 additions & 11 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use futures::{stream, StreamExt};
use indexer_allocation::Allocation;
use indexer_monitor::{EscrowAccounts, SubgraphClient};
use indexer_watcher::watch_pipe;
use lazy_static::lazy_static;
use prometheus::{register_counter_vec, CounterVec};
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use receipt_watcher::new_receipts_watcher;
use reqwest::Url;
Expand All @@ -28,15 +26,6 @@ mod state;
#[cfg(test)]
mod tests;

lazy_static! {
static ref RECEIPTS_CREATED: CounterVec = register_counter_vec!(
"tap_receipts_received_total",
"Receipts received since start of the program.",
&["sender", "allocation"]
)
.unwrap();
}

#[derive(Deserialize, Debug, PartialEq, Eq)]
pub struct NewReceiptNotification {
pub id: u64,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::agent::{
sender_account::SenderAccountMessage, sender_accounts_manager::RECEIPTS_CREATED,
metrics::RECEIPTS_CREATED, sender_account::SenderAccountMessage,
sender_allocation::SenderAllocationMessage,
};
use anyhow::{anyhow, bail, Result};
Expand All @@ -11,7 +11,6 @@ use tracing::{error, warn};

use super::NewReceiptNotification;


/// Continuously listens for new receipt notifications from Postgres and forwards them to the
/// corresponding SenderAccount.
pub async fn new_receipts_watcher(
Expand Down
17 changes: 8 additions & 9 deletions crates/tap-agent/src/agent/sender_accounts_manager/state.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
// Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};

use crate::agent::sender_account::{SenderAccount, SenderAccountArgs, SenderAccountConfig};
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use anyhow::Result;
use anyhow::{anyhow, bail};
use alloy::{dyn_abi::Eip712Domain, primitives::Address};
use anyhow::{anyhow, bail, Result};
use indexer_monitor::{EscrowAccounts, SubgraphClient};
use ractor::concurrency::JoinHandle;
use ractor::{Actor, ActorCell};
use ractor::{concurrency::JoinHandle, Actor, ActorCell};
use reqwest::Url;
use sqlx::PgPool;
use tokio::sync::watch::Receiver;
Expand Down
Loading

0 comments on commit b6d6204

Please sign in to comment.