Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

📦 NEW: pipe.run() support in SDK #82

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/nextjs/app/langbase/pipe/run/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export async function POST(req: NextRequest) {
const result = await langbase.pipe.run({
messages: [{role: 'user', content: prompt}],
name: 'summary',
stream: false
});

// 3. Done, return the completed (non-streaming) run result.
Expand Down
46 changes: 44 additions & 2 deletions packages/langbase/src/common/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,23 @@ export class Request {
await this.handleErrorResponse({response});
}

if(!options.body) {
if (!options.body) {
return this.handleGenerateResponse({
response,
isChat: false,
threadId: null,
})
});
}

const threadId = response.headers.get('lb-thread-id');

if (options.body?.stream && url.includes('run')) {
return this.handleRunResponseStream({
response,
rawResponse: options.body.rawResponse,
}) as T;
}

if (options.body.stream) {
return this.handleStreamResponse({response}) as T;
}
Expand Down Expand Up @@ -135,6 +142,41 @@ export class Request {
return {stream, threadId: response.headers.get('lb-thread-id')};
}

/**
 * Converts an SSE response from a streaming `pipe.run()` call into a
 * consumable result object.
 *
 * @param response - Raw fetch `Response` carrying the SSE body.
 * @param rawResponse - When true, attach the response headers to the result.
 * @returns The SSE body as a `ReadableStream`, the `lb-thread-id` header
 *          (null when absent), and — only when requested — the raw headers.
 */
private handleRunResponseStream({
	response,
	rawResponse,
}: {
	response: Response;
	rawResponse?: boolean;
}): {
	// Fixed: was `stream: any`, inconsistent with the implementation below.
	stream: ReadableStream<any>;
	threadId: string | null;
	rawResponse?: {
		headers: Record<string, string>;
	};
} {
	// Controller lets consumers abort the underlying SSE read.
	const controller = new AbortController();
	const streamSSE = Stream.fromSSEResponse(response, controller);

	const result: {
		stream: ReadableStream<any>;
		threadId: string | null;
		rawResponse?: {
			headers: Record<string, string>;
		};
	} = {
		stream: streamSSE.toReadableStream(),
		threadId: response.headers.get('lb-thread-id'),
	};

	// Headers are attached only on request to keep the default result small.
	if (rawResponse) {
		result.rawResponse = {
			headers: Object.fromEntries(response.headers.entries()),
		};
	}
	return result;
}

private async handleGenerateResponse({
response,
isChat,
Expand Down
131 changes: 84 additions & 47 deletions packages/langbase/src/langbase/langbase.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,74 @@
import {Request} from '../common/request';
import {
Pipe as PipeBaseAI,
RunOptions as RunOptionsT,
RunOptionsStream as RunOptionsStreamT,
RunResponse,
RunResponseStream,
} from '@baseai/core';

export type Role = 'user' | 'assistant' | 'system' | 'tool';

// Base types without name and apiKey
type BaseRunOptions = Omit<RunOptionsT, 'name' | 'apiKey'> & {
// Options shared by streaming and non-streaming pipe runs
// (extended by RunOptionsT and RunOptionsStreamT below).
export interface RunOptionsBase {
// Chat history to send with the run.
messages?: Message[];
// Values substituted into the pipe's prompt variables.
variables?: Variable[];
// Existing thread to continue; omit to start a new one.
threadId?: string;
// When true, the response includes the raw HTTP headers.
rawResponse?: boolean;
// When true, tool calls are executed as part of the run.
runTools?: boolean;
// Function tools made available to the model for this run.
tools?: Tools[];
name?: string; // Pipe name for SDK
apiKey?: string; // pipe level key for SDK
llmKey?: string; // LLM API key
}

// Options for a non-streaming run: `stream` must be omitted or false.
export interface RunOptionsT extends RunOptionsBase {
stream?: false;
}

// Options for a streaming run: `stream` must be explicitly true.
export interface RunOptionsStreamT extends RunOptionsBase {
stream: true;
}

// A single completion choice in a non-streaming run response.
interface ChoiceGenerate {
// Position of this choice in the response's choices array.
index: number;
// The generated assistant message.
message: Message;
logprobs: boolean | null;
// Why generation stopped (e.g. model finished or hit a limit).
finish_reason: string;
}

// Token accounting for one run: prompt + completion = total.
export interface Usage {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
}

export interface RunResponse {
completion: string;
threadId?: string;
id: string;
object: string;
created: number;
model: string;
choices: ChoiceGenerate[];
usage: Usage;
system_fingerprint: string | null;
rawResponse?: {
headers: Record<string, string>;
};
messages: Message[];
llmKey?: string;
};
name?: string;
}

// Result of a streaming run: the SSE body as a ReadableStream plus
// the thread id from the `lb-thread-id` header (null when absent).
export interface RunResponseStream {
stream: ReadableStream<any>;
threadId: string | null;
// Present only when the run was requested with `rawResponse: true`.
rawResponse?: {
headers: Record<string, string>;
};
}

// Union type for RunOptions
export type RunOptions =
| (BaseRunOptions & {name: string; apiKey?: never})
| (BaseRunOptions & {name?: never; apiKey: string});

// Similar structure for RunOptionsStream
type BaseRunOptionsStream = Omit<RunOptionsStreamT, 'name' | 'apiKey'> & {
messages: Message[];
llmKey?: string;
};
| (RunOptionsT & {name: string; apiKey?: never})
| (RunOptionsT & {name?: never; apiKey: string});

export type RunOptionsStream =
| (BaseRunOptionsStream & {name: string; apiKey?: never})
| (BaseRunOptionsStream & {name?: never; apiKey: string});
| (RunOptionsStreamT & {name: string; apiKey?: never})
| (RunOptionsStreamT & {name?: never; apiKey: string});

export interface Function {
name: string;
Expand Down Expand Up @@ -59,6 +99,15 @@ interface ToolChoice {
function: {name: string};
}

// A function tool definition exposed to the model for a run.
interface Tools {
// Only function tools are supported.
type: 'function';
function: {
name: string;
description?: string;
// JSON-schema-style parameter description — TODO confirm exact schema with API docs.
parameters?: Record<string, any>;
};
}

interface PipeBaseOptions {
name: string;
description?: string;
Expand All @@ -75,14 +124,7 @@ interface PipeBaseOptions {
presence_penalty?: number;
frequency_penalty?: number;
stop?: string[];
tools?: {
type: 'function';
function: {
name: string;
description?: string;
parameters?: Record<string, any>;
};
}[];
tools?: Tools[];
tool_choice?: 'auto' | 'required' | ToolChoice;
parallel_tool_calls?: boolean;
messages?: Message[];
Expand Down Expand Up @@ -113,16 +155,7 @@ export interface PipeListResponse {
parallel_tool_calls: boolean;
messages: Message[];
variables: Variable[] | [];
tools:
| {
type: 'function';
function: {
name: string;
description?: string;
parameters?: Record<string, any>;
};
}[]
| [];
tools: Tools[] | [];
memory:
| {
name: string;
Expand Down Expand Up @@ -373,16 +406,20 @@ export class Langbase {
);
}

const pipe = new PipeBaseAI({
apiKey: options.apiKey ?? this.apiKey,
name: options.name?.trim() || '', // Pipe name
prod: true,
// default values
model: 'openai:gpt-4o-mini',
tools: [],
} as any);
// Remove stream property if it's not set to true
if (typeof options.stream === 'undefined') {
delete options.stream;
}

return await pipe.run({...options, runTools: false});
return this.request.post({
endpoint: '/v1/pipes/run',
body: options,
headers: {
...(options.llmKey && {
'LB-LLM-KEY': options.llmKey,
}),
},
});
}

/**
Expand Down
8 changes: 1 addition & 7 deletions packages/langbase/src/pipes/pipes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Message, Role, ToolCall, Variable} from '@/langbase/langbase';
import {Message, Role, ToolCall, Usage, Variable} from '@/langbase/langbase';
import {Request} from '../common/request';
import {Stream} from '../common/stream';

Expand Down Expand Up @@ -36,12 +36,6 @@ interface Delta {
tool_calls?: ToolCall[];
}

export interface Usage {
prompt_tokens: number;
completion_tokens: number;
total_tokens: number;
}

export interface GenerateResponse {
completion: string;
threadId?: string;
Expand Down
Loading