
add python api #32

Merged · 6 commits · Nov 11, 2023
12 changes: 11 additions & 1 deletion README.md
@@ -57,7 +57,17 @@ pip install infinity-emb[all]
### Launch via Python
```Python
from infinity_emb import create_server
create_server()
fastapi_app = create_server()
```
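The returned `fastapi_app` is a regular FastAPI application, so it can also be served programmatically; a minimal sketch (host and port here are assumptions, not verified defaults):

```python
import uvicorn

from infinity_emb import create_server

fastapi_app = create_server()
uvicorn.run(fastapi_app, host="127.0.0.1", port=7997)
```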
Alternatively, use the async engine API directly:

```python
import numpy as np
from infinity_emb import AsyncEmbeddingEngine, transformer
sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine: # engine starts with engine.astart()
embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop()
```
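Since `async with` is only valid inside a coroutine, running the snippet above as a plain script takes a small wrapper (the `main` function is ours, not part of the API):

```python
import asyncio

import numpy as np

from infinity_emb import AsyncEmbeddingEngine, transformer

async def main():
    engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
    async with engine:  # engine starts with engine.astart()
        embeddings = np.array(await engine.embed(["Paris is in France."]))
    # engine stops with engine.astop()
    print(embeddings.shape)

asyncio.run(main())
```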

### or launch the `create_server()` command via CLI
12 changes: 11 additions & 1 deletion libs/infinity_emb/README.md
@@ -57,7 +57,17 @@ pip install infinity-emb[all]
### Launch via Python
```Python
from infinity_emb import create_server
create_server()
fastapi_app = create_server()
```
Alternatively, use the async engine API directly:

```python
import numpy as np
from infinity_emb import AsyncEmbeddingEngine, transformer
sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine: # engine starts with engine.astart()
embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop()
```

### or launch the `create_server()` command via CLI
3 changes: 2 additions & 1 deletion libs/infinity_emb/infinity_emb/__init__.py
@@ -4,14 +4,15 @@
"fastapi_schemas",
"logger",
"create_server",
"AsyncEmbeddingEngine",
"__version__",
]
import importlib.metadata

from infinity_emb import fastapi_schemas, inference, transformer

# reexports
from infinity_emb.infinity_server import create_server
from infinity_emb.infinity_server import AsyncEmbeddingEngine, create_server
from infinity_emb.log_handler import logger

__version__ = importlib.metadata.version("infinity_emb")
16 changes: 10 additions & 6 deletions libs/infinity_emb/infinity_emb/inference/batch_handler.py
@@ -310,7 +310,8 @@ def _preprocess_batch(self):
continue
except Exception as ex:
logger.exception(ex)
exit("_preprocess_batch crashed")
raise ValueError("_preprocess_batch crashed")
self._ready = False

def _core_batch(self):
"""waiting for preprocessed batches (on device)
@@ -338,7 +339,7 @@ def _core_batch(self):
self._feature_queue.task_done()
except Exception as ex:
logger.exception(ex)
exit("_core_batch crashed.")
raise ValueError("_core_batch crashed.")

async def _postprocess_batch(self):
"""collecting forward(.encode) results and put them into the result store"""
@@ -379,20 +380,23 @@ async def _postprocess_batch(self):
self._postprocess_queue.task_done()
except Exception as ex:
logger.exception(ex)
exit("Postprocessor crashed")
raise ValueError("Postprocessor crashed")

async def spawn(self):
"""set up the resources in batch"""
if self._ready:
raise ValueError("previous threads are still running.")
logger.info("creating batching engine")
self.loop = asyncio.get_event_loop() # asyncio.events._get_running_loop()
self.loop = asyncio.get_event_loop()
self._threadpool.submit(self._preprocess_batch)
self._threadpool.submit(self._core_batch)
asyncio.create_task(self._postprocess_batch())
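Since the `exit()` calls above became `raise ValueError(...)`, a crash in a threadpool worker no longer kills the process: the exception is stored on the `Future` returned by `submit`. A self-contained sketch (names illustrative, not from this PR) of surfacing such failures:

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("worker-demo")

def crash() -> None:
    # stands in for _preprocess_batch / _core_batch raising ValueError
    raise ValueError("_preprocess_batch crashed")

def log_failure(fut: Future) -> None:
    # an exception raised in a worker stays on the Future; it must be
    # inspected explicitly via .exception() or .result()
    exc = fut.exception()
    if exc is not None:
        logger.error("worker crashed: %r", exc)

with ThreadPoolExecutor() as pool:
    pool.submit(crash).add_done_callback(log_failure)
```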

def shutdown(self):
async def shutdown(self):
"""
set the shutdown event and close threadpool.
Blocking event, until shutdown complete.
"""
self._shutdown.set()
self._threadpool.shutdown(wait=True)
with ThreadPoolExecutor() as tp_temp:
await to_thread(self._threadpool.shutdown, tp_temp)
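`to_thread` here is a helper from the package, defined outside this hunk; a plausible minimal equivalent, assuming it mirrors `asyncio.to_thread` but with an explicit executor argument:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

async def to_thread(func: Callable[..., Any], tp: ThreadPoolExecutor, *args: Any) -> Any:
    # run a blocking callable on tp without stalling the event loop
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(tp, func, *args)
```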
103 changes: 102 additions & 1 deletion libs/infinity_emb/infinity_emb/infinity_server.py
@@ -1,4 +1,5 @@
import time
from typing import List

import typer
import uvicorn
@@ -19,6 +20,106 @@
from infinity_emb.transformer.utils import InferenceEngine, InferenceEngineTypeHint


class AsyncEmbeddingEngine:
def __init__(
self,
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
batch_size: int = 64,
engine: InferenceEngine = InferenceEngine.torch,
model_warmup: bool = True,
) -> None:
"""Creating a Async EmbeddingEngine object.

Args:
model_name_or_path, str: Defaults to "BAAI/bge-small-en-v1.5".
batch_size, int: Defaults to 64.
engine, InferenceEngine: backend for inference.
Defaults to InferenceEngine.torch.
model_warmup, bool: warm up the model with dummy inputs before serving. Defaults to True.

Example:
```python
import numpy as np
from infinity_emb import AsyncEmbeddingEngine, transformer
sentences = ["Embedded this via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine: # engine starts with engine.astart()
embeddings = np.array(await engine.embed(sentences))
# engine stops with engine.astop().
# For frequent restarts, handle start/stop yourself.
```
"""
self.batch_size = batch_size
self.running = False
self._model, self._min_inference_t = select_model_to_functional(
model_name_or_path=model_name_or_path,
batch_size=batch_size,
engine=engine,
model_warmup=model_warmup,
)

async def astart(self):
"""startup engine"""
self.running = True
self._batch_handler = BatchHandler(
max_batch_size=self.batch_size,
model=self._model,
verbose=logger.level <= 10,
batch_delay=self._min_inference_t / 2,
)
await self._batch_handler.spawn()

async def astop(self):
"""stop engine"""
self.running = False
await self._batch_handler.shutdown()

async def __aenter__(self):
if self.running:
raise ValueError(
"DoubleSpawn: already started `AsyncEmbeddingEngine`. "
" recommended use is via AsyncContextManager"
" `async with engine: ..`"
)
await self.astart()

async def __aexit__(self, *args):
self._check_running()
await self.astop()

def overload_status(self):
self._check_running()
return self._batch_handler.overload_status()

def is_overloaded(self) -> bool:
self._check_running()
return self._batch_handler.is_overloaded()

async def embed(self, sentences: List[str]) -> List[List[float]]:
"""embed multiple sentences

Args:
sentences (List[str]): sentences to be embedded

Raises:
ValueError: raised if engine is not started yet

Returns:
List[List[float]]: embeddings
2D list-array of shape (len(sentences), embed_dim)
"""
self._check_running()
embeddings, _ = await self._batch_handler.schedule(sentences)
return embeddings

def _check_running(self):
if not self.running:
raise ValueError(
"didn't start `AsyncEmbeddingEngine` "
" recommended use is via AsyncContextManager"
" `async with engine: ..`"
)


def create_server(
model_name_or_path: str = "BAAI/bge-small-en-v1.5",
url_prefix: str = "/v1",
@@ -76,7 +177,7 @@ async def _startup():

@app.on_event("shutdown")
async def _shutdown():
app.batch_handler.shutdown()
await app.batch_handler.shutdown()

@app.get("/ready")
async def _ready() -> float:
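Because `BatchHandler.shutdown()` is now a coroutine, the event handler above must await it. For context, a self-contained sketch of the pattern (`DummyHandler` is illustrative; `on_event` was FastAPI's event API at the time):

```python
from fastapi import FastAPI

app = FastAPI()

class DummyHandler:
    async def shutdown(self) -> None:
        print("cleaning up")

app.state.handler = DummyHandler()

@app.on_event("shutdown")
async def _shutdown() -> None:
    # FastAPI awaits coroutine event handlers, so async cleanup works here
    await app.state.handler.shutdown()
```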
2 changes: 2 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/__init__.py
@@ -0,0 +1,2 @@
__all__ = ["InferenceEngine"]
from infinity_emb.transformer.utils import InferenceEngine
19 changes: 18 additions & 1 deletion libs/infinity_emb/tests/script_live.py
@@ -1,5 +1,6 @@
import concurrent.futures
import json
import time
import timeit
from functools import partial

@@ -58,5 +59,21 @@ def remote(json_data: bytes, iters=1):
assert latency_st * 1.1 > latency_request


def latency_single():
session = requests.Session()

def _post(i):
json_d = json.dumps({"input": [str(i)], "model": "model"})
s = time.perf_counter()
res = session.post(f"{LIVE_URL}/embeddings", data=json_d)
e = time.perf_counter()
assert res.status_code == 200
return (e - s) * 10**3

_post("hi")
times = [_post(i) for i in range(32)]
print(f"{np.median(times)}+-{np.std(times)}")


if __name__ == "__main__":
embedding_live_performance()
latency_single()
@@ -79,15 +79,15 @@ def method_st(_sentences):
# yappi.get_func_stats().print_all()
# yappi.stop()
method_st(sentences[::10])
time.sleep(0.2)
time.sleep(0.5)
time_batch_handler = np.median(
[(await method_batch_handler(sentences)) for _ in range(N_TIMINGS)]
)
time.sleep(0.2)
time.sleep(0.5)
time_st_patched = np.median(
[method_patched(sentences) for _ in range(N_TIMINGS)]
)
time.sleep(0.2)
time.sleep(0.5)
time_st = np.median([method_st(sentences) for _ in range(N_TIMINGS)])

print(
@@ -113,4 +113,4 @@ def method_st(_sentences):
)

finally:
bh.shutdown()
await bh.shutdown()
35 changes: 35 additions & 0 deletions libs/infinity_emb/tests/unit_test/test_infinity_server.py
@@ -1,9 +1,12 @@
import subprocess

import numpy as np
import pytest
import typer
import uvicorn
from fastapi import FastAPI

from infinity_emb import AsyncEmbeddingEngine, transformer
from infinity_emb.infinity_server import (
UVICORN_LOG_LEVELS,
InferenceEngineTypeHint,
@@ -14,6 +17,38 @@
from infinity_emb.transformer.utils import InferenceEngine


@pytest.mark.anyio
async def test_async_api_debug():
sentences = ["Embedded this is sentence via Infinity.", "Paris is in France."]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.debugengine)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == len(sentences)
assert embeddings.shape[1] >= 10
for idx, s in enumerate(sentences):
assert embeddings[idx][0] == len(s), f"{embeddings}, {s}"


@pytest.mark.anyio
async def test_async_api_torch():
sentences = ["Hi", "how"]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.torch)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == 2
assert embeddings.shape[1] >= 10


@pytest.mark.anyio
async def test_async_api_fastembed():
sentences = ["Hi", "how"]
engine = AsyncEmbeddingEngine(engine=transformer.InferenceEngine.fastembed)
async with engine:
embeddings = np.array(await engine.embed(sentences))
assert embeddings.shape[0] == 2
assert embeddings.shape[1] >= 10
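The `@pytest.mark.anyio` tests above need an `anyio_backend` fixture to select a backend; presumably the suite's conftest.py provides one, along the lines of this sketch:

```python
# conftest.py (assumed, not part of this diff)
import pytest

@pytest.fixture
def anyio_backend():
    return "asyncio"
```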


def test_cli_help():
log = subprocess.run(["infinity_emb", "--help"])
assert log.returncode == 0