Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Rust tracing client more robust to errors. #1424

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ resolver = "2"
[workspace.lints]

[workspace.dependencies]
backon = "1.3.0"
chrono = "0.4.38"
crossbeam-channel = "0.5.14"
fastrand = "2.3.0"
Expand Down
1 change: 1 addition & 0 deletions rust/crates/langsmith-tracing-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ fastrand = { workspace = true }
crossbeam-channel = { workspace = true }
http = { workspace = true }
zstd = { workspace = true }
backon = { workspace = true }

[dev-dependencies]
multer = "3.1.0"
Expand Down
37 changes: 32 additions & 5 deletions rust/crates/langsmith-tracing-client/src/client/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ pub enum TracingClientError {
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),

#[error("HTTP error: {0}")]
HttpError(reqwest::StatusCode),
#[error("HTTP error: status {0}, message \"{1}\"")]
HttpError(reqwest::StatusCode, String),

#[error("Request error: {0}")]
RequestError(#[from] reqwest::Error),

#[error("Channel send error")]
ChannelSendError,

#[error("Unexpected shutdown")]
UnexpectedShutdown,

Expand All @@ -29,3 +26,33 @@ impl From<std::io::Error> for TracingClientError {
Self::IoError(value.to_string())
}
}

/// When an error involving our output stream happens, what state is the stream in?
#[derive(Debug)]
pub(crate) enum StreamState {
/// The stream is safe. We can skip the offending record and keep going.
#[expect(
dead_code,
reason = "will be used when we decide how to report non-fatal client errors"
)]
Safe(TracingClientError),

/// Some of the offending record's data has been written into the stream,
/// so the stream's data is now invalid and cannot be recovered.
/// We must discard the entire stream and start over.
#[expect(
dead_code,
reason = "will be used when we decide how to report non-fatal client errors"
)]
Polluted(TracingClientError),
}

impl StreamState {
pub(crate) fn safe(inner: impl Into<TracingClientError>) -> Self {
Self::Safe(inner.into())
}

pub(crate) fn polluted(inner: impl Into<TracingClientError>) -> Self {
Self::Polluted(inner.into())
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{borrow::Cow, io::Write, path::Path};

use crate::client::errors::StreamState;

pub struct StreamingMultipart<W: Write> {
writer: W,
boundary: String,
Expand Down Expand Up @@ -46,11 +48,7 @@ impl<W: Write> StreamingMultipart<W> {
}
}

pub(super) fn json_part(
&mut self,
name: &str,
serialized: &[u8],
) -> Result<(), std::io::Error> {
pub(super) fn json_part(&mut self, name: &str, serialized: &[u8]) -> Result<(), StreamState> {
self.empty = false;
let boundary = self.boundary.as_str();
let length = serialized.len();
Expand All @@ -75,10 +73,11 @@ Content-Disposition: form-data; name=\"{name}\"\r\n\
Content-Type: application/json\r\n\
Content-Length: {length}\r\n\
\r\n"
)?;
)
.map_err(StreamState::polluted)?;

self.writer.write_all(serialized)?;
write!(self.writer, "\r\n")
self.writer.write_all(serialized).map_err(StreamState::polluted)?;
write!(self.writer, "\r\n").map_err(StreamState::polluted)
}

pub(super) fn file_part_from_bytes(
Expand All @@ -87,7 +86,7 @@ Content-Length: {length}\r\n\
file_name: &str,
content_type: &str,
contents: &[u8],
) -> Result<(), std::io::Error> {
) -> Result<(), StreamState> {
self.empty = false;
let boundary = self.boundary.as_str();
let length = contents.len();
Expand All @@ -106,11 +105,12 @@ Content-Disposition: form-data; name=\"{name}\"; filename=\"{file_name}\"\r\n\
Content-Type: {content_type}\r\n\
Content-Length: {length}\r\n\
\r\n"
)?;
)
.map_err(StreamState::polluted)?;

self.writer.write_all(contents)?;
self.writer.write_all(contents).map_err(StreamState::polluted)?;

write!(self.writer, "\r\n")
write!(self.writer, "\r\n").map_err(StreamState::polluted)
}

pub(super) fn file_part_from_path(
Expand All @@ -119,14 +119,14 @@ Content-Length: {length}\r\n\
file_name: &str,
content_type: &str,
path: &Path,
) -> Result<(), std::io::Error> {
) -> Result<(), StreamState> {
self.empty = false;
let boundary = self.boundary.as_str();
let name = Self::escape_field_value(name);
let file_name = Self::escape_field_value(file_name);

let mut file = std::fs::File::open(path)?;
let metadata = file.metadata()?;
let mut file = std::fs::File::open(path).map_err(StreamState::safe)?;
let metadata = file.metadata().map_err(StreamState::safe)?;
let file_size = metadata.len();

// `Content-Length` is explicitly prohibited in multipart parts by RFC 7578:
Expand All @@ -141,11 +141,12 @@ Content-Disposition: form-data; name=\"{name}\"; filename=\"{file_name}\"\r\n\
Content-Type: {content_type}\r\n\
Content-Length: {file_size}\r\n\
\r\n"
)?;
)
.map_err(StreamState::polluted)?;

std::io::copy(&mut file, &mut self.writer)?;
std::io::copy(&mut file, &mut self.writer).map_err(StreamState::polluted)?;

write!(self.writer, "\r\n")
write!(self.writer, "\r\n").map_err(StreamState::polluted)
}

pub(super) fn finish(mut self) -> Result<W, std::io::Error> {
Expand Down
Loading
Loading