Implement streaming endpoint
onmete committed Dec 11, 2024
1 parent 2aa8ae0 commit 5f242a7
Showing 14 changed files with 1,044 additions and 285 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -588,6 +588,8 @@ To send a request to the server you can use the following curl command:
curl -X 'POST' 'http://127.0.0.1:8080/v1/query' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{"query": "write a deployment yaml for the mongodb image"}'
```

> You can use the `/v1/streaming_query` endpoint (with the same parameters) to get a streaming response. By default it streams plain text, but you can also receive the response as a stream of JSON events by adding a `"media_type": "application/json"` parameter to the payload data.
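
For example, a minimal Python client for the streaming endpoint could look like this (a sketch only: it assumes the service listens on `http://127.0.0.1:8080`, as in the curl example above, and that the `requests` package is installed):

```python
import requests

# Stream the default plain-text response and print it as it arrives.
payload = {"query": "write a deployment yaml for the mongodb image"}
with requests.post(
    "http://127.0.0.1:8080/v1/streaming_query", json=payload, stream=True, timeout=300
) as response:
    response.raise_for_status()
    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)
```

The same payload fields accepted by `/v1/query` (provider, model, conversation_id, attachments) can be passed here as well.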

### Swagger UI

The Swagger UI web page is exposed at the standard `/docs` endpoint. If the service is running on localhost on port 8080, Swagger UI can be accessed at `http://localhost:8080/docs`.
98 changes: 97 additions & 1 deletion docs/openapi.json
@@ -88,6 +88,89 @@
}
}
},
"/v1/streaming_query": {
"post": {
"tags": [
"streaming_query"
],
"summary": "Conversation Request",
"description": "Handle conversation requests for the OLS endpoint.\n\nArgs:\n llm_request: The incoming request containing query details.\n auth: The authentication context, provided by dependency injection.\n\nReturns:\n StreamingResponse: The streaming response generated for the query.",
"operationId": "conversation_request_v1_streaming_query_post",
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/LLMRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Query is valid and stream/events from endpoint is returned",
"content": {
"application/json": {
"schema": {
"type": "string",
"title": "Response 200 Conversation Request V1 Streaming Query Post"
}
}
}
},
"401": {
"description": "Missing or invalid credentials provided by client",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UnauthorizedResponse"
}
}
}
},
"403": {
"description": "Client does not have permission to access resource",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ForbiddenResponse"
}
}
}
},
"413": {
"description": "Prompt is too long",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/PromptTooLongResponse"
}
}
}
},
"500": {
"description": "Query can not be validated, LLM is not accessible or other internal error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorResponse"
}
}
}
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
}
}
}
}
},
"/v1/feedback/status": {
"get": {
"tags": [
@@ -568,6 +651,18 @@
}
],
"title": "Attachments"
},
"media_type": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Media Type",
"default": "text/plain"
}
},
"additionalProperties": false,
@@ -576,7 +671,7 @@
"query"
],
"title": "LLMRequest",
"description": "Model representing a request for the LLM (Language Model) send into OLS service.\n\nAttributes:\n query: The query string.\n conversation_id: The optional conversation ID (UUID).\n provider: The optional provider.\n model: The optional model.\n attachments: The optional attachments.\n\nExample:\n ```python\n llm_request = LLMRequest(query=\"Tell me about Kubernetes\")\n ```",
"description": "Model representing a request for the LLM (Language Model) send into OLS service.\n\nAttributes:\n query: The query string.\n conversation_id: The optional conversation ID (UUID).\n provider: The optional provider.\n model: The optional model.\n attachments: The optional attachments.\n media_type: The optional parameter for streaming response.\n\nExample:\n ```python\n llm_request = LLMRequest(query=\"Tell me about Kubernetes\")\n ```",
"examples": [
{
"attachments": [
Expand All @@ -597,6 +692,7 @@
}
],
"conversation_id": "123e4567-e89b-12d3-a456-426614174000",
"media_type": "text/plain",
"model": "gpt-4o-mini",
"provider": "openai",
"query": "write a deployment yaml for the mongodb image"
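
For reference, the `/v1/streaming_query` request body is the same `LLMRequest` schema as for `/v1/query`, with the new optional `media_type` field (default `"text/plain"`) selecting the stream format. Below is a rough client-side sketch using field values drawn from the schema example above — the `requests` usage and the `"application/json"` value for JSON events are assumptions, and the exact framing of the streamed events is not visible in this diff:

```python
import requests

# Payload mirroring the LLMRequest schema example above.
payload = {
    "query": "write a deployment yaml for the mongodb image",
    "provider": "openai",
    "model": "gpt-4o-mini",
    "conversation_id": "123e4567-e89b-12d3-a456-426614174000",
    "media_type": "application/json",  # assumed value for JSON events; "text/plain" is the default
}

with requests.post(
    "http://127.0.0.1:8080/v1/streaming_query", json=payload, stream=True, timeout=300
) as response:
    response.raise_for_status()
    for line in response.iter_lines(decode_unicode=True):
        if line:
            # Event framing (NDJSON vs. SSE) is not shown in this diff; print lines as received.
            print(line)
```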
141 changes: 91 additions & 50 deletions ols/app/endpoints/ols.py
@@ -7,7 +7,7 @@
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from typing import Any, Generator, Optional, Union

import pytz
from fastapi import APIRouter, Depends, HTTPException, status
@@ -79,49 +79,15 @@ def conversation_request(
Returns:
Response containing the processed information.
"""
timestamps: dict[str, float] = {}
timestamps["start"] = time.time()

# Initialize variables
previous_input = []

user_id = retrieve_user_id(auth)
logger.info("User ID %s", user_id)
timestamps["retrieve user"] = time.time()

conversation_id = retrieve_conversation_id(llm_request)
timestamps["retrieve conversation"] = time.time()

# Important note: Redact the query before attempting to do any
# logging of the query to avoid leaking PII into logs.

# Redact the query
llm_request = redact_query(conversation_id, llm_request)
timestamps["redact query"] = time.time()

# Log incoming request (after redaction)
logger.info("%s Incoming request: %s", conversation_id, llm_request.query)

previous_input = retrieve_previous_input(user_id, llm_request)
timestamps["retrieve previous input"] = time.time()

# Retrieve attachments from the request
attachments = retrieve_attachments(llm_request)

# Redact all attachments
attachments = redact_attachments(conversation_id, attachments)

# All attachments should be appended to query - but store original
# query for later use in transcript storage
query_without_attachments = llm_request.query
llm_request.query = append_attachments_to_query(llm_request.query, attachments)
timestamps["append attachments"] = time.time()

validate_requested_provider_model(llm_request)

# Validate the query
valid = validate_question(conversation_id, llm_request)
timestamps["validate question"] = time.time()
(
user_id,
conversation_id,
query_without_attachments,
previous_input,
attachments,
valid,
timestamps,
) = process_request(auth, llm_request)

if not valid:
summarizer_response = SummarizerResponse(
@@ -172,6 +138,64 @@ def conversation_request(
)


def process_request(auth: Any, llm_request: LLMRequest):
"""Process incoming request."""
timestamps = {"start": time.time()}

user_id = retrieve_user_id(auth)
logger.info("User ID %s", user_id)
timestamps["retrieve user"] = time.time()

conversation_id = retrieve_conversation_id(llm_request)
timestamps["retrieve conversation"] = time.time()

# Important note: Redact the query before attempting to do any
# logging of the query to avoid leaking PII into logs.

# Redact the query
llm_request = redact_query(conversation_id, llm_request)
timestamps["redact query"] = time.time()

# Log incoming request (after redaction)
logger.info("%s Incoming request: %s", conversation_id, llm_request.query)

previous_input = retrieve_previous_input(user_id, llm_request)
timestamps["retrieve previous input"] = time.time()

# Retrieve attachments from the request
attachments = retrieve_attachments(llm_request)

# Redact all attachments
attachments = redact_attachments(conversation_id, attachments)

# All attachments should be appended to query - but store original
# query for later use in transcript storage
query_without_attachments = llm_request.query
llm_request.query = append_attachments_to_query(llm_request.query, attachments)
timestamps["append attachments"] = time.time()

validate_requested_provider_model(llm_request)

# Validate the query
if not previous_input:
valid = validate_question(conversation_id, llm_request)
else:
logger.debug("follow-up conversation - skipping question validation")
valid = True

timestamps["validate question"] = time.time()

return (
user_id,
conversation_id,
query_without_attachments,
previous_input,
attachments,
valid,
timestamps,
)


def log_processing_durations(timestamps: dict[str, float]) -> None:
"""Log processing durations."""

@@ -285,17 +309,34 @@ def generate_response(
conversation_id: str,
llm_request: LLMRequest,
previous_input: list[CacheEntry],
) -> SummarizerResponse:
"""Generate response based on validation result, previous input, and model output."""
# Summarize documentation
streaming: bool = False,
) -> Union[SummarizerResponse, Generator]:
"""Generate response based on validation result, previous input, and model output.
Args:
conversation_id: The unique identifier for the conversation.
llm_request: The request containing a query.
previous_input: The history of the conversation (if available).
streaming: The flag indicating if the response should be streamed.
Returns:
SummarizerResponse or Generator, depending on the streaming flag.
"""
try:
docs_summarizer = DocsSummarizer(
provider=llm_request.provider, model=llm_request.model
)
history = CacheEntry.cache_entries_to_history(previous_input)
return docs_summarizer.summarize(
conversation_id, llm_request.query, config.rag_index, history
)
if streaming:
return docs_summarizer.generate_response(
llm_request.query, config.rag_index, history
)
else:
response = docs_summarizer.create_response(
llm_request.query, config.rag_index, history
)
logger.debug(f"{conversation_id} Generated response: {response}")
return response
except PromptTooLongError as summarizer_error:
logger.error("Prompt is too long: %s", summarizer_error)
raise HTTPException(
