Merge pull request #32 from michaelfeil/python-api
add python api
michaelfeil authored Nov 11, 2023
2 parents b703655 + 311d4f1 commit bd18c90
Showing 9 changed files with 195 additions and 15 deletions.
12 changes: 11 additions & 1 deletion README.md
@@ -57,7 +57,17 @@ pip install infinity-emb[all]
### Launch via Python
```Python
from infinity_emb import create_server
create_server()
fastapi_app = create_server()
```
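The returned object is a plain FastAPI app, so it can also be served in-process with uvicorn. A minimal sketch (the file name, host, and port are assumptions, not part of this diff):

```python
# hypothetical serve.py: serving the FastAPI app returned by create_server()
import uvicorn

from infinity_emb import create_server

fastapi_app = create_server()

if __name__ == "__main__":
    # host and port are illustrative defaults, not mandated by infinity_emb
    uvicorn.run(fastapi_app, host="0.0.0.0", port=7997)
```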
Or use the async API directly (a fully runnable script follows below):

```python
import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer

sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
# note: run inside an async function, since `await` requires a coroutine
async with engine:  # engine starts with engine.astart()
    embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop()
```
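Because `await` only works inside a coroutine, a complete runnable version of the snippet above wraps it in `asyncio.run` (a sketch under that assumption, not part of this diff):

```python
import asyncio

import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer


async def main():
    sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
    async with engine:  # engine starts with engine.astart()
        embeddings = np.array(await engine.embed(sentences))
    # engine stops with engine.astop()
    print(embeddings.shape)  # (2, embed_dim)


if __name__ == "__main__":
    asyncio.run(main())
```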

### or launch the `create_server()` command via CLI
12 changes: 11 additions & 1 deletion libs/infinity_emb/README.md
@@ -57,7 +57,17 @@ pip install infinity-emb[all]
### Launch via Python
```Python
from infinity_emb import create_server
create_server()
fastapi_app = create_server()
```
Or use the async API directly:

```python
import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer

sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
# note: run inside an async function, since `await` requires a coroutine
async with engine:  # engine starts with engine.astart()
    embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop()
```

### or launch the `create_server()` command via CLI
3 changes: 2 additions & 1 deletion libs/infinity_emb/infinity_emb/__init__.py
@@ -4,14 +4,15 @@
"fastapi_schemas",
"logger",
"create_server",
"AsyncEmbeddingEngine",
"__version__",
]
import importlib.metadata

from infinity_emb import fastapi_schemas, inference, transformer

# reexports
from infinity_emb.infinity_server import create_server
from infinity_emb.infinity_server import AsyncEmbeddingEngine, create_server
from infinity_emb.log_handler import logger

__version__ = importlib.metadata.version("infinity_emb")
16 changes: 10 additions & 6 deletions libs/infinity_emb/infinity_emb/inference/batch_handler.py
@@ -310,7 +310,8 @@ def _preprocess_batch(self):
continue
except Exception as ex:
logger.exception(ex)
exit("_preprocess_batch crashed")
raise ValueError("_preprocess_batch crashed")
self._ready = False

def _core_batch(self):
"""waiting for preprocessed batches (on device)
@@ -338,7 +339,7 @@ def _core_batch(self):
self._feature_queue.task_done()
except Exception as ex:
logger.exception(ex)
exit("_core_batch crashed.")
raise ValueError("_core_batch crashed.")

async def _postprocess_batch(self):
"""collecting forward(.encode) results and put them into the result store"""
@@ -379,20 +380,23 @@ async def _postprocess_batch(self):
self._postprocess_queue.task_done()
except Exception as ex:
logger.exception(ex)
exit("Postprocessor crashed")
raise ValueError("Postprocessor crashed")

async def spawn(self):
"""set up the resources in batch"""
if self._ready:
raise ValueError("previous threads are still running.")
logger.info("creating batching engine")
self.loop = asyncio.get_event_loop() # asyncio.events._get_running_loop()
self.loop = asyncio.get_event_loop()
self._threadpool.submit(self._preprocess_batch)
self._threadpool.submit(self._core_batch)
asyncio.create_task(self._postprocess_batch())

def shutdown(self):
async def shutdown(self):
"""
set the shutdown event and close threadpool.
Blocking event, until shutdown complete.
"""
self._shutdown.set()
self._threadpool.shutdown(wait=True)
with ThreadPoolExecutor() as tp_temp:
await to_thread(self._threadpool.shutdown, tp_temp)
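For context: the new coroutine offloads the blocking `ThreadPoolExecutor.shutdown(wait=True)` call to a helper thread so the event loop stays responsive during shutdown. A minimal sketch of the same pattern using only the standard library's `asyncio.to_thread` (Python 3.9+); the repo's own `to_thread` helper presumably plays a similar role:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def shutdown_pool(pool: ThreadPoolExecutor) -> None:
    # pool.shutdown(wait=True) blocks until all worker threads drain;
    # running it via asyncio.to_thread keeps the event loop free.
    await asyncio.to_thread(pool.shutdown, True)
```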
103 changes: 102 additions & 1 deletion libs/infinity_emb/infinity_emb/infinity_server.py
@@ -1,4 +1,5 @@
import time
from typing import List

import typer
import uvicorn
@@ -19,6 +20,106 @@
from infinity_emb.transformer.utils import InferenceEngine, InferenceEngineTypeHint


class AsyncEmbeddingEngine:
def __init__(
self,
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
batch_size: int = 64,
engine: InferenceEngine = InferenceEngine.torch,
model_warmup=True,
) -> None:
"""Creating a Async EmbeddingEngine object.
Args:
model_name_or_path, str: Defaults to "BAAI/bge-small-en-v1.5".
batch_size, int: Defaults to 64.
engine, InferenceEngine: backend for inference.
Defaults to InferenceEngine.torch.
model_warmup, bool: whether to run a warmup forward pass after loading. Defaults to True.
Example:
```python
import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer
sentences = ["Embedded this via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine: # engine starts with engine.astart()
embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop().
# For frequent restarts, handle start/stop yourself.
```
"""
self.batch_size = batch_size
self.running = False
self._model, self._min_inference_t = select_model_to_functional(
model_name_or_path=model_name_or_path,
batch_size=batch_size,
engine=engine,
model_warmup=model_warmup,
)

async def astart(self):
"""startup engine"""
self.running = True
self._batch_handler = BatchHandler(
max_batch_size=self.batch_size,
model=self._model,
verbose=logger.level <= 10,
batch_delay=self._min_inference_t / 2,
)
await self._batch_handler.spawn()

async def astop(self):
"""stop engine"""
self.running = False
await self._batch_handler.shutdown()

async def __aenter__(self):
if self.running:
raise ValueError(
"DoubleSpawn: already started `AsyncEmbeddingEngine`. "
" recommended use is via AsyncContextManager"
" `async with engine: ..`"
)
await self.astart()

async def __aexit__(self, *args):
self._check_running()
await self.astop()

def overload_status(self):
self._check_running()
return self._batch_handler.overload_status()

def is_overloaded(self) -> bool:
self._check_running()
return self._batch_handler.is_overloaded()

async def embed(self, sentences: List[str]) -> List[List[float]]:
"""embed multiple sentences
Args:
sentences (List[str]): sentences to be embedded
Raises:
ValueError: raised if the engine has not been started yet
Returns:
List[List[float]]: embeddings
2D list of shape (len(sentences), embed_dim)
"""
self._check_running()
embeddings, _ = await self._batch_handler.schedule(sentences)
return embeddings

def _check_running(self):
if not self.running:
raise ValueError(
"didn't start `AsyncEmbeddingEngine` "
" recommended use is via AsyncContextManager"
" `async with engine: ..`"
)


def create_server(
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
url_prefix: str = "/v1",
@@ -76,7 +177,7 @@ async def _startup():

@app.on_event("shutdown")
async def _shutdown():
app.batch_handler.shutdown()
await app.batch_handler.shutdown()

@app.get("/ready")
async def _ready() -> float:
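Beyond `embed`, the new class exposes `is_overloaded()` and `overload_status()`. A hedged sketch of how a caller might use them for backpressure (`embed_with_backpressure` and the retry delay are illustrative, not part of this diff):

```python
import asyncio
from typing import List

from infinity_emb import AsyncEmbeddingEngine


async def embed_with_backpressure(
    engine: AsyncEmbeddingEngine, sentences: List[str]
) -> List[List[float]]:
    # back off while the batch handler reports overload
    while engine.is_overloaded():
        await asyncio.sleep(0.05)  # illustrative delay
    return await engine.embed(sentences)
```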
2 changes: 2 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/__init__.py
@@ -0,0 +1,2 @@
__all__ = ["InferenceEngine"]
from infinity_emb.transformer.utils import InferenceEngine
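This reexport lets callers import the engine enum from the package rather than from `utils`; both paths refer to the same object:

```python
# both imports resolve to the same enum after this change
from infinity_emb.transformer import InferenceEngine
from infinity_emb.transformer.utils import InferenceEngine as InferenceEngineUtils

assert InferenceEngine is InferenceEngineUtils
# members used elsewhere in this diff: torch, fastembed, debugengine
```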
19 changes: 18 additions & 1 deletion libs/infinity_emb/tests/script_live.py
@@ -1,5 +1,6 @@
import concurrent.futures
import json
import time
import timeit
from functools import partial

@@ -58,5 +59,21 @@ def remote(json_data: bytes, iters=1):
assert latency_st * 1.1 > latency_request


def latency_single():
session = requests.Session()

def _post(i):
json_d = json.dumps({"input": [str(i)], "model": "model"})
s = time.perf_counter()
res = session.post(f"{LIVE_URL}/embeddings", data=json_d)
e = time.perf_counter()
assert res.status_code == 200
return (e - s) * 10**3

_post("hi")
times = [_post(i) for i in range(32)]
print(f"{np.median(times)}+-{np.std(times)}")


if __name__ == "__main__":
embedding_live_performance()
latency_single()
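`latency_single` measures sequential request latency. A hypothetical companion that measures latency under concurrent load could reuse the same endpoint; `latency_concurrent`, `n_workers`, and the warmup string are assumptions, not part of this diff, and the script's existing `LIVE_URL` constant is reused:

```python
import concurrent.futures
import json
import time

import numpy as np
import requests


def latency_concurrent(n_requests: int = 32, n_workers: int = 8):
    def _post(i):
        json_d = json.dumps({"input": [str(i)], "model": "model"})
        s = time.perf_counter()
        # plain requests.post here: requests.Session is not guaranteed thread-safe
        res = requests.post(f"{LIVE_URL}/embeddings", data=json_d)
        assert res.status_code == 200
        return (time.perf_counter() - s) * 10**3

    _post("warmup")
    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as pool:
        times = list(pool.map(_post, range(n_requests)))
    print(f"{np.median(times)}+-{np.std(times)}")
```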
@@ -79,15 +79,15 @@ def method_st(_sentences):
# yappi.get_func_stats().print_all()
# yappi.stop()
method_st(sentences[::10])
time.sleep(0.2)
time.sleep(0.5)
time_batch_handler = np.median(
[(await method_batch_handler(sentences)) for _ in range(N_TIMINGS)]
)
time.sleep(0.2)
time.sleep(0.5)
time_st_patched = np.median(
[method_patched(sentences) for _ in range(N_TIMINGS)]
)
time.sleep(0.2)
time.sleep(0.5)
time_st = np.median([method_st(sentences) for _ in range(N_TIMINGS)])

print(
@@ -113,4 +113,4 @@ def method_st(_sentences):
)

finally:
bh.shutdown()
await bh.shutdown()
35 changes: 35 additions & 0 deletions libs/infinity_emb/tests/unit_test/test_infinity_server.py
@@ -1,9 +1,12 @@
import subprocess

import numpy as np
import pytest
import typer
import uvicorn
from fastapi import FastAPI

from infinity_emb import AsyncEmbeddingEngine, transformer
from infinity_emb.infinity_server import (
UVICORN_LOG_LEVELS,
InferenceEngineTypeHint,
@@ -14,6 +17,38 @@
from infinity_emb.transformer.utils import InferenceEngine


@pytest.mark.anyio
async def test_async_api_debug():
sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.debugengine)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == len(sentences)
assert embeddings.shape[1] >= 10
for idx, s in enumerate(sentences):
assert embeddings[idx][0] == len(s), f"{embeddings}, {s}"


@pytest.mark.anyio
async def test_async_api_torch():
sentences = ["Hi", "how"]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == 2
assert embeddings.shape[1] >= 10


@pytest.mark.anyio
async def test_async_api_fastembed():
sentences = ["Hi", "how"]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.fastembed)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == 2
assert embeddings.shape[1] >= 10


def test_cli_help():
log = subprocess.run(["infinity_emb", "--help"])
assert log.returncode == 0
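The `@pytest.mark.anyio` tests above rely on the anyio pytest plugin; a conftest fixture like the following (a sketch, not part of this diff) pins the test backend to asyncio:

```python
# conftest.py: pin the anyio test backend to asyncio
import pytest


@pytest.fixture
def anyio_backend():
    return "asyncio"
```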
