diff --git a/src/storage/bigquery/mod.rs b/src/storage/bigquery/mod.rs
index 7213354..99d48ba 100644
--- a/src/storage/bigquery/mod.rs
+++ b/src/storage/bigquery/mod.rs
@@ -5,7 +5,9 @@ use google_cloud_bigquery::client::{Client, ClientConfig};
 use google_cloud_bigquery::http::dataset::{Dataset, DatasetReference};
 use google_cloud_bigquery::http::error::Error as BigQueryError;
 use google_cloud_bigquery::http::job::query::QueryRequest;
-use google_cloud_bigquery::http::table::{Table, TableReference, TimePartitioning, TimePartitionType};
+use google_cloud_bigquery::http::table::{
+    Table, TableReference, TimePartitionType, TimePartitioning,
+};
 use google_cloud_bigquery::http::tabledata::{
     insert_all::{InsertAllRequest, Row as TableRow},
     list::Value,
@@ -218,11 +220,106 @@ pub async fn insert_data(
         return Ok(());
     }
 
-    const BATCH_SIZE: usize = 100_000;
+    // BigQuery's streaming insert API rejects requests larger than 10 MB
+    const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9 MB, leaving headroom under the 10 MB limit
     let total_rows = data.len();
 
-    for chunk in data.chunks(BATCH_SIZE) {
-        let rows = chunk
+    let mut current_batch = Vec::new();
+    let mut current_size = 0;
+    let mut batches_sent = 0;
+
+    for item in data {
+        // Estimate this row's contribution to the payload from its JSON serialization
+        let item_json = serde_json::to_string(&item)?;
+        let item_size = item_json.len();
+
+        // If adding this row would push the batch past the size limit, flush first
+        if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
+            // Build the rows for the batch being flushed
+            let rows = current_batch
+                .iter()
+                .map(|item| {
+                    // Generate an appropriate insertId based on the table type and data content
+                    let insert_id = generate_insert_id(table_id, item, block_number);
+
+                    TableRow {
+                        insert_id: Some(insert_id),
+                        json: item,
+                    }
+                })
+                .collect();
+
+            let request = InsertAllRequest {
+                skip_invalid_rows: Some(true),
+                ignore_unknown_values: Some(true),
+                template_suffix: None,
+                rows,
+                trace_id: None,
+            };
+
+            retry(
+                || async {
+                    match tabledata_client
+                        .insert(project_id, chain_name, table_id, &request)
+                        .await
+                    {
+                        Ok(response) => {
+                            if let Some(errors) = response.insert_errors {
+                                if !errors.is_empty() {
+                                    for error in errors {
+                                        error!("Row {} failed to insert", error.index);
+                                        for err_msg in error.errors {
+                                            error!("Error: {}", err_msg.message);
+                                        }
+                                    }
+                                    return Err(anyhow!("Some rows failed to insert"));
+                                }
+                            }
+                            Ok(())
+                        }
+                        Err(e) => {
+                            match e {
+                                BigQueryError::Response(resp) => {
+                                    error!("BigQuery API Error: {}", resp.message);
+                                }
+                                BigQueryError::HttpClient(e) => {
+                                    error!("HTTP Client error: {}", e);
+                                }
+                                BigQueryError::HttpMiddleware(e) => {
+                                    error!("HTTP Middleware error: {}", e);
+                                }
+                                BigQueryError::TokenSource(e) => {
+                                    error!("Token Source error: {}", e);
+                                }
+                            }
+                            Err(anyhow!("Data insertion failed"))
+                        }
+                    }
+                },
+                &RetryConfig::default(),
+                &format!(
+                    "insert_data_{}_{}_{}_{}",
+                    chain_name, table_id, block_number, batches_sent
+                ),
+                None,
+            )
+            .await?;
+
+            batches_sent += 1;
+
+            // Reset the accumulator for the next batch
+            current_batch = Vec::new();
+            current_size = 0;
+        }
+
+        // Add this row to the current batch
+        current_batch.push(item);
+        current_size += item_size;
+    }
+
+    // Send the final, partially filled batch
+    if !current_batch.is_empty() {
+        let rows = current_batch
             .iter()
             .map(|item| {
                 // Generate an appropriate insertId based on the table type and data content
@@ -283,15 +380,23 @@ pub async fn insert_data(
                 }
             },
             &RetryConfig::default(),
-            &format!("insert_data_{}_{}_{}", chain_name, table_id, block_number),
+            &format!(
+                "insert_data_{}_{}_{}_{}",
+                chain_name, table_id, block_number, batches_sent
+            ),
             None,
         )
         .await?;
     }
 
     info!(
-        "Successfully inserted {} rows into {}.{}.{} for block {}",
-        total_rows, project_id, chain_name, table_id, block_number
+        "Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
+        total_rows,
+        project_id,
+        chain_name,
+        table_id,
+        block_number,
+        batches_sent + 1
     );
 
     Ok(())
diff --git a/src/utils/rate_limiter.rs b/src/utils/rate_limiter.rs
index ea7fe1d..9a5592e 100644
--- a/src/utils/rate_limiter.rs
+++ b/src/utils/rate_limiter.rs
@@ -229,7 +229,6 @@ impl Drop for RateLimitPermit<'_> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use tokio::time::sleep;
 
     #[tokio::test]
     async fn test_rate_limiter_basic() {
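
A note on the size-based batching introduced above: the flush path inside the loop and the trailing send for leftover rows are two copies of the same build-rows/retry/insert sequence. One way the split could be factored out so the send path exists only once is sketched below. This is a sketch, not part of the patch; it assumes the rows are serde_json::Value, and the name chunk_by_payload_size is illustrative.

use serde_json::Value;

/// Split `rows` into batches whose summed JSON size stays at or below `max_bytes`.
/// A single row larger than `max_bytes` still becomes its own batch, since a
/// row cannot be split further.
fn chunk_by_payload_size(rows: Vec<Value>, max_bytes: usize) -> Vec<Vec<Value>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_size = 0;

    for row in rows {
        // Serializing a serde_json::Value to a string cannot fail, so the
        // size estimate is infallible.
        let row_size = row.to_string().len();
        if current_size + row_size > max_bytes && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_size = 0;
        }
        current_size += row_size;
        current.push(row);
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

With that shape, insert_data would iterate `for (i, batch) in chunk_by_payload_size(data, MAX_PAYLOAD_SIZE).into_iter().enumerate()` and run the retry/insert block once per batch, with `i` taking the place of batches_sent in the retry key and batches.len() replacing the `batches_sent + 1` arithmetic in the final log line. Note the estimate measures raw row JSON only; the InsertAllRequest envelope and per-row insertId add overhead, which is what the 1 MB gap below the 10 MB limit absorbs.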
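
On the insert_id field itself: BigQuery's legacy streaming API (tabledata.insertAll) treats insertId as a best-effort de-duplication key, so a batch retried after a transient failure does not double-insert rows that already landed. The patch calls generate_insert_id but does not show its implementation; a hypothetical version keyed on table, block, and row content might look like the following (signature and behavior assumed, not taken from the repository):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for this module's generate_insert_id; the real
// implementation lives elsewhere in src/storage/bigquery and may differ.
fn generate_insert_id(table_id: &str, row_json: &str, block_number: u64) -> String {
    // Hash the row content so an identical row retried in a later request
    // maps to the same insertId and gets de-duplicated by BigQuery.
    let mut hasher = DefaultHasher::new();
    table_id.hash(&mut hasher);
    row_json.hash(&mut hasher);
    block_number.hash(&mut hasher);
    format!("{}-{}-{:016x}", table_id, block_number, hasher.finish())
}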