feat(bigquery): change from batch count limit to data size limit
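
Previously rows were flushed every 100,000 items regardless of how large they were, which on tables with wide rows could push a single request past BigQuery's 10MB payload cap. This commit instead accumulates rows until their estimated serialized size approaches that cap. A minimal sketch of the pattern, assuming only serde_json and anyhow (send_batch is a hypothetical stand-in for the insert-and-retry call in src/storage/bigquery/mod.rs):

    use anyhow::Result;

    const MAX_PAYLOAD_SIZE: usize = 9_000_000; // stay under BigQuery's 10MB request cap

    fn batch_by_size<T: serde::Serialize>(
        data: Vec<T>,
        mut send_batch: impl FnMut(Vec<T>) -> Result<()>,
    ) -> Result<()> {
        let mut batch = Vec::new();
        let mut size = 0usize;
        for item in data {
            // Estimate the row's wire size from its JSON serialization.
            let item_size = serde_json::to_string(&item)?.len();
            // Flush before this row would push the batch over the cap.
            if size + item_size > MAX_PAYLOAD_SIZE && !batch.is_empty() {
                send_batch(std::mem::take(&mut batch))?;
                size = 0;
            }
            batch.push(item);
            size += item_size;
        }
        // Send whatever is left over.
        if !batch.is_empty() {
            send_batch(batch)?;
        }
        Ok(())
    }

The sketch hands each full batch off with std::mem::take to avoid cloning; the committed code gets the same effect by building the request from current_batch and then resetting it.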
lgingerich committed Feb 27, 2025
1 parent 855735d commit f87fbaf
Showing 1 changed file with 98 additions and 6 deletions.
104 changes: 98 additions & 6 deletions src/storage/bigquery/mod.rs
@@ -218,11 +218,103 @@ pub async fn insert_data<T: serde::Serialize>(
         return Ok(());
     }
 
-    const BATCH_SIZE: usize = 100_000;
+    // BigQuery has a 10MB payload size limit
+    const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9MB to be safe (under 10MB limit)
     let total_rows = data.len();
 
+    let mut current_batch = Vec::new();
+    let mut current_size = 0;
+    let mut batches_sent = 0;
+
+    for item in data {
+        // Estimate the size of this item
+        let item_json = serde_json::to_string(&item)?;
+        let item_size = item_json.len();
+
+        // If adding this item would exceed our size limit, send the current batch
+        if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
+            // Send the current batch
+            let rows = current_batch
+                .iter()
+                .map(|item| {
+                    // Generate an appropriate insertId based on the table type and data content
+                    let insert_id = generate_insert_id(table_id, item, block_number);
+
+                    TableRow {
+                        insert_id: Some(insert_id),
+                        json: item,
+                    }
+                })
+                .collect();
+
+            let request = InsertAllRequest {
+                skip_invalid_rows: Some(true),
+                ignore_unknown_values: Some(true),
+                template_suffix: None,
+                rows,
+                trace_id: None,
+            };
+
-    for chunk in data.chunks(BATCH_SIZE) {
-        let rows = chunk
+            retry(
+                || async {
+                    match tabledata_client
+                        .insert(project_id, chain_name, table_id, &request)
+                        .await
+                    {
+                        Ok(response) => {
+                            if let Some(errors) = response.insert_errors {
+                                if !errors.is_empty() {
+                                    for error in errors {
+                                        error!("Row {} failed to insert", error.index);
+                                        for err_msg in error.errors {
+                                            error!("Error: {}", err_msg.message);
+                                        }
+                                    }
+                                    return Err(anyhow!("Some rows failed to insert"));
+                                }
+                            }
+                            Ok(())
+                        }
+                        Err(e) => {
+                            match e {
+                                BigQueryError::Response(resp) => {
+                                    error!("BigQuery API Error: {}", resp.message);
+                                }
+                                BigQueryError::HttpClient(e) => {
+                                    error!("HTTP Client error: {}", e);
+                                }
+                                BigQueryError::HttpMiddleware(e) => {
+                                    error!("HTTP Middleware error: {}", e);
+                                }
+                                BigQueryError::TokenSource(e) => {
+                                    error!("Token Source error: {}", e);
+                                }
+                            }
+                            Err(anyhow!("Data insertion failed"))
+                        }
+                    }
+                },
+                &RetryConfig::default(),
+                &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
+                None,
+            )
+            .await?;
+
+            batches_sent += 1;
+
+            // Reset for next batch
+            current_batch = Vec::new();
+            current_size = 0;
+        }
+
+        // Add item to the current batch
+        current_batch.push(item);
+        current_size += item_size;
+    }
+
+    // Send any remaining items
+    if !current_batch.is_empty() {
+        let rows = current_batch
             .iter()
             .map(|item| {
                 // Generate an appropriate insertId based on the table type and data content
@@ -283,15 +375,15 @@ }
                 }
             },
             &RetryConfig::default(),
-            &format!("insert_data_{}_{}_{}", chain_name, table_id, block_number),
+            &format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
             None,
         )
         .await?;
     }
 
     info!(
-        "Successfully inserted {} rows into {}.{}.{} for block {}",
-        total_rows, project_id, chain_name, table_id, block_number
+        "Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
+        total_rows, project_id, chain_name, table_id, block_number, batches_sent + 1
     );
 
     Ok(())
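One caveat when reading the size check: serde_json::to_string(&item) measures only the bare row, while the request that actually goes over the wire also carries a per-row insertId plus the InsertAllRequest envelope, which is presumably why the threshold is 9MB rather than the full 10MB. A quick, self-contained way to see that overhead (the Wrapped struct below is illustrative, not the library's actual wire format):

    use serde::Serialize;

    #[derive(Serialize)]
    struct Row {
        address: String,
        data: String,
    }

    // Hypothetical per-row envelope; the real library's wire format may differ.
    #[derive(Serialize)]
    struct Wrapped {
        insert_id: String,
        json: Row,
    }

    fn main() -> Result<(), serde_json::Error> {
        let bare = serde_json::to_string(&Row {
            address: "0xabc".into(),
            data: "0x".repeat(64),
        })?
        .len();
        let wrapped = serde_json::to_string(&Wrapped {
            insert_id: "logs_123_0".into(),
            json: Row {
                address: "0xabc".into(),
                data: "0x".repeat(64),
            },
        })?
        .len();
        println!("bare row: {bare} bytes, wrapped: {wrapped} bytes");
        Ok(())
    }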