diff --git a/examples/nextjs/app/langbase/pipe/run/route.ts b/examples/nextjs/app/langbase/pipe/run/route.ts
index 8981d88..a89fbba 100644
--- a/examples/nextjs/app/langbase/pipe/run/route.ts
+++ b/examples/nextjs/app/langbase/pipe/run/route.ts
@@ -13,6 +13,7 @@ export async function POST(req: NextRequest) {
 	const result = await langbase.pipe.run({
 		messages: [{role: 'user', content: prompt}],
 		name: 'summary',
+		stream: false
 	});
 
 	// 3. Done, return the stream in a readable stream format.
diff --git a/packages/langbase/src/common/request.ts b/packages/langbase/src/common/request.ts
index db8fa81..566c6bd 100644
--- a/packages/langbase/src/common/request.ts
+++ b/packages/langbase/src/common/request.ts
@@ -57,16 +57,23 @@ export class Request {
 			await this.handleErrorResponse({response});
 		}
 
-		if(!options.body) {
+		if (!options.body) {
 			return this.handleGenerateResponse({
 				response,
 				isChat: false,
 				threadId: null,
-			})
+			});
 		}
 
 		const threadId = response.headers.get('lb-thread-id');
 
+		if (options.body?.stream && url.includes('run')) {
+			return this.handleRunResponseStream({
+				response,
+				rawResponse: options.body.rawResponse,
+			}) as T;
+		}
+
 		if (options.body.stream) {
 			return this.handleStreamResponse({response}) as T;
 		}
@@ -135,6 +142,41 @@ export class Request {
 		return {stream, threadId: response.headers.get('lb-thread-id')};
 	}
 
+	private handleRunResponseStream({
+		response,
+		rawResponse,
+	}: {
+		response: Response;
+		rawResponse?: boolean;
+	}): {
+		stream: any;
+		threadId: string | null;
+		rawResponse?: {
+			headers: Record<string, string>;
+		};
+	} {
+		const controller = new AbortController();
+		const streamSSE = Stream.fromSSEResponse(response, controller);
+		const stream = streamSSE.toReadableStream();
+
+		const result: {
+			stream: ReadableStream;
+			threadId: string | null;
+			rawResponse?: {
+				headers: Record<string, string>;
+			};
+		} = {
+			stream,
+			threadId: response.headers.get('lb-thread-id'),
+		};
+		if (rawResponse) {
+			result.rawResponse = {
+				headers: Object.fromEntries(response.headers.entries()),
+			};
+		}
+		return result;
+	}
+
 	private async handleGenerateResponse({
 		response,
 		isChat,
diff --git a/packages/langbase/src/langbase/langbase.ts b/packages/langbase/src/langbase/langbase.ts
index 5a07564..64633f8 100644
--- a/packages/langbase/src/langbase/langbase.ts
+++ b/packages/langbase/src/langbase/langbase.ts
@@ -1,34 +1,74 @@
 import {Request} from '../common/request';
-import {
-	Pipe as PipeBaseAI,
-	RunOptions as RunOptionsT,
-	RunOptionsStream as RunOptionsStreamT,
-	RunResponse,
-	RunResponseStream,
-} from '@baseai/core';
 
 export type Role = 'user' | 'assistant' | 'system' | 'tool';
 
-// Base types without name and apiKey
-type BaseRunOptions = Omit<RunOptionsT, 'name' | 'apiKey'> & {
+export interface RunOptionsBase {
+	messages?: Message[];
+	variables?: Variable[];
+	threadId?: string;
+	rawResponse?: boolean;
+	runTools?: boolean;
+	tools?: Tools[];
+	name?: string; // Pipe name for SDK,
+	apiKey?: string; // pipe level key for SDK
+	llmKey?: string; // LLM API key
+}
+
+export interface RunOptionsT extends RunOptionsBase {
+	stream?: false;
+}
+
+export interface RunOptionsStreamT extends RunOptionsBase {
+	stream: true;
+}
+
+interface ChoiceGenerate {
+	index: number;
+	message: Message;
+	logprobs: boolean | null;
+	finish_reason: string;
+}
+
+export interface Usage {
+	prompt_tokens: number;
+	completion_tokens: number;
+	total_tokens: number;
+}
+
+export interface RunResponse {
+	completion: string;
+	threadId?: string;
+	id: string;
+	object: string;
+	created: number;
+	model: string;
+	choices: ChoiceGenerate[];
+	usage: Usage;
+	system_fingerprint: string | null;
+	rawResponse?: {
+		headers: Record<string, string>;
+	};
 	messages: Message[];
 	llmKey?: string;
-};
+	name?: string;
+}
+
+export interface RunResponseStream {
+	stream: ReadableStream;
+	threadId: string | null;
+	rawResponse?: {
+		headers: Record<string, string>;
+	};
+}
 
 // Union type for RunOptions
 export type RunOptions =
-	| (BaseRunOptions & {name: string; apiKey?: never})
-	| (BaseRunOptions & {name?: never; apiKey: string});
-
-// Similar structure for RunOptionsStream
-type BaseRunOptionsStream = Omit<RunOptionsStreamT, 'name' | 'apiKey'> & {
-	messages: Message[];
-	llmKey?: string;
-};
+	| (RunOptionsT & {name: string; apiKey?: never})
+	| (RunOptionsT & {name?: never; apiKey: string});
 
 export type RunOptionsStream =
-	| (BaseRunOptionsStream & {name: string; apiKey?: never})
-	| (BaseRunOptionsStream & {name?: never; apiKey: string});
+	| (RunOptionsStreamT & {name: string; apiKey?: never})
+	| (RunOptionsStreamT & {name?: never; apiKey: string});
 
 export interface Function {
 	name: string;
@@ -59,6 +99,15 @@ interface ToolChoice {
 	function: {name: string};
 }
 
+interface Tools {
+	type: 'function';
+	function: {
+		name: string;
+		description?: string;
+		parameters?: Record<string, any>;
+	};
+}
+
 interface PipeBaseOptions {
 	name: string;
 	description?: string;
@@ -75,14 +124,7 @@
 	presence_penalty?: number;
 	frequency_penalty?: number;
 	stop?: string[];
-	tools?: {
-		type: 'function';
-		function: {
-			name: string;
-			description?: string;
-			parameters?: Record<string, any>;
-		};
-	}[];
+	tools?: Tools[];
 	tool_choice?: 'auto' | 'required' | ToolChoice;
 	parallel_tool_calls?: boolean;
 	messages?: Message[];
@@ -113,16 +155,7 @@ export interface PipeListResponse {
 	parallel_tool_calls: boolean;
 	messages: Message[];
 	variables: Variable[] | [];
-	tools:
-		| {
-				type: 'function';
-				function: {
-					name: string;
-					description?: string;
-					parameters?: Record<string, any>;
-				};
-		  }[]
-		| [];
+	tools: Tools[] | [];
 	memory:
 		| {
 				name: string;
@@ -373,16 +406,20 @@ export class Langbase {
 			);
 		}
 
-		const pipe = new PipeBaseAI({
-			apiKey: options.apiKey ?? this.apiKey,
-			name: options.name?.trim() || '', // Pipe name
-			prod: true,
-			// default values
-			model: 'openai:gpt-4o-mini',
-			tools: [],
-		} as any);
+		// Remove stream property if it's not set to true
+		if (typeof options.stream === 'undefined') {
+			delete options.stream;
+		}
 
-		return await pipe.run({...options, runTools: false});
+		return this.request.post({
+			endpoint: '/v1/pipes/run',
+			body: options,
+			headers: {
+				...(options.llmKey && {
+					'LB-LLM-KEY': options.llmKey,
+				}),
+			},
+		});
 	}
 
 	/**
diff --git a/packages/langbase/src/pipes/pipes.ts b/packages/langbase/src/pipes/pipes.ts
index 31553a6..f392812 100644
--- a/packages/langbase/src/pipes/pipes.ts
+++ b/packages/langbase/src/pipes/pipes.ts
@@ -1,4 +1,4 @@
-import {Message, Role, ToolCall, Variable} from '@/langbase/langbase';
+import {Message, Role, ToolCall, Usage, Variable} from '@/langbase/langbase';
 import {Request} from '../common/request';
 import {Stream} from '../common/stream';
 
@@ -36,12 +36,6 @@ interface Delta {
 	tool_calls?: ToolCall[];
 }
 
-export interface Usage {
-	prompt_tokens: number;
-	completion_tokens: number;
-	total_tokens: number;
-}
-
 export interface GenerateResponse {
 	completion: string;
 	threadId?: string;
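
To illustrate how the reworked `pipe.run` path is meant to be consumed, here is a minimal usage sketch, not part of the diff: it assumes the `Langbase` client is constructed with an API key, and the pipe name `'summary'` and prompt text are placeholders. With `stream: false` the run resolves to a `RunResponse`; with `stream: true` it resolves to a `RunResponseStream` whose `stream` comes from `handleRunResponseStream`.

```ts
import {Langbase} from 'langbase';

// Hedged sketch: API key, pipe name and prompts are placeholder assumptions.
const langbase = new Langbase({apiKey: process.env.LANGBASE_API_KEY!});

async function main() {
	// Non-streaming run: `stream: false` resolves to a RunResponse with the
	// full completion, as in the route.ts example above.
	const result = await langbase.pipe.run({
		name: 'summary',
		messages: [{role: 'user', content: 'Summarize: Langbase pipes ...'}],
		stream: false,
	});
	console.log(result.completion, result.threadId);

	// Streaming run: `stream: true` resolves to a RunResponseStream whose
	// `stream` is the ReadableStream built from the SSE response.
	const {stream, threadId} = await langbase.pipe.run({
		name: 'summary',
		messages: [{role: 'user', content: 'Summarize: Langbase pipes ...'}],
		stream: true,
	});
	console.log('thread:', threadId);

	const reader = stream.getReader();
	const decoder = new TextDecoder();
	while (true) {
		const {done, value} = await reader.read();
		if (done) break;
		// Chunk encoding depends on the Stream helper's toReadableStream();
		// byte chunks of serialized events are assumed here.
		console.log(decoder.decode(value, {stream: true}));
	}
}

main().catch(console.error);
```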