From f87fbafba4460784b3373d12e4b5375b0b7a3d37 Mon Sep 17 00:00:00 2001
From: Landon Gingerich
Date: Thu, 27 Feb 2025 08:37:57 -0600
Subject: [PATCH 1/3] feat(bigquery): change from batch count limit to data
 size limit

---
 src/storage/bigquery/mod.rs | 104 +++++++++++++++++++++++++++++++++---
 1 file changed, 98 insertions(+), 6 deletions(-)

diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs
index 7213354..3a56494 100644
--- a/src/storage/bigquery/mod.rs
+++ b/src/storage/bigquery/mod.rs
@@ -218,11 +218,103 @@ pub async fn insert_data(
         return Ok(());
     }
 
-    const BATCH_SIZE: usize = 100_000;
+    // BigQuery has a 10MB payload size limit
+    const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9MB to be safe (under 10MB limit)
     let total_rows = data.len();
+    
+    let mut current_batch = Vec::new();
+    let mut current_size = 0;
+    let mut batches_sent = 0;
+    
+    for item in data {
+        // Estimate the size of this item
+        let item_json = serde_json::to_string(&item)?;
+        let item_size = item_json.len();
+    
+        // If adding this item would exceed our size limit, send the current batch
+        if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
+            // Send the current batch
+            let rows = current_batch
+                .iter()
+                .map(|item| {
+                    // Generate an appropriate insertId based on the table type and data content
+                    let insert_id = generate_insert_id(table_id, item, block_number);
+
+                    TableRow {
+                        insert_id: Some(insert_id),
+                        json: item,
+                    }
+                })
+                .collect();
+
+            let request = InsertAllRequest {
+                skip_invalid_rows: Some(true),
+                ignore_unknown_values: Some(true),
+                template_suffix: None,
+                rows,
+                trace_id: None,
+            };
 
-    for chunk in data.chunks(BATCH_SIZE) {
-        let rows = chunk
+            retry(
+                || async {
+                    match tabledata_client
+                        .insert(project_id, chain_name, table_id, &request)
+                        .await
+                    {
+                        Ok(response) => {
+                            if let Some(errors) = response.insert_errors {
+                                if !errors.is_empty() {
+                                    for error in errors {
+                                        error!("Row {} failed to insert", error.index);
+                                        for err_msg in error.errors {
+                                            error!("Error: {}", err_msg.message);
+                                        }
+                                    }
+                                    return Err(anyhow!("Some rows failed to insert"));
+                                }
+                            }
+                            Ok(())
+                        }
+                        Err(e) => {
+                            match e {
+                                BigQueryError::Response(resp) => {
+                                    error!("BigQuery API Error: {}", resp.message);
+                                }
+                                BigQueryError::HttpClient(e) => {
+                                    error!("HTTP Client error: {}", e);
+                                }
+                                BigQueryError::HttpMiddleware(e) => {
+                                    error!("HTTP Middleware error: {}", e);
+                                }
+                                BigQueryError::TokenSource(e) => {
+                                    error!("Token Source error: {}", e);
+                                }
+                            }
+                            Err(anyhow!("Data insertion failed"))
+                        }
+                    }
+                },
+                &RetryConfig::default(),
+                &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
+                None,
+            )
+            .await?;
+    
+            batches_sent += 1;
+    
+            // Reset for next batch
+            current_batch = Vec::new();
+            current_size = 0;
+        }
+    
+        // Add item to the current batch
+        current_batch.push(item);
+        current_size += item_size;
+    }
+    
+    // Send any remaining items
+    if !current_batch.is_empty() {
+        let rows = current_batch
             .iter()
             .map(|item| {
                 // Generate an appropriate insertId based on the table type and data content
@@ -283,15 +375,15 @@ pub async fn insert_data(
                 }
             },
             &RetryConfig::default(),
-            &format!("insert_data_{}_{}_{}", chain_name, table_id, block_number),
+            &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
             None,
         )
         .await?;
     }
 
     info!(
-        "Successfully inserted {} rows into {}.{}.{} for block {}",
-        total_rows, project_id, chain_name, table_id, block_number
+        "Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
+        total_rows, project_id, chain_name, table_id, block_number, batches_sent + 1
     );
 
     Ok(())
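The core of [PATCH 1/3] is a greedy, size-based batching loop: estimate each row's serialized size, flush the running batch just before the next row would push the payload past the cap, and always flush whatever remains at the end. Below is a minimal standalone sketch of that technique, not code from this repository; the `batch_by_size` name is invented here, and the only dependency assumed is `serde`/`serde_json`, which the patch itself already uses for size estimation.

```rust
use serde::Serialize;

/// Greedily split `items` into batches whose estimated JSON size stays
/// under `max_bytes`. Caveat: a single item larger than `max_bytes`
/// still ends up in its own oversized batch, so the cap should sit
/// safely below the hard API limit (as the patch's 9MB-vs-10MB does).
fn batch_by_size<T: Serialize>(
    items: Vec<T>,
    max_bytes: usize,
) -> serde_json::Result<Vec<Vec<T>>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;

    for item in items {
        // Estimate the wire size from the item's JSON encoding.
        let item_size = serde_json::to_string(&item)?.len();

        // Flush before this item would push the batch over the cap.
        if current_size + item_size > max_bytes && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }

        current.push(item);
        current_size += item_size;
    }

    // Always flush the trailing partial batch.
    if !current.is_empty() {
        batches.push(current);
    }

    Ok(batches)
}
```

Note that the summed JSON lengths undercount the actual request body (the insert IDs and the `insertAll` envelope add overhead), so the roughly 1MB of headroom the patch leaves under BigQuery's 10MB limit is doing real work.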
From e31df116eb915b48d057df64a61ca128751c6e3a Mon Sep 17 00:00:00 2001
From: Landon Gingerich
Date: Thu, 27 Feb 2025 08:58:27 -0600
Subject: [PATCH 2/3] test: remove unused code

---
 src/utils/rate_limiter.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/utils/rate_limiter.rs b/src/utils/rate_limiter.rs
index ea7fe1d..9a5592e 100644
--- a/src/utils/rate_limiter.rs
+++ b/src/utils/rate_limiter.rs
@@ -229,7 +229,6 @@ impl Drop for RateLimitPermit<'_> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use tokio::time::sleep;
 
     #[tokio::test]
     async fn test_rate_limiter_basic() {

From 15b39aa0138c383f36056978f6ff5aa029c908fc Mon Sep 17 00:00:00 2001
From: Landon Gingerich
Date: Thu, 27 Feb 2025 08:58:44 -0600
Subject: [PATCH 3/3] chore: run cargo fmt

---
 src/storage/bigquery/mod.rs | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)

diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs
index 3a56494..99d48ba 100644
--- a/src/storage/bigquery/mod.rs
+++ b/src/storage/bigquery/mod.rs
@@ -5,7 +5,9 @@ use google_cloud_bigquery::client::{Client, ClientConfig};
 use google_cloud_bigquery::http::dataset::{Dataset, DatasetReference};
 use google_cloud_bigquery::http::error::Error as BigQueryError;
 use google_cloud_bigquery::http::job::query::QueryRequest;
-use google_cloud_bigquery::http::table::{Table, TableReference, TimePartitioning, TimePartitionType};
+use google_cloud_bigquery::http::table::{
+    Table, TableReference, TimePartitionType, TimePartitioning,
+};
 use google_cloud_bigquery::http::tabledata::{
     insert_all::{InsertAllRequest, Row as TableRow},
     list::Value,
@@ -221,16 +223,16 @@ pub async fn insert_data(
     // BigQuery has a 10MB payload size limit
     const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9MB to be safe (under 10MB limit)
     let total_rows = data.len();
-    
+
     let mut current_batch = Vec::new();
     let mut current_size = 0;
     let mut batches_sent = 0;
-    
+
     for item in data {
         // Estimate the size of this item
         let item_json = serde_json::to_string(&item)?;
         let item_size = item_json.len();
-    
+
         // If adding this item would exceed our size limit, send the current batch
         if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
             // Send the current batch
@@ -295,23 +297,26 @@ pub async fn insert_data(
                     }
                 },
                 &RetryConfig::default(),
-                &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
+                &format!(
+                    "insert_data_{}_{}_{}_{}",
+                    chain_name, table_id, block_number, batches_sent
+                ),
                 None,
             )
            .await?;
-    
+
            batches_sent += 1;
-    
+
            // Reset for next batch
            current_batch = Vec::new();
            current_size = 0;
        }
-    
+
        // Add item to the current batch
        current_batch.push(item);
        current_size += item_size;
    }
-    
+
    // Send any remaining items
    if !current_batch.is_empty() {
        let rows = current_batch
@@ -375,7 +380,10 @@ pub async fn insert_data(
                }
            },
            &RetryConfig::default(),
-            &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
+            &format!(
+                "insert_data_{}_{}_{}_{}",
+                chain_name, table_id, block_number, batches_sent
+            ),
            None,
        )
        .await?;
@@ -383,7 +391,12 @@ pub async fn insert_data(
 
    info!(
        "Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
-        total_rows, project_id, chain_name, table_id, block_number, batches_sent + 1
+        total_rows,
+        project_id,
+        chain_name,
+        table_id,
+        block_number,
+        batches_sent + 1
    );
 
    Ok(())
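For intuition about the `batches_sent + 1` in the final log line: only in-loop flushes increment `batches_sent`, and the trailing `if !current_batch.is_empty()` send always contributes exactly one more batch whenever there was any data, since every flushed item is immediately re-added to a fresh batch. A quick check of that accounting against the `batch_by_size` sketch after [PATCH 1/3] above (the 13-byte cap and dummy strings are illustrative only):

```rust
fn main() -> serde_json::Result<()> {
    // Each string serializes to 6 bytes ("aaaa" -> "\"aaaa\"").
    // With a 13-byte cap, the third item forces one in-loop flush,
    // and the remainder is flushed at the end: 1 + 1 = 2 batches,
    // mirroring the patch's `batches_sent + 1` log line.
    let items = vec!["aaaa".to_string(), "bbbb".to_string(), "cccc".to_string()];
    let batches = batch_by_size(items, 13)?;
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0], vec!["aaaa", "bbbb"]);
    assert_eq!(batches[1], vec!["cccc"]);
    Ok(())
}
```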