From f9edeea39d06c6c5c53086ddbffd1289c4bbacf1 Mon Sep 17 00:00:00 2001
From: Marcel
Date: Mon, 20 May 2024 00:17:38 +0200
Subject: [PATCH] refactoring; clippy

---
 src/bird.rs          |  40 +++---
 src/lists.rs         |  19 ---
 src/peers.rs         |  30 +----
 src/routes/bird.rs   |   3 -
 src/stats.rs         | 291 -------------------------------------------
 src/stats/as112.rs   | 107 ++++++++++++++++
 src/stats/mod.rs     | 146 ++++++++++++++++++++++
 src/stats/traffic.rs | 106 ++++++++++++++++
 8 files changed, 384 insertions(+), 358 deletions(-)
 delete mode 100644 src/stats.rs
 create mode 100644 src/stats/as112.rs
 create mode 100644 src/stats/mod.rs
 create mode 100644 src/stats/traffic.rs

diff --git a/src/bird.rs b/src/bird.rs
index 03ace4e..734d38c 100644
--- a/src/bird.rs
+++ b/src/bird.rs
@@ -1,39 +1,39 @@
+use axum::async_trait;
+use select::predicate::Name;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use select::predicate::Name;
-use time::{Duration, OffsetDateTime};
-use tokio::sync::{Mutex, RwLock};
+use crate::cache::{Cache, Updater};
 
-const MAX_AGE: Duration = Duration::minutes(10);
+struct BirdUpdater {
+    path: PathBuf,
+}
 
 #[derive(Clone)]
 pub(crate) struct Bird {
-    path: PathBuf,
-    next_update: Arc<Mutex<OffsetDateTime>>,
-    content: Arc<RwLock<Arc<String>>>,
+    content: Arc<Cache<BirdUpdater>>,
+}
+
+#[async_trait]
+impl Updater for BirdUpdater {
+    type Output = String;
+    type Error = anyhow::Error;
+
+    async fn update(&self) -> Result<Self::Output, Self::Error> {
+        let content = load(&self.path).await?;
+        Ok(content)
+    }
 }
 
 impl Bird {
     pub(crate) async fn new(path: PathBuf) -> anyhow::Result<Self> {
-        let content = load(&path).await?;
         Ok(Self {
-            path,
-            next_update: Arc::new(Mutex::new(OffsetDateTime::now_utc())),
-            content: Arc::new(RwLock::new(Arc::new(content))),
+            content: Arc::new(Cache::new(BirdUpdater { path })),
        })
     }
 
     pub(crate) async fn content(&self) -> anyhow::Result<Arc<String>> {
-        {
-            let mut lock = self.next_update.lock().await;
-            if OffsetDateTime::now_utc() > *lock {
-                *self.content.write().await = Arc::new(load(&self.path).await?);
-                *lock = OffsetDateTime::now_utc();
-            }
-        }
-
-        Ok(self.content.read().await.clone())
+        self.content.get().await
     }
 }
 
diff --git a/src/lists.rs b/src/lists.rs
index 0f90d75..68082ae 100644
--- a/src/lists.rs
+++ b/src/lists.rs
@@ -50,14 +50,6 @@ pub(crate) struct ListmonkUserCreateResponse {
     data: ListmonkUser,
 }
 
-#[derive(Debug, Clone, Serialize)]
-pub(crate) struct ListmonkAddSubscribers {
-    ids: Vec<i32>,
-    action: String,
-    target_list_ids: Vec<i32>,
-    status: String,
-}
-
 #[derive(Debug, Clone)]
 pub enum MailingListsError {
     InvalidMailingListId,
@@ -100,17 +92,6 @@ impl ListmonkCreateSubscriber {
     }
 }
 
-impl ListmonkAddSubscribers {
-    fn load(subscriber: i32, desired_list: i32) -> ListmonkAddSubscribers {
-        ListmonkAddSubscribers {
-            ids: vec![subscriber],
-            action: "add".to_string(),
-            target_list_ids: vec![desired_list],
-            status: "unconfirmed".to_string(),
-        }
-    }
-}
-
 impl MailingLists {
     pub async fn load(
         url: &Url,
diff --git a/src/peers.rs b/src/peers.rs
index 4a52b60..e9cba56 100644
--- a/src/peers.rs
+++ b/src/peers.rs
@@ -1,15 +1,10 @@
 use std::collections::HashMap;
-use std::ops::Deref;
-use std::path::PathBuf;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
+use std::path::Path;
 use std::sync::Arc;
 
 use reqwest::Client;
 use serde::{Deserialize, Serialize};
-use time::{Duration, OffsetDateTime};
-use tokio::sync::RwLock;
-use tracing::{error, info};
+use tracing::info;
 use url::Url;
 
 use crate::cache::Cache;
@@ -17,30 +12,16 @@ use crate::cache::Updater;
 
 // https://github.com/euro-ix/json-schemas/wiki/Schema-Field-Entries-Members#schema-field-entries---members
 
-const MAX_AGE: Duration = Duration::hours(1);
-
 #[derive(Deserialize, PartialEq, Clone)]
 enum EuroIXMemberType {
     #[serde(rename = "peering")]
     Peering,
     #[serde(rename = "ixp")]
-    IXP,
+    Ixp,
     #[serde(rename = "other")]
     Other,
 }
 
-#[derive(Deserialize, Serialize, Clone)]
-enum PeeringPolicy {
-    #[serde(rename = "open")]
-    Open,
-    #[serde(rename = "selective")]
-    Selective,
-    #[serde(rename = "case-by-case")]
-    CaseByCase,
-    #[serde(rename = "mandatory")]
-    Mandatory,
-}
-
 #[derive(Deserialize, Clone)]
 struct EuroIXIfList {
     if_speed: u64,
@@ -64,7 +45,6 @@ struct EuroIXMemberScheme {
     member_type: EuroIXMemberType,
     name: String,
     url: Url,
-    peering_policy: PeeringPolicy,
     connection_list: Vec,
 }
 
@@ -132,7 +112,7 @@ impl Updater for PeersUpdater {
         let mut peers: Vec = api_result
             .member_list
             .into_iter()
-            .filter(|peer| peer.member_type != EuroIXMemberType::IXP)
+            .filter(|peer| peer.member_type != EuroIXMemberType::Ixp)
             .map(|value| {
                 let is_supporter = self.yaml_file.supporting_peers.contains(&value.asnum);
                 let mut does_v4 = false;
@@ -197,7 +177,7 @@ impl Updater for PeersUpdater {
 }
 
 impl NetworkService {
-    pub(crate) async fn new(base_path: &PathBuf, ixp_manager_url: Url) -> anyhow::Result<Self> {
+    pub(crate) async fn new(base_path: &Path, ixp_manager_url: Url) -> anyhow::Result<Self> {
         let serialized_supporter =
             tokio::fs::read_to_string(base_path.join("supporter.yaml")).await?;
         let yaml_file = serde_yaml::from_str(&serialized_supporter)?;
diff --git a/src/routes/bird.rs b/src/routes/bird.rs
index f214440..32fb80d 100644
--- a/src/routes/bird.rs
+++ b/src/routes/bird.rs
@@ -1,9 +1,6 @@
-use std::sync::Arc;
-
 use axum::extract::State;
 use axum::http::StatusCode;
 use axum::response::Html;
-use axum::Json;
 use tracing::error;
 
 use crate::state::FoundationState;
diff --git a/src/stats.rs b/src/stats.rs
deleted file mode 100644
index 720b2c3..0000000
--- a/src/stats.rs
+++ /dev/null
@@ -1,291 +0,0 @@
-use std::str::FromStr;
-use std::sync::Arc;
-
-use anyhow::anyhow;
-use reqwest::Client;
-use serde::{Deserialize, Serialize};
-use time::{Duration, OffsetDateTime};
-use url::Url;
-
-use crate::cache::{Cache, Updater};
-
-#[derive(Deserialize)]
-#[serde(rename_all = "snake_case")]
-pub(crate) enum TimeSelection {
-    LastTwoDays,
-    LastWeek,
-    LastMonth,
-    LastThreeMonths,
-    LastYear,
-}
-
-impl From<TimeSelection> for Duration {
-    fn from(value: TimeSelection) -> Self {
-        match value {
-            TimeSelection::LastTwoDays => Duration::days(2),
-            TimeSelection::LastWeek => Duration::weeks(1),
-            TimeSelection::LastMonth => Duration::days(30),
-            TimeSelection::LastThreeMonths => Duration::days(90),
-            TimeSelection::LastYear => Duration::days(365),
-        }
-    }
-}
-
-struct TimeSelectionStore<T> {
-    two_days: T,
-    week: T,
-    month: T,
-    three_months: T,
-    year: T,
-}
-
-impl<T> TimeSelectionStore<T> {
-    pub(crate) fn get(&self, selection: TimeSelection) -> &T {
-        match selection {
-            TimeSelection::LastTwoDays => &self.two_days,
-            TimeSelection::LastWeek => &self.week,
-            TimeSelection::LastMonth => &self.month,
-            TimeSelection::LastThreeMonths => &self.three_months,
-            TimeSelection::LastYear => &self.year,
-        }
-    }
-}
-
-#[derive(Serialize)]
-struct PrometheusQuery {
-    query: String,
-    #[serde(with = "time::serde::rfc3339")]
-    start: OffsetDateTime,
-    #[serde(with = "time::serde::rfc3339")]
-    end: OffsetDateTime,
-    step: f64,
-}
-
-#[derive(Deserialize)]
-struct PrometheusResponse {
-    data: PrometheusData,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct PrometheusData {
-    result: Vec<PrometheusMetrics>,
-}
-
-#[derive(Deserialize)]
-struct PrometheusMetrics {
-    values: Vec<(f64, String)>,
-}
-
-#[derive(Serialize)]
-pub(crate) struct Series {
-    #[serde(with = "time::serde::rfc3339")]
-    start: OffsetDateTime,
-    #[serde(with = "time::serde::rfc3339")]
-    end: OffsetDateTime,
-    data: Vec<(f64, f64)>,
-}
-
-#[derive(Clone)]
-pub(crate) struct Stats {
-    traffic: Arc<TimeSelectionStore<Cache<TrafficUpdater>>>,
-    as112: Arc<TimeSelectionStore<Cache<As112Updater>>>,
-}
-
-struct TrafficUpdater {
-    client: Client,
-    prometheus_url: Url,
-    selection: Duration,
-}
-
-#[async_trait::async_trait]
-impl Updater for TrafficUpdater {
-    type Output = Series;
-    type Error = anyhow::Error;
-
-    async fn update(&self) -> Result<Self::Output, Self::Error> {
-        let now = OffsetDateTime::now_utc();
-
-        let data = self
-            .query_stats(OffsetDateTime::now_utc() - self.selection, now, 255.0)
-            .await?;
-
-        Ok(data)
-    }
-}
-
-impl TrafficUpdater {
-    async fn query_stats(
-        &self,
-        start: OffsetDateTime,
-        end: OffsetDateTime,
-        points: f64,
-    ) -> anyhow::Result<Series> {
-        let query = PrometheusQuery {
-            query: "sum(rate(sflow_router_bytes[5m]))*8".to_string(),
-            start,
-            end,
-            step: ((end - start) / points).as_seconds_f64(),
-        };
-
-        Ok(Series {
-            start,
-            end,
-            data: self
-                .client
-                .get(self.prometheus_url.join("/api/v1/query_range")?)
-                .query(&query)
-                .send()
-                .await?
-                .error_for_status()?
-                .json::<PrometheusResponse>()
-                .await?
-                .data
-                .result
-                .into_iter()
-                .find(|_| true)
-                .ok_or_else(|| anyhow!("unexpected prometheus response"))?
-                .values
-                .into_iter()
-                .map(|(time, value)| (time, f64::from_str(&value).unwrap()))
-                .collect::<Vec<_>>(),
-        })
-    }
-}
-
-struct As112Updater {
-    client: Client,
-    prometheus_url: Url,
-    selection: Duration,
-}
-
-#[async_trait::async_trait]
-impl Updater for As112Updater {
-    type Output = Series;
-    type Error = anyhow::Error;
-
-    async fn update(&self) -> Result<Self::Output, Self::Error> {
-        let now = OffsetDateTime::now_utc();
-
-        let data = self
-            .query_stats(OffsetDateTime::now_utc() - self.selection, now, 255.0)
-            .await?;
-
-        Ok(data)
-    }
-}
-impl As112Updater {
-    async fn query_stats(
-        &self,
-        start: OffsetDateTime,
-        end: OffsetDateTime,
-        points: f64,
-    ) -> anyhow::Result<Series> {
-        let query = PrometheusQuery {
-            query: "sum(rate(knot_query_type_total[5m]))".to_string(),
-            start,
-            end,
-            step: ((end - start) / points).as_seconds_f64(),
-        };
-
-        Ok(Series {
-            start,
-            end,
-            data: self
-                .client
-                .get(self.prometheus_url.join("/api/v1/query_range")?)
-                .query(&query)
-                .send()
-                .await?
-                .error_for_status()?
-                .json::<PrometheusResponse>()
-                .await?
-                .data
-                .result
-                .into_iter()
-                .find(|_| true)
-                .ok_or_else(|| anyhow!("unexpected prometheus response"))?
-                .values
-                .into_iter()
-                .map(|(time, value)| (time, f64::from_str(&value).unwrap()))
-                .collect::<Vec<_>>(),
-        })
-    }
-}
-
-impl Stats {
-    pub(crate) fn new(prometheus_url: Url) -> Self {
-        let client = Client::new();
-
-        Self {
-            traffic: Arc::new(TimeSelectionStore {
-                two_days: Cache::new(TrafficUpdater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastTwoDays.into(),
-                }),
-                week: Cache::new(TrafficUpdater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastWeek.into(),
-                }),
-                month: Cache::new(TrafficUpdater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastMonth.into(),
-                }),
-                three_months: Cache::new(TrafficUpdater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastThreeMonths.into(),
-                }),
-                year: Cache::new(TrafficUpdater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastYear.into(),
-                }),
-            }),
-            as112: Arc::new(TimeSelectionStore {
-                two_days: Cache::new(As112Updater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastTwoDays.into(),
-                }),
-                week: Cache::new(As112Updater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastWeek.into(),
-                }),
-                month: Cache::new(As112Updater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastMonth.into(),
-                }),
-                three_months: Cache::new(As112Updater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastThreeMonths.into(),
-                }),
-                year: Cache::new(As112Updater {
-                    client: client.clone(),
-                    prometheus_url: prometheus_url.clone(),
-                    selection: TimeSelection::LastYear.into(),
-                }),
-            }),
-        }
-    }
-
-    pub(crate) async fn get_traffic_stats(
-        &self,
-        selection: TimeSelection,
-    ) -> anyhow::Result<Arc<Series>> {
-        self.traffic.get(selection).get().await
-    }
-
-    pub(crate) async fn get_as112_stats(
-        &self,
-        selection: TimeSelection,
-    ) -> anyhow::Result<Arc<Series>> {
-        self.as112.get(selection).get().await
-    }
-}
diff --git a/src/stats/as112.rs b/src/stats/as112.rs
new file mode 100644
index 0000000..e00ba7b
--- /dev/null
+++ b/src/stats/as112.rs
@@ -0,0 +1,107 @@
+// sum by(type) (increase(knot_query_type_total[$__range]))
+
+use std::str::FromStr;
+
+use anyhow::anyhow;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use time::{Duration, OffsetDateTime};
+use url::Url;
+
+use crate::cache::Updater;
+
+use super::{Series, TimeSelection};
+
+#[derive(Serialize)]
+struct PrometheusQuery {
+    query: String,
+    #[serde(with = "time::serde::rfc3339")]
+    start: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    end: OffsetDateTime,
+    step: f64,
+}
+
+#[derive(Deserialize)]
+struct PrometheusResponse {
+    data: PrometheusData,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PrometheusData {
+    result: Vec<PrometheusMetrics>,
+}
+
+#[derive(Deserialize)]
+struct PrometheusMetrics {
+    values: Vec<(f64, String)>,
+}
+
+pub(super) struct As112Updater {
+    client: Client,
+    prometheus_url: Url,
+    selection: Duration,
+}
+
+#[async_trait::async_trait]
+impl Updater for As112Updater {
+    type Output = Series;
+    type Error = anyhow::Error;
+
+    async fn update(&self) -> Result<Self::Output, Self::Error> {
+        let now = OffsetDateTime::now_utc();
+
+        let data = self
+            .query_stats(OffsetDateTime::now_utc() - self.selection, now, 255.0)
+            .await?;
+
+        Ok(data)
+    }
+}
+impl As112Updater {
+    pub(super) fn new(client: Client, prometheus_url: Url, selection: TimeSelection) -> Self {
+        Self {
+            client,
+            prometheus_url,
+            selection: selection.into(),
+        }
+    }
+
+    async fn query_stats(
+        &self,
+        start: OffsetDateTime,
+        end: OffsetDateTime,
+        points: f64,
+    ) -> anyhow::Result<Series> {
+        let query = PrometheusQuery {
+            query: "sum(rate(knot_query_type_total[5m]))".to_string(),
+            start,
+            end,
+            step: ((end - start) / points).as_seconds_f64(),
+        };
+
+        Ok(Series {
+            start,
+            end,
+            data: self
+                .client
+                .get(self.prometheus_url.join("/api/v1/query_range")?)
+                .query(&query)
+                .send()
+                .await?
+                .error_for_status()?
+                .json::<PrometheusResponse>()
+                .await?
+                .data
+                .result
+                .into_iter()
+                .find(|_| true)
+                .ok_or_else(|| anyhow!("unexpected prometheus response"))?
+                .values
+                .into_iter()
+                .map(|(time, value)| (time, f64::from_str(&value).unwrap()))
+                .collect::<Vec<_>>(),
+        })
+    }
+}
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
new file mode 100644
index 0000000..6fcccb3
--- /dev/null
+++ b/src/stats/mod.rs
@@ -0,0 +1,146 @@
+mod as112;
+mod traffic;
+
+use std::sync::Arc;
+
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use time::{Duration, OffsetDateTime};
+use url::Url;
+
+use crate::cache::Cache;
+
+use self::{as112::As112Updater, traffic::TrafficUpdater};
+
+#[derive(Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub(crate) enum TimeSelection {
+    TwoDays,
+    Week,
+    Month,
+    ThreeMonths,
+    Year,
+}
+
+impl From<TimeSelection> for Duration {
+    fn from(value: TimeSelection) -> Self {
+        match value {
+            TimeSelection::TwoDays => Duration::days(2),
+            TimeSelection::Week => Duration::weeks(1),
+            TimeSelection::Month => Duration::days(30),
+            TimeSelection::ThreeMonths => Duration::days(90),
+            TimeSelection::Year => Duration::days(365),
+        }
+    }
+}
+
+struct TimeSelectionStore<T> {
+    two_days: T,
+    week: T,
+    month: T,
+    three_months: T,
+    year: T,
+}
+
+#[derive(Serialize)]
+pub(crate) struct Series {
+    #[serde(with = "time::serde::rfc3339")]
+    start: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    end: OffsetDateTime,
+    data: Vec<(f64, f64)>,
+}
+
+impl<T> TimeSelectionStore<T> {
+    pub(crate) fn get(&self, selection: TimeSelection) -> &T {
+        match selection {
+            TimeSelection::TwoDays => &self.two_days,
+            TimeSelection::Week => &self.week,
+            TimeSelection::Month => &self.month,
+            TimeSelection::ThreeMonths => &self.three_months,
+            TimeSelection::Year => &self.year,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct Stats {
+    traffic: Arc<TimeSelectionStore<Cache<TrafficUpdater>>>,
+    as112: Arc<TimeSelectionStore<Cache<As112Updater>>>,
+}
+impl Stats {
+    pub(crate) fn new(prometheus_url: Url) -> Self {
+        let client = Client::new();
+
+        Self {
+            traffic: Arc::new(TimeSelectionStore {
+                two_days: Cache::new(TrafficUpdater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::TwoDays,
+                )),
+                week: Cache::new(TrafficUpdater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Week,
+                )),
+                month: Cache::new(TrafficUpdater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Month,
+                )),
+                three_months: Cache::new(TrafficUpdater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::ThreeMonths,
+                )),
+                year: Cache::new(TrafficUpdater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Year,
+                )),
+            }),
+            as112: Arc::new(TimeSelectionStore {
+                two_days: Cache::new(As112Updater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::TwoDays,
+                )),
+                week: Cache::new(As112Updater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Week,
+                )),
+                month: Cache::new(As112Updater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Month,
+                )),
+                three_months: Cache::new(As112Updater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::ThreeMonths,
+                )),
+                year: Cache::new(As112Updater::new(
+                    client.clone(),
+                    prometheus_url.clone(),
+                    TimeSelection::Year,
+                )),
+            }),
+        }
+    }
+
+    pub(crate) async fn get_traffic_stats(
+        &self,
+        selection: TimeSelection,
+    ) -> anyhow::Result<Arc<Series>> {
+        self.traffic.get(selection).get().await
+    }
+
+    pub(crate) async fn get_as112_stats(
+        &self,
+        selection: TimeSelection,
+    ) -> anyhow::Result<Arc<Series>> {
+        self.as112.get(selection).get().await
+    }
+}
diff --git a/src/stats/traffic.rs b/src/stats/traffic.rs
new file mode 100644
index 0000000..9505c6d
--- /dev/null
+++ b/src/stats/traffic.rs
@@ -0,0 +1,106 @@
+use std::str::FromStr;
+
+use anyhow::anyhow;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
+use time::{Duration, OffsetDateTime};
+use url::Url;
+
+use crate::cache::Updater;
+
+use super::{Series, TimeSelection};
+
+#[derive(Serialize)]
+struct PrometheusQuery {
+    query: String,
+    #[serde(with = "time::serde::rfc3339")]
+    start: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    end: OffsetDateTime,
+    step: f64,
+}
+
+#[derive(Deserialize)]
+struct PrometheusResponse {
+    data: PrometheusData,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PrometheusData {
+    result: Vec<PrometheusMetrics>,
+}
+
+#[derive(Deserialize)]
+struct PrometheusMetrics {
+    values: Vec<(f64, String)>,
+}
+
+pub(super) struct TrafficUpdater {
+    client: Client,
+    prometheus_url: Url,
+    selection: Duration,
+}
+
+#[async_trait::async_trait]
+impl Updater for TrafficUpdater {
+    type Output = Series;
+    type Error = anyhow::Error;
+
+    async fn update(&self) -> Result<Self::Output, Self::Error> {
+        let now = OffsetDateTime::now_utc();
+
+        let data = self
+            .query_stats(OffsetDateTime::now_utc() - self.selection, now, 255.0)
+            .await?;
+
+        Ok(data)
+    }
+}
+
+impl TrafficUpdater {
+    pub(super) fn new(client: Client, prometheus_url: Url, selection: TimeSelection) -> Self {
+        Self {
+            client,
+            prometheus_url,
+            selection: selection.into(),
+        }
+    }
+
+    async fn query_stats(
+        &self,
+        start: OffsetDateTime,
+        end: OffsetDateTime,
+        points: f64,
+    ) -> anyhow::Result<Series> {
+        let query = PrometheusQuery {
+            query: "sum(rate(sflow_router_bytes[5m]))*8".to_string(),
+            start,
+            end,
+            step: ((end - start) / points).as_seconds_f64(),
+        };
+
+        Ok(Series {
+            start,
+            end,
+            data: self
+                .client
+                .get(self.prometheus_url.join("/api/v1/query_range")?)
+                .query(&query)
+                .send()
+                .await?
+                .error_for_status()?
+                .json::<PrometheusResponse>()
+                .await?
+                .data
+                .result
+                .into_iter()
+                .find(|_| true)
+                .ok_or_else(|| anyhow!("unexpected prometheus response"))?
+                .values
+                .into_iter()
+                .map(|(time, value)| (time, f64::from_str(&value).unwrap()))
+                .collect::<Vec<_>>(),
+        })
+    }
+}