Skip to content

Commit

Permalink
Remove dead code and benchmarks for deleted code.
Browse files Browse the repository at this point in the history
  • Loading branch information
obi1kenobi committed Jan 15, 2025
1 parent bcf6a6d commit 06e84c9
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 238 deletions.
Original file line number Diff line number Diff line change
@@ -1,35 +1,21 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use langsmith_tracing_client::client::async_enabled::{ClientConfig, TracingClient};
use langsmith_tracing_client::client::blocking::{
ClientConfig as BlockingClientConfig, TracingClient as BlockingTracingClient,
};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use langsmith_tracing_client::client::{
Attachment, EventType, RunCommon, RunCreate, RunCreateExtended, RunEventBytes, RunIO, TimeValue,
Attachment, ClientConfig as BlockingClientConfig, RunCommon, RunCreate, RunCreateExtended,
RunIO, TimeValue, TracingClient as BlockingTracingClient,
};
use mockito::Server;
use serde_json::{json, Value};
use std::time::Duration;
use tokio::runtime::Runtime;

/// Build an async-client `ClientConfig` pointed at the given mock server URL.
///
/// Queue capacity is pinned at one million entries and batches flush after
/// one second; only `batch_size` varies between benchmark cases.
fn create_mock_client_config(server_url: &str, batch_size: usize) -> ClientConfig {
    let endpoint = String::from(server_url);
    ClientConfig {
        endpoint,
        batch_size,
        batch_timeout: Duration::from_secs(1),
        queue_capacity: 1_000_000,
        headers: Default::default(),
    }
}

/// Build a blocking-client config pointed at the given mock server URL.
///
/// Uses a single worker thread and minimal compression so benchmark numbers
/// reflect queueing/serialization cost rather than compression work.
// NOTE(review): both `batch_size`/`batch_timeout` and `send_at_batch_size`/
// `send_at_batch_time` appear here with the same values — this looks like it
// may combine fields from two versions of `BlockingClientConfig`; confirm
// against the struct definition.
fn create_mock_client_config_sync(server_url: &str, batch_size: usize) -> BlockingClientConfig {
    BlockingClientConfig {
        endpoint: server_url.to_string(),
        // Mock server ignores auth; any non-empty key satisfies the client.
        api_key: "anything".into(),
        queue_capacity: 1_000_000,
        batch_size,
        batch_timeout: Duration::from_secs(1),
        send_at_batch_size: batch_size,
        send_at_batch_time: Duration::from_secs(1),
        headers: Default::default(),
        num_worker_threads: 1,
        compression_level: 1,
    }
}

Expand Down Expand Up @@ -67,26 +53,6 @@ fn create_run_create(
}
}

/// Build a pre-serialized run-create event from optional attachments,
/// inputs, and outputs.
///
/// Inputs and outputs are JSON-encoded separately from the run body so the
/// benchmark performs the same per-field serialization the client would.
///
/// Panics if any payload fails to serialize (acceptable in benchmark code).
fn create_run_bytes(
    attachments: Option<Vec<Attachment>>,
    inputs: Option<Value>,
    outputs: Option<Value>,
) -> RunEventBytes {
    let inputs_bytes = inputs.as_ref().map(|payload| serde_json::to_vec(payload).unwrap());
    let outputs_bytes = outputs.as_ref().map(|payload| serde_json::to_vec(payload).unwrap());

    // Delegate construction of the structured run, then flatten it to bytes.
    let extended = create_run_create(attachments, inputs, outputs);
    let run_bytes = serde_json::to_vec(&extended.run_create).unwrap();

    RunEventBytes {
        run_id: extended.run_create.common.id,
        event_type: EventType::Create,
        run_bytes,
        inputs_bytes,
        outputs_bytes,
        attachments: extended.attachments,
    }
}

fn create_large_json(len: usize) -> Value {
let large_array: Vec<Value> = (0..len)
.map(|i| {
Expand All @@ -113,186 +79,6 @@ fn create_large_json(len: usize) -> Value {
})
}

/// Benchmark async run submission end-to-end against a mock HTTP server.
///
/// Sweeps batch size × JSON payload length × run count. Each measured batch
/// builds a fresh client and its runs in the (untimed) setup closure, then
/// times submitting every run plus client shutdown, since shutdown is what
/// flushes any still-queued batches.
#[expect(dead_code)]
fn bench_run_create(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    // Mock endpoint: accept multipart run uploads with 202 Accepted.
    let server = rt.block_on(async {
        let mut server = Server::new_async().await;
        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
        server
    });

    let mut group = c.benchmark_group("run_create");
    for batch_size in [50] {
        for json_len in [1_000, 5_000] {
            for num_runs in [500, 1_000] {
                group.bench_with_input(
                    BenchmarkId::new(
                        "run_create_async",
                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
                    ),
                    &(batch_size, json_len, num_runs),
                    |b, &(batch_size, json_len, num_runs)| {
                        b.to_async(&rt).iter_batched(
                            || {
                                // Setup (not timed): build payloads and a fresh client.
                                let runs: Vec<RunCreateExtended> = (0..num_runs)
                                    .map(|i| {
                                        let mut run = create_run_create(
                                            None,
                                            Some(create_large_json(json_len)),
                                            Some(create_large_json(json_len)),
                                        );
                                        // Give each run a unique id so entries don't collide.
                                        run.run_create.common.id = format!("test_id_{}", i);
                                        run
                                    })
                                    .collect();
                                let client_config =
                                    create_mock_client_config(&server.url(), batch_size);
                                let client = TracingClient::new(client_config).unwrap();
                                (client, runs)
                            },
                            |(client, runs)| async {
                                for run in runs {
                                    client.submit_run_create(black_box(run)).await.unwrap();
                                }
                                // shutdown the client to flush the queue
                                client.shutdown().await.unwrap();
                            },
                            // Inputs are large JSON payloads; avoid cloning per iteration.
                            BatchSize::LargeInput,
                        );
                    },
                );
            }
        }
    }
    group.finish();
}

/// Benchmark async run submission using `iter_custom`, so client construction
/// and payload generation stay entirely outside the measured window.
///
/// The timed region covers submitting every run plus client shutdown —
/// shutdown is what flushes the remaining queued batches, so excluding it
/// would under-report the real cost.
#[expect(dead_code, clippy::single_element_loop)]
fn bench_run_create_iter_custom(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    // Mock endpoint: accept multipart run uploads with 202 Accepted.
    let server = rt.block_on(async {
        let mut server = Server::new_async().await;
        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
        server
    });

    let mut group = c.benchmark_group("run_create_custom_iter");
    let server_url = server.url();
    for batch_size in [100] {
        for json_len in [3_000] {
            for num_runs in [1_000] {
                group.bench_function(
                    BenchmarkId::new(
                        "run_create_async",
                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
                    ),
                    |b| {
                        b.to_async(&rt).iter_custom(|iters| {
                            let mut elapsed_time = Duration::default();
                            let server_url = server_url.clone();
                            async move {
                                for _ in 0..iters {
                                    // Setup (not timed): payloads and a fresh client.
                                    let runs: Vec<RunCreateExtended> = (0..num_runs)
                                        .map(|i| {
                                            let mut run = create_run_create(
                                                None,
                                                Some(create_large_json(json_len)),
                                                Some(create_large_json(json_len)),
                                            );
                                            // Unique id per run so entries don't collide.
                                            run.run_create.common.id = format!("test_id_{}", i);
                                            run
                                        })
                                        .collect();
                                    let client_config =
                                        create_mock_client_config(&server_url, batch_size);
                                    let client = TracingClient::new(client_config).unwrap();

                                    let start = std::time::Instant::now();
                                    for run in runs {
                                        client.submit_run_create(black_box(run)).await.unwrap();
                                    }
                                    // Shutdown flushes the queue and is deliberately kept
                                    // inside the timed region. The debug `println!` banners
                                    // that used to run here were removed: stdout I/O inside
                                    // the measured window skewed the reported times.
                                    client.shutdown().await.unwrap();
                                    elapsed_time += start.elapsed();
                                }
                                elapsed_time
                            }
                        })
                    },
                );
            }
        }
    }
    group.finish();
}

/// Benchmark async submission of pre-serialized (`RunEventBytes`) runs using
/// `iter_custom`, so serialization and client construction stay outside the
/// measured window.
///
/// The timed region covers submitting every byte-level run plus client
/// shutdown, since shutdown flushes any still-queued batches.
#[expect(dead_code)]
fn bench_run_bytes_iter_custom(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    // Mock endpoint: accept multipart run uploads with 202 Accepted.
    let server = rt.block_on(async {
        let mut server = Server::new_async().await;
        server.mock("POST", "/runs/multipart").with_status(202).create_async().await;
        server
    });

    let mut group = c.benchmark_group("run_create_bytes_iter");
    let server_url = server.url();
    for batch_size in [50] {
        for json_len in [1_000, 5_000] {
            for num_runs in [500, 1_000] {
                group.bench_function(
                    BenchmarkId::new(
                        "run_create_async",
                        format!("batch_{}_json_{}_runs_{}", batch_size, json_len, num_runs),
                    ),
                    |b| {
                        b.to_async(&rt).iter_custom(|iters| {
                            let mut elapsed_time = Duration::default();
                            let server_url = server_url.clone();
                            async move {
                                for _ in 0..iters {
                                    // Setup (not timed): serialize payloads up front.
                                    let runs: Vec<RunEventBytes> = (0..num_runs)
                                        .map(|_i| {
                                            create_run_bytes(
                                                None,
                                                Some(create_large_json(json_len)),
                                                Some(create_large_json(json_len)),
                                            )
                                        })
                                        .collect();
                                    let client_config =
                                        create_mock_client_config(&server_url, batch_size);
                                    let client = TracingClient::new(client_config).unwrap();

                                    let start = std::time::Instant::now();
                                    for run in runs {
                                        client.submit_run_bytes(black_box(run)).await.unwrap();
                                    }
                                    // shutdown the client to flush the queue
                                    client.shutdown().await.unwrap();
                                    elapsed_time += start.elapsed();
                                }
                                elapsed_time
                            }
                        })
                    },
                );
            }
        }
    }
    group.finish();
}

#[expect(unused_variables, clippy::single_element_loop)]
fn bench_run_create_sync_iter_custom(c: &mut Criterion) {
let server = {
Expand Down
4 changes: 2 additions & 2 deletions rust/crates/langsmith-tracing-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod streaming;

pub use errors::TracingClientError;
pub use run::{
Attachment, EventType, RunCommon, RunCreate, RunCreateExtended, RunEventBytes, RunIO,
RunUpdate, RunUpdateExtended, TimeValue,
Attachment, RunCommon, RunCreate, RunCreateExtended, RunIO, RunUpdate, RunUpdateExtended,
TimeValue,
};
pub use streaming::{ClientConfig, TracingClient};
16 changes: 0 additions & 16 deletions rust/crates/langsmith-tracing-client/src/client/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,6 @@ pub struct RunUpdateExtended {
pub attachments: Option<Vec<Attachment>>,
}

/// Pre-serialized representation of a run event.
///
/// Carries the JSON-encoded run body plus independently encoded inputs and
/// outputs, so submission does not need to re-serialize the run.
#[derive(Debug)]
pub struct RunEventBytes {
    /// Identifier of the run this event belongs to.
    pub run_id: String,
    /// Whether this event creates or updates the run.
    pub event_type: EventType,
    /// JSON-encoded run payload.
    pub run_bytes: Vec<u8>,
    /// JSON-encoded run inputs, if any.
    pub inputs_bytes: Option<Vec<u8>>,
    /// JSON-encoded run outputs, if any.
    pub outputs_bytes: Option<Vec<u8>>,
    /// Attachments to upload alongside the run, if any.
    pub attachments: Option<Vec<Attachment>>,
}

/// Discriminates whether a byte-level run event creates a new run or
/// updates an existing one.
#[derive(Debug)]
pub enum EventType {
    /// The event creates a new run.
    Create,
    /// The event updates an existing run.
    Update,
}

#[derive(Debug)]
pub(crate) enum QueuedRun {
Create(RunCreateExtended),
Expand Down

0 comments on commit 06e84c9

Please sign in to comment.