diff --git a/js/src/client.ts b/js/src/client.ts
index 31704d448..96dab9eb8 100644
--- a/js/src/client.ts
+++ b/js/src/client.ts
@@ -53,7 +53,11 @@ import {
   getLangSmithEnvironmentVariable,
   getRuntimeEnvironment,
 } from "./utils/env.js";
-
+import {
+  convertToMultipartBlobChunks,
+  serializeRunOperationAsMultipart,
+  type MultipartPart,
+} from "./utils/multipart.js";
 import {
   EvaluationResult,
   EvaluationResults,
@@ -87,6 +91,10 @@ export interface ClientConfig {
    * Useful if encountering network rate limits at trace high volumes.
    */
   manualFlushMode?: boolean;
+  /**
+   * Whether to enable compression of batched runs before sending to LangSmith.
+   */
+  useRunCompression?: boolean;
 }
 
 /**
@@ -284,16 +292,20 @@ export type CreateExampleOptions = {
   sourceRunId?: string;
 };
 
-type AutoBatchQueueItem = {
+type AutoBatchQueueItemInput = {
   action: "create" | "update";
   item: RunCreate | RunUpdate;
 };
 
-type MultipartPart = {
-  name: string;
-  payload: Blob;
+type AutoBatchQueueItem = {
+  action: "create" | "update";
+  item: RunCreate | RunUpdate | Blob;
 };
 
+function isBlob(x: unknown): x is Blob {
+  return x != null && typeof x === "object" && "size" in x;
+}
+
 export function mergeRuntimeEnvIntoRunCreate(run: RunCreate) {
   const runtimeEnv = getRuntimeEnvironment();
   const envVars = getLangChainEnvVarsMetadata();
@@ -373,10 +385,35 @@ const handle429 = async (response?: Response) => {
   return false;
 };
 
+const _compressPayload = async (stream: CompressionStream, payload: Blob[]) => {
+  const compressedPayloadStream = new Blob(payload)
+    .stream()
+    .pipeThrough(stream);
+  const reader = compressedPayloadStream.getReader();
+  const chunks = [];
+  let totalLength = 0;
+  try {
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) break;
+      chunks.push(value);
+      totalLength += value.length;
+    }
+  } finally {
+    reader.releaseLock();
+  }
+  return new Blob(chunks);
+};
+
+function _createNewCompressionStream() {
+  return new CompressionStream("gzip");
+}
+
 export class AutoBatchQueue {
   items: {
     action: "create" | "update";
-    payload: RunCreate | RunUpdate;
+    payload: RunCreate | RunUpdate | Blob;
     itemPromiseResolve: () => void;
     itemPromise: Promise<void>;
     size: number;
@@ -388,17 +425,20 @@
     return this.items[0];
   }
 
-  push(item: AutoBatchQueueItem): Promise<void> {
+  async push(item: AutoBatchQueueItem): Promise<void> {
     let itemPromiseResolve;
     const itemPromise = new Promise<void>((resolve) => {
       // Setting itemPromiseResolve is synchronous with promise creation:
       // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise
       itemPromiseResolve = resolve;
     });
-    const size = stringifyForTracing(item.item).length;
+    const payload: RunCreate | RunUpdate | Blob = item.item;
+    const size = isBlob(payload)
+      ? payload.size
+      : stringifyForTracing(payload).length;
     this.items.push({
       action: item.action,
-      payload: item.item,
+      payload,
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
       itemPromiseResolve: itemPromiseResolve!,
       itemPromise,
@@ -495,6 +535,11 @@ export class Client implements LangSmithTracingClientInterface {
 
   private manualFlushMode = false;
 
+  private useRunCompression =
+    getEnvironmentVariable("LANGSMITH_USE_RUN_COMPRESSION") === "true";
+
+  private compressionStream = _createNewCompressionStream();
+
   constructor(config: ClientConfig = {}) {
     const defaultConfig = Client.getDefaultClientConfig();
 
@@ -533,6 +578,7 @@ export class Client implements LangSmithTracingClientInterface {
     this.batchSizeBytesLimit = config.batchSizeBytesLimit;
     this.fetchOptions = config.fetchOptions || {};
     this.manualFlushMode = config.manualFlushMode ?? this.manualFlushMode;
+    this.useRunCompression = config.useRunCompression ?? this.useRunCompression;
   }
 
   public static getDefaultClientConfig(): {
@@ -821,18 +867,48 @@
     }
   }
 
-  private async processRunOperation(item: AutoBatchQueueItem) {
+  private async processRunOperation(
+    item: AutoBatchQueueItemInput
+  ): Promise<void> {
     clearTimeout(this.autoBatchTimeout);
     this.autoBatchTimeout = undefined;
+    const queueItem: AutoBatchQueueItem = item;
     if (item.action === "create") {
       item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate);
     }
+    const sizeLimitBytes = await this._getBatchSizeLimitBytes();
+    if (this.useRunCompression) {
+      const { parts } = serializeRunOperationAsMultipart(
+        item.action,
+        item.item
+      );
+      const blobChunks = convertToMultipartBlobChunks(parts);
+      // It would be great if you could get the compressed size from a ReadableStream
+      // so that we could pass the CompressionStream directly into a "fetch" call,
+      // but this doesn't seem to be possible.
+      const compressedItem = await _compressPayload(
+        this.compressionStream,
+        blobChunks
+      );
+      if (
+        this.autoBatchQueue.sizeBytes + compressedItem.size >
+        sizeLimitBytes
+      ) {
+        this.compressionStream = _createNewCompressionStream();
+        // Drain the entire queue, since items compressed with one stream are not compatible with others
+        await this.drainAutoBatchQueue(Number.MAX_SAFE_INTEGER);
+        // Throw out the compressed value and retry.
+        // TODO: Instead, should we estimate compressed size so as not to throw away compression work?
+        return this.processRunOperation(item);
+      } else {
+        queueItem.item = compressedItem;
+      }
+    }
-    const itemPromise = this.autoBatchQueue.push(item);
+    const itemPromise = this.autoBatchQueue.push(queueItem);
     if (this.manualFlushMode) {
       // Rely on manual flushing in serverless environments
       return itemPromise;
     }
-    const sizeLimitBytes = await this._getBatchSizeLimitBytes();
     if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) {
       void this.drainAutoBatchQueue(sizeLimitBytes);
     }
diff --git a/js/src/utils/multipart.ts b/js/src/utils/multipart.ts
new file mode 100644
index 000000000..6479299f5
--- /dev/null
+++ b/js/src/utils/multipart.ts
@@ -0,0 +1,103 @@
+import { AttachmentData, RunCreate, RunUpdate } from "../schemas.js";
+import { stringify as stringifyForTracing } from "./fast-safe-stringify/index.js";
+
+export const MULTIPART_BOUNDARY =
+  "----LangSmithFormBoundary" + Math.random().toString(36).slice(2);
+
+export type MultipartPart = {
+  name: string;
+  payload: Blob;
+};
+
+export function serializeRunOperationAsMultipart(
+  method: string,
+  originalPayload: RunCreate | RunUpdate
+) {
+  const accumulatedParts: MultipartPart[] = [];
+  const accumulatedContext: string[] = [];
+  // collect fields to be sent as separate parts
+  const { inputs, outputs, events, attachments, ...payload } = originalPayload;
+  const fields = { inputs, outputs, events };
+  // encode the main run payload
+  const stringifiedPayload = stringifyForTracing(payload);
+  accumulatedParts.push({
+    name: `${method}.${payload.id}`,
+    payload: new Blob([stringifiedPayload], {
+      type: `application/json; length=${stringifiedPayload.length}`,
+    }),
+  });
+  // encode the fields we collected
+  for (const [key, value] of Object.entries(fields)) {
+    if (value === undefined) {
+      continue;
+    }
+    const stringifiedValue = stringifyForTracing(value);
+    accumulatedParts.push({
+      name: `${method}.${payload.id}.${key}`,
+      payload: new Blob([stringifiedValue], {
+        type: `application/json; length=${stringifiedValue.length}`,
+      }),
+    });
+  }
+  // encode the attachments
+  if (payload.id !== undefined) {
+    if (attachments) {
+      for (const [name, attachment] of Object.entries(attachments)) {
+        let contentType: string;
+        let content: AttachmentData;
+
+        if (Array.isArray(attachment)) {
+          [contentType, content] = attachment;
+        } else {
+          contentType = attachment.mimeType;
+          content = attachment.data;
+        }
+
+        // Validate that the attachment name doesn't contain a '.'
+        if (name.includes(".")) {
+          console.warn(
+            `Skipping attachment '${name}' for run ${payload.id}: Invalid attachment name. ` +
+              `Attachment names must not contain periods ('.'). Please rename the attachment and try again.`
+          );
+          continue;
+        }
+        accumulatedParts.push({
+          name: `attachment.${payload.id}.${name}`,
+          payload: new Blob([content], {
+            type: `${contentType}; length=${content.byteLength}`,
+          }),
+        });
+      }
+    }
+  }
+  // compute context
+  accumulatedContext.push(`trace=${payload.trace_id},id=${payload.id}`);
+  return {
+    parts: accumulatedParts,
+    context: accumulatedContext,
+  };
+}
+
+export function convertToMultipartBlobChunks(parts: MultipartPart[]) {
+  // Create multipart form data manually using Blobs
+  const chunks: Blob[] = [];
+
+  for (const part of parts) {
+    // Add field boundary
+    chunks.push(new Blob([`--${MULTIPART_BOUNDARY}\r\n`]));
+    chunks.push(
+      new Blob([
+        `Content-Disposition: form-data; name="${part.name}"\r\n`,
+        `Content-Type: ${part.payload.type}\r\n\r\n`,
+      ])
+    );
+    chunks.push(part.payload);
+    chunks.push(new Blob(["\r\n"]));
+  }
+
+  // TODO: Should the terminating boundary be appended just once, after the
+  // chunks from every queued operation have been concatenated?
+  // chunks.push(new Blob([`--${MULTIPART_BOUNDARY}--\r\n`]));
+
+  return chunks;
+}
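As an end-to-end reference, here is a minimal sketch of how the new pieces compose, written in TypeScript as if it lived alongside js/src so the relative imports resolve. The run values are illustrative assumptions; serializeRunOperationAsMultipart, convertToMultipartBlobChunks, and MULTIPART_BOUNDARY are the module's actual exports, and the once-per-request final boundary follows the TODO in convertToMultipartBlobChunks.

import {
  MULTIPART_BOUNDARY,
  convertToMultipartBlobChunks,
  serializeRunOperationAsMultipart,
} from "./utils/multipart.js";
import type { RunCreate } from "./schemas.js";

// An illustrative run payload; ids and fields are placeholders.
const run: RunCreate = {
  id: "00000000-0000-0000-0000-000000000001",
  trace_id: "00000000-0000-0000-0000-000000000001",
  name: "my_chain",
  run_type: "chain",
  inputs: { question: "What is LangSmith?" },
};

// One JSON part for the core run fields, plus one part each for any
// present inputs/outputs/events and one per attachment.
const { parts, context } = serializeRunOperationAsMultipart("create", run);

// Render the parts as raw multipart chunks. No terminating boundary is
// added per operation, so append it once for the whole request body.
const chunks = convertToMultipartBlobChunks(parts);
const body = new Blob([...chunks, `--${MULTIPART_BOUNDARY}--\r\n`]);

// Compressing the assembled body mirrors _compressPayload in client.ts.
const gzipped = body.stream().pipeThrough(new CompressionStream("gzip"));

console.log(context, body.size, gzipped instanceof ReadableStream);

Omitting the terminating boundary in convertToMultipartBlobChunks appears deliberate: it lets chunks from many queued operations be concatenated into a single multipart body, with the boundary and gzip framing applied once at send time.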