Merge pull request #20 from lgingerich/release/v0.1.3
Release/v0.1.3
lgingerich authored Feb 26, 2025
2 parents 67e4f73 + 15a7def commit 70678cb
Showing 4 changed files with 160 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -37,6 +37,7 @@ rand = "0.8.5"
 serde = "1.0.216"
 serde_json = "1.0.133"
 serde_yaml = "0.9.34"
+sha2 = "0.10.8"
 thiserror = "2.0.11"
 tokio = { version = "1.41.0", features = ["full", "sync"] }
 tracing = "0.1.41"
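The new sha2 dependency backs the insertId length cap added in this release. For reference, this is the one-shot digest pattern the commit relies on (sha2 0.10 API; the sha256_hex helper is illustrative, not part of the codebase):

    use sha2::{Digest, Sha256};

    /// Hash a string and render the digest as lowercase hex.
    fn sha256_hex(input: &str) -> String {
        let mut hasher = Sha256::new();
        hasher.update(input.as_bytes());
        // The digest type implements LowerHex, so {:x} prints the full hash
        format!("{:x}", hasher.finalize())
    }

    // sha256_hex("abc") == "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"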
161 changes: 157 additions & 4 deletions src/storage/bigquery/mod.rs
@@ -11,8 +11,9 @@ use google_cloud_bigquery::http::tabledata::{
     list::Value,
 };
 use once_cell::sync::OnceCell;
+use sha2::{Digest, Sha256};
 use std::sync::Arc;
-use tracing::{error, info};
+use tracing::{error, info, warn};

 use crate::models::common::Chain;
 use crate::storage::bigquery::schema::{
@@ -218,9 +219,14 @@ pub async fn insert_data<T: serde::Serialize>(
     for chunk in data.chunks(BATCH_SIZE) {
         let rows = chunk
             .iter()
-            .map(|item| TableRow {
-                insert_id: None,
-                json: item,
+            .map(|item| {
+                // Generate an appropriate insertId based on the table type and data content
+                let insert_id = generate_insert_id(table_id, item, block_number);
+
+                TableRow {
+                    insert_id: Some(insert_id),
+                    json: item,
+                }
             })
             .collect();

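Setting insert_id to Some(...) opts each row into BigQuery's best-effort streaming de-duplication: when a batch is retried, rows that re-send a recently seen insertId can be dropped server-side instead of landing twice. A minimal sketch of the idea, with Row as a simplified stand-in for the crate's TableRow type (illustrative, not the commit's code):

    // Row mirrors only the two fields used above.
    struct Row<'a, T> {
        insert_id: Option<String>,
        json: &'a T,
    }

    fn build_rows<'a, T: serde::Serialize>(
        table_id: &str,
        chunk: &'a [T],
        block_number: u64,
    ) -> Vec<Row<'a, T>> {
        chunk
            .iter()
            .map(|item| Row {
                // Deterministic per row: a retried batch re-sends identical
                // IDs, letting BigQuery drop the duplicates on its side.
                insert_id: Some(generate_insert_id(table_id, item, block_number)),
                json: item,
            })
            .collect()
    }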
@@ -286,6 +292,153 @@ pub async fn insert_data<T: serde::Serialize>(
     Ok(())
 }

+// Helper function to generate an appropriate insertId based on the table type and data content
+fn generate_insert_id<T: serde::Serialize>(
+    table_id: &str,
+    data: &T,
+    fallback_block_number: u64,
+) -> String {
+    // First convert the data to a Value so we can access its fields
+    let value = match serde_json::to_value(data) {
+        Ok(v) => v,
+        Err(e) => {
+            error!("Failed to serialize data for InsertID generation: {}", e);
+            // If serialization fails, fall back to a simple block-based ID
+            return format!("{}-{}", table_id, fallback_block_number);
+        }
+    };
+
+    // Get block_number from the data, falling back to the parameter
+    let block_number = value
+        .get("block_number")
+        .and_then(|v| v.as_u64())
+        .unwrap_or_else(|| {
+            if table_id != "blocks" {
+                // The blocks table may legitimately omit block_number, so only warn for other tables
+                warn!(
+                    "Missing block_number in data, using fallback: {}",
+                    fallback_block_number
+                );
+            }
+            fallback_block_number
+        });
+
+    // Generate a base ID string that might exceed the length limit
+    let base_id = match table_id {
+        "blocks" => {
+            // For blocks, just use the block number
+            format!("block-{}", block_number)
+        }
+        "transactions" => {
+            // For transactions, combine block_number and tx_hash
+            let tx_hash = match value.get("tx_hash").and_then(|v| v.as_str()) {
+                Some(hash) => hash,
+                None => {
+                    error!(
+                        "Missing mandatory tx_hash field in transaction data for block {}",
+                        block_number
+                    );
+                    return format!("tx-{}-unknown", block_number);
+                }
+            };
+
+            format!("tx-{}-{}", block_number, tx_hash)
+        }
+        "logs" => {
+            // For logs, combine block_number, tx_hash, tx_index, and log_index
+            let tx_hash = match value.get("tx_hash").and_then(|v| v.as_str()) {
+                Some(hash) => hash,
+                None => {
+                    error!(
+                        "Missing mandatory tx_hash field in log data for block {}",
+                        block_number
+                    );
+                    return format!("log-{}-unknown-0-0", block_number);
+                }
+            };
+
+            let tx_index = match value.get("tx_index").and_then(|v| v.as_u64()) {
+                Some(idx) => idx,
+                None => {
+                    error!(
+                        "Missing mandatory tx_index field in log data for block {}",
+                        block_number
+                    );
+                    0
+                }
+            };
+
+            let log_index = match value.get("log_index").and_then(|v| v.as_u64()) {
+                Some(idx) => idx,
+                None => {
+                    error!(
+                        "Missing mandatory log_index field in log data for block {}",
+                        block_number
+                    );
+                    0
+                }
+            };
+
+            format!(
+                "log-{}-{}-{}-{}",
+                block_number, tx_hash, tx_index, log_index
+            )
+        }
+        "traces" => {
+            // For traces, combine block_number, tx_hash, and trace_address
+            let tx_hash = match value.get("tx_hash").and_then(|v| v.as_str()) {
+                Some(hash) => hash,
+                None => {
+                    error!(
+                        "Missing mandatory tx_hash field in trace data for block {}",
+                        block_number
+                    );
+                    return format!("trace-{}-unknown-root", block_number);
+                }
+            };
+
+            // Handle trace_address, which is an array of call-path indices
+            let trace_address = match value.get("trace_address").and_then(|v| v.as_array()) {
+                Some(addr_array) => addr_array
+                    .iter()
+                    .map(|v| v.as_u64().unwrap_or(0).to_string())
+                    .collect::<Vec<String>>()
+                    .join("-"),
+                None => {
+                    error!(
+                        "Missing mandatory trace_address field in trace data for block {}",
+                        block_number
+                    );
+                    "root".to_string()
+                }
+            };
+
+            format!("trace-{}-{}-{}", block_number, tx_hash, trace_address)
+        }
+        // For any other table types
+        _ => {
+            warn!("Invalid table ID: {}", table_id);
+            format!("{}-{}", table_id, block_number)
+        }
+    };
+
+    // Check whether the base ID exceeds the length limit (128 bytes);
+    // len() counts bytes, and checking against 120 leaves a safety margin.
+    if base_id.len() > 120 {
+        // If it's too long, hash it to create a fixed-length ID: use the
+        // first 16 bytes of the SHA-256 hash (32 hex chars), prefixed with
+        // the table ID and block number for readability.
+        let mut hasher = Sha256::new();
+        hasher.update(base_id.as_bytes());
+        let hash = format!("{:x}", hasher.finalize());
+        format!("{}-{}-{}", table_id, block_number, &hash[..32])
+    } else {
+        // If it's within the limit, use the base ID as is
+        base_id
+    }
+}

 // Get the last processed block number from storage
 pub async fn get_last_processed_block(chain_name: &str, datasets: &Vec<String>) -> Result<u64> {
     let (client, project_id) = &*get_client().await?;
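The ID scheme above is deterministic per row, so retries produce identical IDs. A sketch of the expected formats (an illustrative test, not part of the commit; the JSON shapes assume only the fields the helper actually reads):

    #[cfg(test)]
    mod insert_id_format_examples {
        use super::*;
        use serde_json::json;

        #[test]
        fn ids_follow_the_documented_formats() {
            let tx = json!({ "block_number": 19000000u64, "tx_hash": "0xabc123" });
            assert_eq!(
                generate_insert_id("transactions", &tx, 19000000),
                "tx-19000000-0xabc123"
            );

            let log = json!({
                "block_number": 19000000u64,
                "tx_hash": "0xabc123",
                "tx_index": 7u64,
                "log_index": 2u64,
            });
            assert_eq!(
                generate_insert_id("logs", &log, 19000000),
                "log-19000000-0xabc123-7-2"
            );

            // Over 120 bytes, the ID collapses to
            // "<table_id>-<block_number>-<first 32 hex chars of SHA-256>".
            let long = json!({ "block_number": 1u64, "tx_hash": "0x".repeat(100) });
            assert!(generate_insert_id("transactions", &long, 1).len() <= 128);
        }
    }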
2 changes: 1 addition & 1 deletion src/utils/rate_limiter.rs
@@ -142,7 +142,7 @@ impl RateLimiter {
         let mut limit = current_limit.lock().unwrap();

         // If error rate is too high, reduce concurrency
-        if error_rate > 0.1 {
+        if error_rate > 0.05 {
             *limit = (*limit * 3 / 4).max(1);
         }
         // If response time is too high, reduce concurrency
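Halving the error-rate threshold makes the limiter back off at 5% errors instead of 10%; each trip then applies the same multiplicative decrease, cutting the concurrency limit to three quarters (integer division) with a floor of one. A standalone sketch of that decay (mirrors only the arithmetic in the diff):

    fn reduced(limit: usize) -> usize {
        // Multiplicative decrease with a floor of one in-flight request
        (limit * 3 / 4).max(1)
    }

    fn main() {
        // Successive trips: 16 -> 12 -> 9 -> 6 -> 4 -> 3 -> 2 -> 1
        let mut limit = 16;
        while limit > 1 {
            let next = reduced(limit);
            println!("{limit} -> {next}");
            limit = next;
        }
    }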
