Skip to content

Commit

Permalink
chore: run cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
lgingerich committed Feb 27, 2025
1 parent e31df11 commit 15b39aa
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions src/storage/bigquery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use google_cloud_bigquery::client::{Client, ClientConfig};
use google_cloud_bigquery::http::dataset::{Dataset, DatasetReference};
use google_cloud_bigquery::http::error::Error as BigQueryError;
use google_cloud_bigquery::http::job::query::QueryRequest;
use google_cloud_bigquery::http::table::{Table, TableReference, TimePartitioning, TimePartitionType};
use google_cloud_bigquery::http::table::{
Table, TableReference, TimePartitionType, TimePartitioning,
};
use google_cloud_bigquery::http::tabledata::{
insert_all::{InsertAllRequest, Row as TableRow},
list::Value,
Expand Down Expand Up @@ -221,16 +223,16 @@ pub async fn insert_data<T: serde::Serialize>(
// BigQuery has a 10MB payload size limit
const MAX_PAYLOAD_SIZE: usize = 9_000_000; // 9MB to be safe (under 10MB limit)
let total_rows = data.len();

let mut current_batch = Vec::new();
let mut current_size = 0;
let mut batches_sent = 0;

for item in data {
// Estimate the size of this item
let item_json = serde_json::to_string(&item)?;
let item_size = item_json.len();

// If adding this item would exceed our size limit, send the current batch
if current_size + item_size > MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
// Send the current batch
Expand Down Expand Up @@ -295,23 +297,26 @@ pub async fn insert_data<T: serde::Serialize>(
}
},
&RetryConfig::default(),
&format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
&format!(
"insert_data_{}_{}_{}_{}",
chain_name, table_id, block_number, batches_sent
),
None,
)
.await?;

batches_sent += 1;

// Reset for next batch
current_batch = Vec::new();
current_size = 0;
}

// Add item to the current batch
current_batch.push(item);
current_size += item_size;
}

// Send any remaining items
if !current_batch.is_empty() {
let rows = current_batch
Expand Down Expand Up @@ -375,15 +380,23 @@ pub async fn insert_data<T: serde::Serialize>(
}
},
&RetryConfig::default(),
&format!("insert_data_{}_{}_{}_{}", chain_name, table_id, block_number, batches_sent),
&format!(
"insert_data_{}_{}_{}_{}",
chain_name, table_id, block_number, batches_sent
),
None,
)
.await?;
}

info!(
"Successfully inserted {} rows into {}.{}.{} for block {} in {} batches",
total_rows, project_id, chain_name, table_id, block_number, batches_sent + 1
total_rows,
project_id,
chain_name,
table_id,
block_number,
batches_sent + 1
);

Ok(())
Expand Down

0 comments on commit 15b39aa

Please sign in to comment.