Skip to content

Commit

Permalink
stats: add as112
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcelCoding committed May 11, 2024
1 parent 0208e54 commit 5e83d99
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
7 changes: 5 additions & 2 deletions src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::routes::documents::list_documents;
use crate::routes::lists::add_subscriber;
use crate::routes::news::{find_keywords, find_post, list_posts};
use crate::routes::peers::get_peers_and_supporter;
use crate::routes::stats::get_stats;
use crate::routes::team::get_team;
use crate::routes::text_blocks::find_text_block;
use crate::state::FoundationState;

use self::stats::{get_as112_stats, get_traffic_stats};

mod bird;
mod documents;
mod lists;
Expand Down Expand Up @@ -49,7 +50,9 @@ pub(crate) fn route(content_paths: &ContentPaths) -> Router<FoundationState> {
.route("/team/:lang", get(get_team))
.nest_service("/team/assets", ServeDir::new(&content_paths.team))
.route("/mailing_lists/:list", post(add_subscriber))
.route("/stats", get(get_stats))
.route("/stats", get(get_traffic_stats))
.route("/stats/traffic", get(get_traffic_stats))
.route("/stats/as112", get(get_as112_stats))
.route("/peers", get(get_peers_and_supporter))
.route("/bird", get(get_bird))
}
18 changes: 15 additions & 3 deletions src/routes/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,25 @@ use tracing::error;
use crate::state::FoundationState;
use crate::stats::AggregatedStats;

pub(super) async fn get_stats(
pub(super) async fn get_traffic_stats(
State(state): State<FoundationState>,
) -> Result<Json<Arc<AggregatedStats>>, StatusCode> {
match state.stats.get_stats().await {
match state.stats.get_traffic_stats().await {
Ok(stats) => Ok(Json(stats)),
Err(err) => {
error!("Error while querying stats: {:?}", err);
error!("Error while querying traffic stats: {:?}", err);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

pub(super) async fn get_as112_stats(
State(state): State<FoundationState>,
) -> Result<Json<Arc<AggregatedStats>>, StatusCode> {
match state.stats.get_as112_stats().await {
Ok(stats) => Ok(Json(stats)),
Err(err) => {
error!("Error while querying as112 stats: {:?}", err);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
Expand Down
80 changes: 79 additions & 1 deletion src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub(crate) struct AggregatedStats {
#[derive(Clone)]
pub(crate) struct Stats {
traffic: Arc<Cache<TrafficUpdater>>,
as112: Arc<Cache<As112Updater>>,
}

struct TrafficUpdater {
Expand Down Expand Up @@ -124,21 +125,98 @@ impl TrafficUpdater {
}
}

struct As112Updater {
client: Client,
prometheus_url: Url,
}

#[async_trait::async_trait]
impl Updater for As112Updater {
type Output = AggregatedStats;
type Error = anyhow::Error;

async fn update(&self) -> Result<Self::Output, Self::Error> {
let now = OffsetDateTime::now_utc();

let (two_days, seven_days, month) = try_join!(
self.query_stats(OffsetDateTime::now_utc() - Duration::days(2), now, 255.0),
self.query_stats(OffsetDateTime::now_utc() - Duration::days(7), now, 255.0),
self.query_stats(OffsetDateTime::now_utc() - Duration::days(30), now, 255.0),
)?;

let metrics = AggregatedStats {
two_days,
seven_days,
month,
};

Ok(metrics)
}
}
impl As112Updater {
async fn query_stats(
&self,
start: OffsetDateTime,
end: OffsetDateTime,
points: f64,
) -> anyhow::Result<Series> {
let query = PrometheusQuery {
query: "sum(rate(knot_query_type_total[5m]))".to_string(),
start,
end,
step: ((end - start) / points).as_seconds_f64(),
};

Ok(Series {
start,
end,
data: self
.client
.get(self.prometheus_url.join("/api/v1/query_range")?)
.query(&query)
.send()
.await?
.error_for_status()?
.json::<PrometheusResponse>()
.await?
.data
.result
.into_iter()
.find(|_| true)
.ok_or_else(|| anyhow!("unexpected prometheus response"))?
.values
.into_iter()
.map(|(time, value)| (time, f64::from_str(&value).unwrap()))
.collect::<Vec<_>>(),
})
}
}

impl Stats {
pub(crate) fn new(prometheus_url: Url) -> Self {
let client = Client::new();

let traffic_updater = TrafficUpdater {
client: client.clone(),
prometheus_url: prometheus_url.clone(),
};

let as112 = As112Updater {
client,
prometheus_url,
};

Self {
traffic: Arc::new(Cache::new(traffic_updater)),
as112: Arc::new(Cache::new(as112)),
}
}

pub(crate) async fn get_stats(&self) -> anyhow::Result<Arc<AggregatedStats>> {
pub(crate) async fn get_traffic_stats(&self) -> anyhow::Result<Arc<AggregatedStats>> {
self.traffic.get().await
}

pub(crate) async fn get_as112_stats(&self) -> anyhow::Result<Arc<AggregatedStats>> {
self.as112.get().await
}
}

0 comments on commit 5e83d99

Please sign in to comment.