Merge pull request #22 from lgingerich/release/v0.1.5
Release/v0.1.5
lgingerich authored Feb 27, 2025
2 parents 855735d + 15b39aa commit 110ab3c
Showing 2 changed files with 112 additions and 8 deletions.
119 changes: 112 additions & 7 deletions src/storage/bigquery/mod.rs
@@ -5,7 +5,9 @@ use google_cloud_bigquery::client::{Client, ClientConfig};
 use google_cloud_bigquery::http::dataset::{Dataset, DatasetReference};
 use google_cloud_bigquery::http::error::Error as BigQueryError;
 use google_cloud_bigquery::http::job::query::QueryRequest;
-use google_cloud_bigquery::http::table::{Table, TableReference, TimePartitioning, TimePartitionType};
+use google_cloud_bigquery::http::table::{
+    Table, TableReference, TimePartitionType, TimePartitioning,
+};
 use google_cloud_bigquery::http::tabledata::{
     insert_all::{InsertAllRequest, Row as TableRow},
     list::Value,
@@ -218,11 +220,106 @@ pub async fn insert_data<T: serde::Serialize>(
         return Ok(());
     }
 
-    const BATCH_SIZE: usize = 100_000;
+    // BigQuery has a 10MB payload size limit
+    const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9MB to be safe (under 10MB limit)
     let total_rows = data.len();
 
-    for chunk in data.chunks(BATCH_SIZE) {
-        let rows = chunk
+    let mut current_batch = Vec::new();
+    let mut current_size = 0;
+    let mut batches_sent = 0;
+
+    for item in data {
+        // Estimate the size of this item
+        let item_json = serde_json::to_string(&item)?;
+        let item_size = item_json.len();
+
+        // If adding this item would exceed our size limit, send the current batch
+        if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
+            // Send the current batch
+            let rows = current_batch
+                .iter()
+                .map(|item| {
+                    // Generate an appropriate insertId based on the table type and data content
+                    let insert_id = generate_insert_id(table_id, item, block_number);
+
+                    TableRow {
+                        insert_id: Some(insert_id),
+                        json: item,
+                    }
+                })
+                .collect();
+
+            let request = InsertAllRequest {
+                skip_invalid_rows: Some(true),
+                ignore_unknown_values: Some(true),
+                template_suffix: None,
+                rows,
+                trace_id: None,
+            };
+
+            retry(
+                || async {
+                    match tabledata_client
+                        .insert(project_id, chain_name, table_id, &request)
+                        .await
+                    {
+                        Ok(response) => {
+                            if let Some(errors) = response.insert_errors {
+                                if !errors.is_empty() {
+                                    for error in errors {
+                                        error!("Row {} failed to insert", error.index);
+                                        for err_msg in error.errors {
+                                            error!("Error: {}", err_msg.message);
+                                        }
+                                    }
+                                    return Err(anyhow!("Some rows failed to insert"));
+                                }
+                            }
+                            Ok(())
+                        }
+                        Err(e) => {
+                            match e {
+                                BigQueryError::Response(resp) => {
+                                    error!("BigQuery API Error: {}", resp.message);
+                                }
+                                BigQueryError::HttpClient(e) => {
+                                    error!("HTTP Client error: {}", e);
+                                }
+                                BigQueryError::HttpMiddleware(e) => {
+                                    error!("HTTP Middleware error: {}", e);
+                                }
+                                BigQueryError::TokenSource(e) => {
+                                    error!("Token Source error: {}", e);
+                                }
+                            }
+                            Err(anyhow!("Data insertion failed"))
+                        }
+                    }
+                },
+                &RetryConfig::default(),
+                &format!(
+                    "insert_data_{}_{}_{}_{}",
+                    chain_name, table_id, block_number, batches_sent
+                ),
+                None,
+            )
+            .await?;
+
+            batches_sent += 1;
+
+            // Reset for next batch
+            current_batch = Vec::new();
+            current_size = 0;
+        }
+
+        // Add item to the current batch
+        current_batch.push(item);
+        current_size += item_size;
+    }
+
+    // Send any remaining items
+    if !current_batch.is_empty() {
+        let rows = current_batch
             .iter()
             .map(|item| {
                 // Generate an appropriate insertId based on the table type and data content
@@ -283,15 +380,23 @@ pub async fn insert_data<T: serde::Serialize>(
                 }
             },
             &RetryConfig::default(),
-            &format!("insert_data_{}_{}_{}", chain_name, table_id, block_number),
+            &format!(
+                "insert_data_{}_{}_{}_{}",
+                chain_name, table_id, block_number, batches_sent
+            ),
             None,
         )
         .await?;
     }
 
     info!(
-        "Successfully inserted {} rows into {}.{}.{} for block {}",
-        total_rows, project_id, chain_name, table_id, block_number
+        "Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
+        total_rows,
+        project_id,
+        chain_name,
+        table_id,
+        block_number,
+        batches_sent + 1
     );
 
     Ok(())
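
The core of this change is flushing a batch just before its serialized payload would cross a byte budget, instead of batching by a fixed row count (the old BATCH_SIZE of 100,000 rows). Below is a minimal, self-contained sketch of that technique, assuming only serde and serde_json; batch_by_payload_size and the main harness are illustrative names, not part of this repository, and the sketch deliberately omits the project's generate_insert_id, retry, and RetryConfig helpers, whose implementations are not shown in this diff.

    use serde::Serialize;

    /// 9 MB budget, mirroring the commit's safety margin under BigQuery's 10 MB limit.
    const MAX_PAYLOAD_SIZE: usize = 9_000_000;

    /// Split `items` into batches whose serialized JSON stays under `max_bytes`.
    /// A single item larger than `max_bytes` still forms its own one-element
    /// batch, matching the behavior of the committed code.
    fn batch_by_payload_size<T: Serialize>(
        items: Vec<T>,
        max_bytes: usize,
    ) -> serde_json::Result<Vec<Vec<T>>> {
        let mut batches = Vec::new();
        let mut current = Vec::new();
        let mut current_size = 0;

        for item in items {
            // Estimate the item's wire size from its JSON encoding, as the commit does.
            let item_size = serde_json::to_string(&item)?.len();

            // Flush the running batch before it would overflow the budget.
            if current_size + item_size > max_bytes && !current.is_empty() {
                batches.push(std::mem::take(&mut current));
                current_size = 0;
            }

            current.push(item);
            current_size += item_size;
        }

        // Flush the remainder, mirroring the "send any remaining items" step.
        if !current.is_empty() {
            batches.push(current);
        }

        Ok(batches)
    }

    fn main() -> serde_json::Result<()> {
        let rows: Vec<String> = (0..10).map(|i| format!("row-{i}")).collect();
        // A tiny budget forces several batches; with MAX_PAYLOAD_SIZE all ten fit in one.
        let batches = batch_by_payload_size(rows, 20)?;
        println!("{} batches (budget 20 bytes)", batches.len());
        assert_eq!(batches.iter().map(Vec::len).sum::<usize>(), 10, "no rows lost");
        let _ = MAX_PAYLOAD_SIZE; // the real code passes this constant as the budget
        Ok(())
    }

Note that the estimate counts only each row's JSON body; the per-row insertId and the insertAll request envelope add overhead, which the 1 MB headroom between the 9 MB budget and BigQuery's 10 MB limit is there to absorb. A single row larger than the budget still ships as its own one-element batch, both here and in the committed code.
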
1 change: 0 additions & 1 deletion src/utils/rate_limiter.rs
@@ -229,7 +229,6 @@ impl Drop for RateLimitPermit<'_> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use tokio::time::sleep;
 
     #[tokio::test]
     async fn test_rate_limiter_basic() {
