Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(js): Initial implementation of compression #1416

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 87 additions & 11 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@
getLangSmithEnvironmentVariable,
getRuntimeEnvironment,
} from "./utils/env.js";

import {
convertToMultipartBlobChunks,
serializeRunOperationAsMultipart,
type MultipartPart,
} from "./utils/multipart.js";
import {
EvaluationResult,
EvaluationResults,
Expand Down Expand Up @@ -87,6 +91,10 @@
* Useful if encountering network rate limits at trace high volumes.
*/
manualFlushMode?: boolean;
/**
* Whether to enable compression of batched runs before sending to LangSmith.
*/
useRunCompression?: boolean;
}

/**
Expand Down Expand Up @@ -284,16 +292,20 @@
sourceRunId?: string;
};

type AutoBatchQueueItem = {
type AutoBatchQueueItemInput = {
action: "create" | "update";
item: RunCreate | RunUpdate;
};

type MultipartPart = {
name: string;
payload: Blob;
type AutoBatchQueueItem = {
action: "create" | "update";
item: RunCreate | RunUpdate | Blob;
};

/**
 * Duck-typed check for Blob values that works across realms (where
 * `instanceof Blob` can fail).
 *
 * Requires both a numeric `size` and an `arrayBuffer` method so unrelated
 * objects that merely have a `size` property (e.g. `Map`, `Set`) are not
 * misclassified as Blobs — the previous `"size" in x` check accepted those.
 */
function isBlob(x: unknown): x is Blob {
  if (x == null || typeof x !== "object") {
    return false;
  }
  const candidate = x as { size?: unknown; arrayBuffer?: unknown };
  return (
    typeof candidate.size === "number" &&
    typeof candidate.arrayBuffer === "function"
  );
}

export function mergeRuntimeEnvIntoRunCreate(run: RunCreate) {
const runtimeEnv = getRuntimeEnvironment();
const envVars = getLangChainEnvVarsMetadata();
Expand Down Expand Up @@ -373,10 +385,35 @@
return false;
};

const _compressPayload = async (stream: CompressionStream, payload: Blob[]) => {
const compressedPayloadStream = new Blob(payload)
.stream()
.pipeThrough(stream);
const reader = compressedPayloadStream.getReader();
const chunks = [];
let totalLength = 0;
try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
totalLength += value.length;

Check warning on line 401 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'totalLength' is assigned a value but never used. Allowed unused vars must match /^_/u
}
} finally {
reader.releaseLock();
}
return new Blob(chunks);
};

/**
 * Builds a fresh gzip CompressionStream.
 *
 * NOTE(review): CompressionStream instances appear to be single-use once a
 * payload has been piped through them (the queue-draining code discards the
 * old stream for this reason), so a new one is requested per compression
 * session — confirm against the Streams spec.
 */
function _createNewCompressionStream() {
  const gzipStream = new CompressionStream("gzip");
  return gzipStream;
}

export class AutoBatchQueue {
items: {
action: "create" | "update";
payload: RunCreate | RunUpdate;
payload: RunCreate | RunUpdate | Blob;
itemPromiseResolve: () => void;
itemPromise: Promise<void>;
size: number;
Expand All @@ -388,17 +425,20 @@
return this.items[0];
}

push(item: AutoBatchQueueItem): Promise<void> {
async push(item: AutoBatchQueueItemInput): Promise<void> {
let itemPromiseResolve;
const itemPromise = new Promise<void>((resolve) => {
// Setting itemPromiseResolve is synchronous with promise creation:
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/Promise
itemPromiseResolve = resolve;
});
const size = stringifyForTracing(item.item).length;
let payload: RunCreate | RunUpdate | Blob = item.item;

Check failure on line 435 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'payload' is never reassigned. Use 'const' instead
const size = isBlob(payload)
? payload.size
: stringifyForTracing(payload).length;
this.items.push({
action: item.action,
payload: item.item,
payload,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
itemPromiseResolve: itemPromiseResolve!,
itemPromise,
Expand Down Expand Up @@ -429,7 +469,7 @@
// If there is an item on the queue we were unable to pop,
// just return it as a single batch.
if (popped.length === 0 && this.items.length > 0) {
const item = this.items.shift()!;

Check warning on line 472 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Forbidden non-null assertion
popped.push(item);
poppedSizeBytes += item.size;
this.sizeBytes -= item.size;
Expand Down Expand Up @@ -495,6 +535,11 @@

private manualFlushMode = false;

private useRunCompression =
getEnvironmentVariable("LANGSMITH_USE_RUN_COMPRESSION") === "true";

private compressionStream = _createNewCompressionStream();

constructor(config: ClientConfig = {}) {
const defaultConfig = Client.getDefaultClientConfig();

Expand Down Expand Up @@ -533,6 +578,7 @@
this.batchSizeBytesLimit = config.batchSizeBytesLimit;
this.fetchOptions = config.fetchOptions || {};
this.manualFlushMode = config.manualFlushMode ?? this.manualFlushMode;
this.useRunCompression = config.useRunCompression ?? this.useRunCompression;
}

public static getDefaultClientConfig(): {
Expand Down Expand Up @@ -821,18 +867,48 @@
}
}

private async processRunOperation(item: AutoBatchQueueItem) {
private async processRunOperation(
item: AutoBatchQueueItemInput
): Promise<void> {
clearTimeout(this.autoBatchTimeout);
this.autoBatchTimeout = undefined;
let queueItem: AutoBatchQueueItem = item;

Check failure on line 875 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'queueItem' is never reassigned. Use 'const' instead
if (item.action === "create") {
item.item = mergeRuntimeEnvIntoRunCreate(item.item as RunCreate);
}
const sizeLimitBytes = await this._getBatchSizeLimitBytes();
if (this.useRunCompression) {
const { parts } = serializeRunOperationAsMultipart(
item.action,
item.item
);
const blobChunks = convertToMultipartBlobChunks(parts);
// It would be great if you could get the compressed size from a ReadableStream
// so that we could pass the CompressionStream directly into a "fetch" call,
// but this doesn't seem like it's possible.
const compressedItem = await _compressPayload(
this.compressionStream,
blobChunks
);
if (
this.autoBatchQueue.sizeBytes + compressedItem.size >
sizeLimitBytes
) {
this.compressionStream = _createNewCompressionStream();
// Drain the entire queue since items compressed with one stream are not compatible with others
await this.drainAutoBatchQueue(Number.MAX_SAFE_INTEGER);
// Throw out the compressed value and retry
// TODO: Instead, should we estimate compressed size so as not to throw away compression work?
return this.processRunOperation(item);
} else {
queueItem.item = compressedItem;
}
}
const itemPromise = this.autoBatchQueue.push(item);
if (this.manualFlushMode) {
// Rely on manual flushing in serverless environments
return itemPromise;
}
const sizeLimitBytes = await this._getBatchSizeLimitBytes();
if (this.autoBatchQueue.sizeBytes > sizeLimitBytes) {
void this.drainAutoBatchQueue(sizeLimitBytes);
}
Expand Down Expand Up @@ -862,7 +938,7 @@
if (this._serverInfo === undefined) {
try {
this._serverInfo = await this._getServerInfo();
} catch (e) {

Check warning on line 941 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
console.warn(
`[WARNING]: LangSmith failed to fetch info on supported operations. Falling back to batch operations and default limits.`
);
Expand Down Expand Up @@ -1597,7 +1673,7 @@
treeFilter?: string;
isRoot?: boolean;
dataSourceType?: string;
}): Promise<any> {

Check warning on line 1676 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
let projectIds_ = projectIds || [];
if (projectNames) {
projectIds_ = [
Expand Down Expand Up @@ -1885,7 +1961,7 @@
`Failed to list shared examples: ${response.status} ${response.statusText}`
);
}
return result.map((example: any) => ({

Check warning on line 1964 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
...example,
_hostUrl: this.getHostUrl(),
}));
Expand Down Expand Up @@ -2022,7 +2098,7 @@
}
// projectId querying
return true;
} catch (e) {

Check warning on line 2101 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

'e' is defined but never used. Allowed unused args must match /^_/u
return false;
}
}
Expand Down Expand Up @@ -3399,7 +3475,7 @@
async _logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3478 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<[results: EvaluationResult[], feedbacks: Feedback[]]> {
const evalResults: Array<EvaluationResult> =
this._selectEvalResults(evaluatorResponse);
Expand Down Expand Up @@ -3438,7 +3514,7 @@
public async logEvaluationFeedback(
evaluatorResponse: EvaluationResult | EvaluationResults,
run?: Run,
sourceInfo?: { [key: string]: any }

Check warning on line 3517 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
): Promise<EvaluationResult[]> {
const [results] = await this._logEvaluationFeedback(
evaluatorResponse,
Expand Down Expand Up @@ -3934,7 +4010,7 @@

public async createCommit(
promptIdentifier: string,
object: any,

Check warning on line 4013 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
options?: {
parentCommitHash?: string;
}
Expand Down Expand Up @@ -4166,7 +4242,7 @@
isPublic?: boolean;
isArchived?: boolean;
}
): Promise<Record<string, any>> {

Check warning on line 4245 in js/src/client.ts

View workflow job for this annotation

GitHub Actions / Check linting

Unexpected any. Specify a different type
if (!(await this.promptExists(promptIdentifier))) {
throw new Error("Prompt does not exist, you must create it first.");
}
Expand Down
103 changes: 103 additions & 0 deletions js/src/utils/multipart.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { AttachmentData, RunCreate, RunUpdate } from "../schemas.js";
import { stringify as stringifyForTracing } from "./fast-safe-stringify/index.js";

// Boundary delimiter for multipart/form-data bodies, randomized once per
// process so serialized payload content is unlikely to collide with it.
export const MULTIPART_BOUNDARY =
  "----LangSmithFormBoundary" + Math.random().toString(36).slice(2);

/**
 * A single part of a multipart request body: the part's field name plus its
 * payload as a Blob (whose `type` carries the part's Content-Type header).
 */
export type MultipartPart = {
  name: string;
  payload: Blob;
};

/**
 * Serializes a run create/update operation into multipart parts.
 *
 * The main run payload becomes one JSON part named `<method>.<id>`;
 * `inputs`, `outputs`, and `events` (when present) each become their own JSON
 * part named `<method>.<id>.<field>`; attachments become binary parts named
 * `attachment.<id>.<name>`. Attachment names containing "." are skipped with
 * a warning, since "." is the part-name separator.
 *
 * @param method - Operation name used as the part-name prefix.
 * @param originalPayload - The run create or update payload to serialize.
 * @returns The accumulated multipart parts and trace context strings.
 */
export function serializeRunOperationAsMultipart(
  method: string,
  originalPayload: RunCreate | RunUpdate
) {
  const parts: MultipartPart[] = [];
  const context: string[] = [];
  // Pull out the fields sent as separate parts; the remainder forms the main
  // run payload.
  const { inputs, outputs, events, attachments, ...payload } = originalPayload;

  // Wraps a JSON-serializable value as a multipart part whose content type
  // advertises the serialized length.
  const makeJsonPart = (name: string, value: unknown): MultipartPart => {
    const serialized = stringifyForTracing(value);
    return {
      name,
      payload: new Blob([serialized], {
        type: `application/json; length=${serialized.length}`,
      }),
    };
  };

  // Main run payload part.
  parts.push(makeJsonPart(`${method}.${payload.id}`, payload));

  // Field parts in a fixed order, skipping anything undefined.
  const separateFields: [string, unknown][] = [
    ["inputs", inputs],
    ["outputs", outputs],
    ["events", events],
  ];
  for (const [fieldName, fieldValue] of separateFields) {
    if (fieldValue === undefined) {
      continue;
    }
    parts.push(makeJsonPart(`${method}.${payload.id}.${fieldName}`, fieldValue));
  }

  // Attachment parts, emitted only when the run id is known.
  if (payload.id !== undefined && attachments) {
    for (const [name, attachment] of Object.entries(attachments)) {
      let contentType: string;
      let content: AttachmentData;
      if (Array.isArray(attachment)) {
        [contentType, content] = attachment;
      } else {
        contentType = attachment.mimeType;
        content = attachment.data;
      }

      // "." is reserved as the part-name separator, so attachment names must
      // not contain it.
      if (name.includes(".")) {
        console.warn(
          `Skipping attachment '${name}' for run ${payload.id}: Invalid attachment name. ` +
            `Attachment names must not contain periods ('.'). Please rename the attachment and try again.`
        );
        continue;
      }
      parts.push({
        name: `attachment.${payload.id}.${name}`,
        payload: new Blob([content], {
          type: `${contentType}; length=${content.byteLength}`,
        }),
      });
    }
  }

  // Trace context strings for logging/debugging alongside the parts.
  context.push(`trace=${payload.trace_id},id=${payload.id}`);
  return {
    parts,
    context,
  };
}

/**
 * Converts multipart parts into a flat list of Blob chunks forming a
 * multipart/form-data body.
 *
 * Each part contributes four chunks: the boundary line, its headers
 * (Content-Disposition and Content-Type), the payload Blob itself, and a
 * trailing CRLF.
 *
 * NOTE(review): the closing `--<boundary>--` terminator is deliberately not
 * appended here — presumably the caller appends it once for the whole request
 * body; confirm before sending.
 */
export function convertToMultipartBlobChunks(parts: MultipartPart[]) {
  const chunks: Blob[] = [];
  for (const { name, payload } of parts) {
    const partHeaders =
      `Content-Disposition: form-data; name="${name}"\r\n` +
      `Content-Type: ${payload.type}\r\n\r\n`;
    chunks.push(
      new Blob([`--${MULTIPART_BOUNDARY}\r\n`]),
      new Blob([partHeaders]),
      payload,
      new Blob(["\r\n"])
    );
  }
  return chunks;
}
Loading