diff --git a/README.md b/README.md
index 1f2eb2e1..c99478a1 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,18 @@ pip install infinity-emb[all]
 ### Launch via Python
 ```python
 from infinity_emb import create_server
-create_server()
+fastapi_app = create_server()
+```
+or use the Async API directly:
+
+```python
+import numpy as np
+from infinity_emb import AsyncEmbeddingEngine, transformer
+sentences = ["Embed this sentence via Infinity.", "Paris is in France."]
+engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
+async with engine:  # engine starts with engine.astart()
+    embeddings = np.array(await engine.embed(sentences))
+# engine stops with engine.astop()
 ```
 
 ### or launch the `create_server()` command via CLI
diff --git a/libs/infinity_emb/README.md b/libs/infinity_emb/README.md
index 1f2eb2e1..c99478a1 100644
--- a/libs/infinity_emb/README.md
+++ b/libs/infinity_emb/README.md
@@ -57,7 +57,18 @@ pip install infinity-emb[all]
 ### Launch via Python
 ```python
 from infinity_emb import create_server
-create_server()
+fastapi_app = create_server()
+```
+or use the Async API directly:
+
+```python
+import numpy as np
+from infinity_emb import AsyncEmbeddingEngine, transformer
+sentences = ["Embed this sentence via Infinity.", "Paris is in France."]
+engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
+async with engine:  # engine starts with engine.astart()
+    embeddings = np.array(await engine.embed(sentences))
+# engine stops with engine.astop()
 ```
 
 ### or launch the `create_server()` command via CLI
diff --git a/libs/infinity_emb/infinity_emb/__init__.py b/libs/infinity_emb/infinity_emb/__init__.py
index 905b6e41..7abe263b 100644
--- a/libs/infinity_emb/infinity_emb/__init__.py
+++ b/libs/infinity_emb/infinity_emb/__init__.py
@@ -4,6 +4,7 @@
     "fastapi_schemas",
     "logger",
     "create_server",
+    "AsyncEmbeddingEngine",
     "__version__",
 ]
 import importlib.metadata
@@ -11,7 +12,7 @@
 from infinity_emb import fastapi_schemas, inference, transformer
 
 # reexports
-from infinity_emb.infinity_server import create_server
+from infinity_emb.infinity_server import AsyncEmbeddingEngine, create_server
 from infinity_emb.log_handler import logger
 
 __version__ = importlib.metadata.version("infinity_emb")
diff --git a/libs/infinity_emb/infinity_emb/inference/batch_handler.py b/libs/infinity_emb/infinity_emb/inference/batch_handler.py
index c146390a..0542778d 100644
--- a/libs/infinity_emb/infinity_emb/inference/batch_handler.py
+++ b/libs/infinity_emb/infinity_emb/inference/batch_handler.py
@@ -310,7 +310,8 @@ def _preprocess_batch(self):
                 continue
             except Exception as ex:
                 logger.exception(ex)
-                exit("_preprocess_batch crashed")
+                raise ValueError("_preprocess_batch crashed")
+        self._ready = False  # loop exited: mark the handler as no longer ready
 
     def _core_batch(self):
         """waiting for preprocessed batches (on device)
@@ -338,7 +339,7 @@
                 self._feature_queue.task_done()
             except Exception as ex:
                 logger.exception(ex)
-                exit("_core_batch crashed.")
+                raise ValueError("_core_batch crashed.")
 
     async def _postprocess_batch(self):
         """collecting forward(.encode) results and put them into the result store"""
@@ -379,20 +380,23 @@
                 self._postprocess_queue.task_done()
             except Exception as ex:
                 logger.exception(ex)
-                exit("Postprocessor crashed")
+                raise ValueError("Postprocessor crashed")
 
     async def spawn(self):
         """set up the resources in batch"""
+        if self._ready:
+            raise ValueError("previous threads are still running.")
         logger.info("creating batching engine")
-        self.loop = asyncio.get_event_loop()  # asyncio.events._get_running_loop()
+        self.loop = asyncio.get_event_loop()
         self._threadpool.submit(self._preprocess_batch)
         self._threadpool.submit(self._core_batch)
         asyncio.create_task(self._postprocess_batch())
 
-    def shutdown(self):
+    async def shutdown(self):
         """
         set the shutdown event and close threadpool.
         Blocking event, until shutdown complete.
         """
         self._shutdown.set()
-        self._threadpool.shutdown(wait=True)
+        with ThreadPoolExecutor() as tp_temp:  # temporary executor for the blocking shutdown
+            await to_thread(self._threadpool.shutdown, tp_temp)
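The new async `shutdown` awaits a `to_thread` helper whose definition and import are not part of this hunk. A minimal sketch of a helper with that call shape, assuming it simply offloads a blocking callable to the given executor (an assumption, not the project's actual implementation):

```python
# Sketch only: the hunk above awaits `to_thread(self._threadpool.shutdown, tp_temp)`
# without showing the helper. A compatible implementation could look like this.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


async def to_thread(func: Callable[..., Any], tp: ThreadPoolExecutor, *args: Any) -> Any:
    """Run a blocking callable on `tp` without blocking the running event loop."""
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(tp, func, *args)
```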
diff --git a/libs/infinity_emb/infinity_emb/infinity_server.py b/libs/infinity_emb/infinity_emb/infinity_server.py
index 2a085434..bbe37ef6 100644
--- a/libs/infinity_emb/infinity_emb/infinity_server.py
+++ b/libs/infinity_emb/infinity_emb/infinity_server.py
@@ -1,4 +1,5 @@
 import time
+from typing import List
 
 import typer
 import uvicorn
@@ -19,6 +20,107 @@
 from infinity_emb.transformer.utils import InferenceEngine, InferenceEngineTypeHint
 
 
+class AsyncEmbeddingEngine:
+    def __init__(
+        self,
+        model_name_or_path: str = "BAAI/bge-small-en-v1.5",
+        batch_size: int = 64,
+        engine: InferenceEngine = InferenceEngine.torch,
+        model_warmup=True,
+    ) -> None:
+        """Create an AsyncEmbeddingEngine object.
+
+        Args:
+            model_name_or_path, str: Defaults to "BAAI/bge-small-en-v1.5".
+            batch_size, int: Defaults to 64.
+            engine, InferenceEngine: backend for inference.
+                Defaults to InferenceEngine.torch.
+            model_warmup, bool: warm up the model before serving. Defaults to True.
+
+        Example:
+            ```python
+            import numpy as np
+            from infinity_emb import AsyncEmbeddingEngine, transformer
+            sentences = ["Embed this via Infinity.", "Paris is in France."]
+            engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
+            async with engine:  # engine starts with engine.astart()
+                embeddings = np.array(await engine.embed(sentences))
+            # engine stops with engine.astop().
+            # For frequent restarts, handle start/stop yourself.
+            ```
+        """
+        self.batch_size = batch_size
+        self.running = False
+        self._model, self._min_inference_t = select_model_to_functional(
+            model_name_or_path=model_name_or_path,
+            batch_size=batch_size,
+            engine=engine,
+            model_warmup=model_warmup,
+        )
+
+    async def astart(self):
+        """startup engine"""
+        self.running = True
+        self._batch_handler = BatchHandler(
+            max_batch_size=self.batch_size,
+            model=self._model,
+            verbose=logger.level <= 10,
+            batch_delay=self._min_inference_t / 2,
+        )
+        await self._batch_handler.spawn()
+
+    async def astop(self):
+        """stop engine"""
+        self.running = False
+        await self._batch_handler.shutdown()
+
+    async def __aenter__(self):
+        if self.running:
+            raise ValueError(
+                "DoubleSpawn: already started `AsyncEmbeddingEngine`. "
+                "Recommended usage is via the async context manager: "
+                "`async with engine: ...`"
+            )
+        await self.astart()
+
+    async def __aexit__(self, *args):
+        self._check_running()
+        await self.astop()
+
+    def overload_status(self):
+        self._check_running()
+        return self._batch_handler.overload_status()
+
+    def is_overloaded(self) -> bool:
+        self._check_running()
+        return self._batch_handler.is_overloaded()
+
+    async def embed(self, sentences: List[str]) -> List[List[float]]:
+        """embed multiple sentences
+
+        Args:
+            sentences (List[str]): sentences to be embedded
+
+        Raises:
+            ValueError: raised if engine is not started yet
+
+        Returns:
+            List[List[float]]: embeddings
+                2D list-array of shape (len(sentences), embed_dim)
+        """
+        self._check_running()
+        embeddings, _ = await self._batch_handler.schedule(sentences)
+        return embeddings
+
+    def _check_running(self):
+        if not self.running:
+            raise ValueError(
+                "didn't start `AsyncEmbeddingEngine`. "
+                "Recommended usage is via the async context manager: "
+                "`async with engine: ...`"
+            )
+
+
 def create_server(
     model_name_or_path: str = "BAAI/bge-small-en-v1.5",
     url_prefix: str = "/v1",
@@ -76,7 +178,7 @@ async def _startup():
 
     @app.on_event("shutdown")
     async def _shutdown():
-        app.batch_handler.shutdown()
+        await app.batch_handler.shutdown()
 
     @app.get("/ready")
     async def _ready() -> float:
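With `AsyncEmbeddingEngine` and its async context-manager protocol defined above, a complete driver script could look like the following minimal sketch. It uses only the API shown in this diff; that the default model can be downloaded locally is assumed.

```python
import asyncio

import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer


async def main():
    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
    async with engine:  # spawns the batch handler via engine.astart()
        embeddings = np.array(await engine.embed(["Paris is in France."]))
    # engine.astop() has run on context exit; results remain usable
    print(embeddings.shape)  # (1, embed_dim)


asyncio.run(main())
```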
" + " recommended use is via AsyncContextManager" + " `async with engine: ..`" + ) + await self.astart() + + async def __aexit__(self, *args): + self._check_running() + await self.astop() + + def overload_status(self): + self._check_running() + return self._batch_handler.overload_status() + + def is_overloaded(self) -> bool: + self._check_running() + return self._batch_handler.is_overloaded() + + async def embed(self, sentences: List[str]) -> List[List[float]]: + """embed multiple sentences + + Args: + sentences (List[str]): sentences to be embedded + + Raises: + ValueError: raised if engine is not started yet" + + Returns: + List[List[float]]: embeddings + 2D list-array of shape( len(sentences),embed_dim ) + """ + self._check_running() + embeddings, _ = await self._batch_handler.schedule(sentences) + return embeddings + + def _check_running(self): + if not self.running: + raise ValueError( + "didn't start `AsyncEmbeddingEngine` " + " recommended use is via AsyncContextManager" + " `async with engine: ..`" + ) + + def create_server( model_name_or_path: str = "BAAI/bge-small-en-v1.5", url_prefix: str = "/v1", @@ -76,7 +177,7 @@ async def _startup(): @app.on_event("shutdown") async def _shutdown(): - app.batch_handler.shutdown() + await app.batch_handler.shutdown() @app.get("/ready") async def _ready() -> float: diff --git a/libs/infinity_emb/infinity_emb/transformer/__init__.py b/libs/infinity_emb/infinity_emb/transformer/__init__.py new file mode 100644 index 00000000..c5ee173e --- /dev/null +++ b/libs/infinity_emb/infinity_emb/transformer/__init__.py @@ -0,0 +1,2 @@ +__all__ = ["InferenceEngine"] +from infinity_emb.transformer.utils import InferenceEngine diff --git a/libs/infinity_emb/tests/script_live.py b/libs/infinity_emb/tests/script_live.py index 13f3046d..df10a1ed 100644 --- a/libs/infinity_emb/tests/script_live.py +++ b/libs/infinity_emb/tests/script_live.py @@ -1,5 +1,6 @@ import concurrent.futures import json +import time import timeit from functools import partial @@ -58,5 +59,21 @@ def remote(json_data: bytes, iters=1): assert latency_st * 1.1 > latency_request +def latency_single(): + session = requests.Session() + + def _post(i): + json_d = json.dumps({"input": [str(i)], "model": "model"}) + s = time.perf_counter() + res = session.post(f"{LIVE_URL}/embeddings", data=json_d) + e = time.perf_counter() + assert res.status_code == 200 + return (e - s) * 10**3 + + _post("hi") + times = [_post(i) for i in range(32)] + print(f"{np.median(times)}+-{np.std(times)}") + + if __name__ == "__main__": - embedding_live_performance() + latency_single() diff --git a/libs/infinity_emb/tests/unit_test/inference/test_batch_handler.py b/libs/infinity_emb/tests/unit_test/inference/test_batch_handler.py index 8d59fa5e..cde99c7d 100644 --- a/libs/infinity_emb/tests/unit_test/inference/test_batch_handler.py +++ b/libs/infinity_emb/tests/unit_test/inference/test_batch_handler.py @@ -79,15 +79,15 @@ def method_st(_sentences): # yappi.get_func_stats().print_all() # yappi.stop() method_st(sentences[::10]) - time.sleep(0.2) + time.sleep(0.5) time_batch_handler = np.median( [(await method_batch_handler(sentences)) for _ in range(N_TIMINGS)] ) - time.sleep(0.2) + time.sleep(0.5) time_st_patched = np.median( [method_patched(sentences) for _ in range(N_TIMINGS)] ) - time.sleep(0.2) + time.sleep(0.5) time_st = np.median([method_st(sentences) for _ in range(N_TIMINGS)]) print( @@ -113,4 +113,4 @@ def method_st(_sentences): ) finally: - bh.shutdown() + await bh.shutdown() diff --git 
diff --git a/libs/infinity_emb/tests/unit_test/test_infinity_server.py b/libs/infinity_emb/tests/unit_test/test_infinity_server.py
index 23765710..f46deab9 100644
--- a/libs/infinity_emb/tests/unit_test/test_infinity_server.py
+++ b/libs/infinity_emb/tests/unit_test/test_infinity_server.py
@@ -1,9 +1,12 @@
 import subprocess
 
+import numpy as np
+import pytest
 import typer
 import uvicorn
 from fastapi import FastAPI
 
+from infinity_emb import AsyncEmbeddingEngine, transformer
 from infinity_emb.infinity_server import (
     UVICORN_LOG_LEVELS,
     InferenceEngineTypeHint,
@@ -14,6 +17,38 @@
 from infinity_emb.transformer.utils import InferenceEngine
 
 
+@pytest.mark.anyio
+async def test_async_api_debug():
+    sentences = ["Embed this sentence via Infinity.", "Paris is in France."]
+    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.debugengine)
+    async with engine:
+        embeddings = np.array(await engine.embed(sentences))
+        assert embeddings.shape[0] == len(sentences)
+        assert embeddings.shape[1] >= 10
+        for idx, s in enumerate(sentences):
+            assert embeddings[idx][0] == len(s), f"{embeddings}, {s}"
+
+
+@pytest.mark.anyio
+async def test_async_api_torch():
+    sentences = ["Hi", "how"]
+    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
+    async with engine:
+        embeddings = np.array(await engine.embed(sentences))
+        assert embeddings.shape[0] == 2
+        assert embeddings.shape[1] >= 10
+
+
+@pytest.mark.anyio
+async def test_async_api_fastembed():
+    sentences = ["Hi", "how"]
+    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.fastembed)
+    async with engine:
+        embeddings = np.array(await engine.embed(sentences))
+        assert embeddings.shape[0] == 2
+        assert embeddings.shape[1] >= 10
+
+
 def test_cli_help():
     log = subprocess.run(["infinity_emb", "--help"])
     assert log.returncode == 0
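The new tests rely on the anyio pytest plugin via `@pytest.mark.anyio`. The backend such tests run on is commonly pinned through an `anyio_backend` fixture in `conftest.py`; whether this repo does so is an assumption, but a typical setup looks like:

```python
# conftest.py — common pattern for anyio-marked tests (assumed, not shown in this diff)
import pytest


@pytest.fixture
def anyio_backend():
    # run @pytest.mark.anyio tests on asyncio only
    return "asyncio"
```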