Streaming with the per_req_config_modifier #665
I am currently developing an app using LangServe that requires a user to be authenticated both to load files into the vectorstore and to retrieve documents. Based on the example provided here, I have come up with the following so far, which works perfectly for the "invoke" endpoint:

from typing import Any, AsyncGenerator, Optional

from fastapi.responses import StreamingResponse
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    RunnableConfig,
    RunnableParallel,
    RunnablePassthrough,
    RunnableSerializable,
)
from langchain_core.vectorstores import VectorStore


class PerUserQuery(RunnableSerializable):
    """
    A custom runnable that answers a question using only the documents
    that belong to the given user. The runnable is configurable by the
    user, and the search results are filtered by the user ID.
    """

    user_id: Optional[str]
    vectorstore: VectorStore

    class Config:
        # Allow arbitrary types since VectorStore is an abstract interface
        # and not a pydantic model
        arbitrary_types_allowed = True

    def get_rag_chain(self, query):
        def format_docs(docs):
            return "\n\n".join(doc.page_content for doc in docs)

        # Retrieval runs eagerly here, restricted to the current user's documents
        docs = self.vectorstore.similarity_search(
            query, k=10, filter={"user_id": {"$eq": self.user_id}}
        )
        formatted_docs = format_docs(docs)
        setup_and_retrieval = RunnableParallel(
            {"context": lambda x: formatted_docs, "question": RunnablePassthrough()}
        )
        prompt = hub.pull("rarchit/rag-prompt-bullet")
        llm = get_llm()  # the author's own helper (not shown)
        rag_chain = setup_and_retrieval | prompt | llm | StrOutputParser()
        return rag_chain

    def _invoke(
        self, input: str, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> str:
        """
        Invoke the RAG chain (it ends in StrOutputParser, so it returns a string)
        """
        rag_chain = self.get_rag_chain(query=input)
        return rag_chain.invoke(str(input))

    def invoke(
        self, input: str, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> str:
        return self._call_with_config(self._invoke, input, config, **kwargs)

However, I am unable to define a request modifier for the stream endpoint. I tried adding these methods:

    async def _stream(
        self, input: str, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> AsyncGenerator[str, None]:
        """
        Stream the RAG chain
        """
        rag_chain = self.get_rag_chain(query=input)
        # Note: rag_chain.stream() is a sync generator; `async for` needs rag_chain.astream()
        async for chunk in rag_chain.stream(str(input)):
            yield chunk

    async def stream(
        self, input: str, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> StreamingResponse:
        return StreamingResponse(
            self._call_with_config(self._stream, input, config, **kwargs)
        )

But I did not have any luck with streaming: it just sends the entire response at once. I would like to get the same streaming behaviour I would get by adding the rag_chain directly with add_routes(app, rag_chain). Any help would be appreciated; I'm stuck here.
I'm also doing something similar and would love to see if anyone has an answer for this. I'm also curious whether it's possible to enable a /playground endpoint for this PerUserVectorstore.
@ltbd78 Thank you so much! Yes, I dug deeper into what you mentioned earlier, and after tinkering around for quite a bit, I got something that works for me.
I modified the class to look like this: