Adding TPOT and ITL metrics #4

Open · wants to merge 1 commit into base: main
68 changes: 44 additions & 24 deletions benchmark_serving.py
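For context on the two metrics this PR introduces alongside the existing TTFT reporting: TPOT (Time Per Output Token) averages the decode phase over every token after the first, and ITL (Inter-Token Latency) records each individual gap between streamed chunks. A minimal sketch of the conventional definitions; the helper names here are illustrative, not part of the diff:

```python
from typing import List

def time_per_output_token(latency_s: float, ttft_s: float, output_len: int) -> float:
    """TPOT: decode time averaged over every token after the first."""
    assert output_len > 1, "TPOT is undefined for single-token outputs"
    return (latency_s - ttft_s) / (output_len - 1)

def inter_token_latencies(chunk_times_s: List[float]) -> List[float]:
    """ITL: the gap between each pair of consecutive streamed chunks."""
    return [b - a for a, b in zip(chunk_times_s, chunk_times_s[1:])]
```

TPOT yields one number per request, while ITL yields one sample per streamed chunk, so ITL percentiles can surface stutter that a per-request average would smooth over.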
@@ -161,7 +161,7 @@ async def send_stream_request(
     tokenizer: PreTrainedTokenizerBase,
     sax_model: str,
     model: str,
-) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
+) -> Tuple[Tuple[int, int, float], float, List[float], Dict[str, int]]:
   """Sends stream request to server"""
   request_start_time = time.time()
   errors = init_errors_map()
@@ -190,7 +190,9 @@ async def send_stream_request(
     raise ValueError(f"Unknown backend: {backend}")

   ttft = 0.0
+  itl = []
   st = time.perf_counter()
+  most_recent_timestamp = st
   output = ""
   timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC)
   async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session:
@@ -204,42 +206,45 @@ async def send_stream_request(
           # First token
           if ttft == 0.0:
             ttft = timestamp - st
+          else:
+            itl.append(timestamp - most_recent_timestamp)
+          most_recent_timestamp = timestamp

           if backend == "vllm":
             if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
               output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
           elif backend == "jetstream":
             if chunk_bytes.decode("utf-8") != "":
               output += json.loads(chunk_bytes.decode("utf-8"))["text"]

     except aiohttp.client_exceptions.ClientConnectorError as client_err:
       errors["ClientConnectorError"] += 1
       print(f"ClientConnectorError: {client_err}")
-      return None, None, errors
+      return None, None, None, errors
     except asyncio.TimeoutError as timeout_err:
       errors["TimeoutError"] += 1
       print(f"TimeoutError: {timeout_err}")
-      return None, None, errors
+      return None, None, None, errors
     except aiohttp.client_exceptions.ClientOSError as e:
       errors["ClientOSError"] += 1
       print(f"ClientOSError: {e}")
-      return None, None, errors
+      return None, None, None, errors
     except aiohttp.client_exceptions.ContentTypeError as e:
       print(f"ContentTypeError: {e}, response: {response}")
       errors["ContentTypeError"] += 1
-      return None, None, errors
+      return None, None, None, errors
     except aiohttp.client_exceptions.ServerDisconnectedError as e:
       errors["ServerDisconnectedError"] += 1
       print(f"ServerDisconnectedError: {e}")
-      return None, None, errors
+      return None, None, None, errors
     except Exception as e:
       print(f"Unknown error {e}")
       errors["unknown_error"] += 1
-      return None, None, errors
+      return None, None, None, errors
   request_end_time = time.time()
   output_token_ids = tokenizer(output).input_ids
   output_len = len(output_token_ids)
   request_latency = (prompt_len, output_len, (request_end_time - request_start_time))
-  return request_latency, ttft, None
+  return request_latency, ttft, itl, None

 async def send_request(
     backend: str,
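The hunk above is the heart of the change: the first chunk closes the TTFT window, and every later chunk contributes one ITL sample. A condensed sketch of that timing pattern, with a hypothetical async iterator standing in for the aiohttp response content; note that the reference timestamp is refreshed on every chunk, including the first, otherwise the first ITL sample would absorb the TTFT:

```python
import time

async def time_stream(chunks):
    """Measure TTFT and per-chunk ITLs over any async iterator of chunks."""
    ttft = 0.0
    itl = []
    st = time.perf_counter()
    most_recent_timestamp = st
    async for _chunk in chunks:
        timestamp = time.perf_counter()
        if ttft == 0.0:
            ttft = timestamp - st  # first chunk: prefill finished
        else:
            itl.append(timestamp - most_recent_timestamp)  # decode-phase gap
        most_recent_timestamp = timestamp  # refresh on every chunk
    return ttft, itl
```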
@@ -253,7 +258,7 @@ async def send_request(
     tokenizer: PreTrainedTokenizerBase,
     sax_model: str,
     model: str,
-) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]:
+) -> Tuple[Tuple[int, int, float], float, List[float], Dict[str, int]]:
   """Sends request to server."""
   request_start_time = time.time()
   errors = init_errors_map()
@@ -339,27 +344,27 @@ async def send_request(
   except aiohttp.client_exceptions.ClientConnectorError as client_err:
     errors["ClientConnectorError"] += 1
     print(f"ClientConnectorError: {client_err}")
-    return None, None, errors
+    return None, None, None, errors
   except asyncio.TimeoutError as timeout_err:
     errors["TimeoutError"] += 1
     print(f"TimeoutError: {timeout_err}")
-    return None, None, errors
+    return None, None, None, errors
   except aiohttp.client_exceptions.ClientOSError as e:
     errors["ClientOSError"] += 1
     print(f"ClientOSError: {e}")
-    return None, None, errors
+    return None, None, None, errors
   except aiohttp.client_exceptions.ContentTypeError as e:
     print(f"ContentTypeError: {e}, response: {response}")
     errors["ContentTypeError"] += 1
-    return None, None, errors
+    return None, None, None, errors
   except aiohttp.client_exceptions.ServerDisconnectedError as e:
     errors["ServerDisconnectedError"] += 1
     print(f"ServerDisconnectedError: {e}")
-    return None, None, errors
+    return None, None, None, errors
   except Exception as e:
     print(f"Unknown error {e}")
     errors["unknown_error"] += 1
-    return None, None, errors
+    return None, None, None, errors

   request_end_time = time.time()
   # Naive HF transformers generation and TensorRT-LLM generation stops at EOS
@@ -393,14 +398,14 @@ async def send_request(
   prompt_length_metric.observe(prompt_len)
   response_length_metric.observe(output_len)

-  return request_latency, None, None
+  return request_latency, None, None, None

 async def benchmark(
     args: argparse.Namespace,
     api_url: str,
     tokenizer: PreTrainedTokenizerBase,
     model: str,
-) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]:
+) -> Tuple[List[Tuple[int, int, float]], List[float], List[float], List[float], Dict[str, int]]:
   """Runs benchmark with asynchronous requests."""
   input_requests = get_filtered_dataset(
       args.dataset,
@@ -453,19 +458,25 @@ async def benchmark(
   results = await asyncio.gather(*tasks)
   combined_latencies = []
   combined_ttfts = []
+  combined_itls = []
+  combined_tpots = []
   combined_errors = init_errors_map()
-  for latency, ttft, errors in results:
+  for latency, ttft, itl, errors in results:
     if latency:
       combined_latencies.append(latency)
     if errors:
       for err, count in errors.items():
         combined_errors[err] = combined_errors[err] + count
     if ttft:
       combined_ttfts.append(ttft)
+      _, output_len, request_latency = latency
+      combined_tpots.append((request_latency - ttft) / (output_len - 1))
+    if itl:
+      combined_itls.extend(itl)

   benchmark_duration = time.time() - benchmark_start_time
-  print_and_save_result(args, benchmark_duration, prompts_sent, model, combined_latencies, combined_ttfts, combined_errors)
-  return combined_latencies, combined_ttfts, combined_errors
+  print_and_save_result(args, benchmark_duration, prompts_sent, model, combined_latencies, combined_ttfts, combined_itls, combined_tpots, combined_errors)
+  return combined_latencies, combined_ttfts, combined_itls, combined_tpots, combined_errors

 def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics, model, errors):
   # Setup
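As a worked example of the TPOT line in the hunk above: a request with 3.2 s end-to-end latency, 0.4 s TTFT, and 141 output tokens gives (3.2 − 0.4) / 140 = 0.02 s per output token. One thing to watch: the diff divides by output_len - 1 unconditionally, so a single-token response would raise ZeroDivisionError. A hedged variant of the aggregation loop with that guard added; the guard is a suggestion, not what the PR does:

```python
from typing import Dict, List, Optional, Tuple

# (prompt_len, output_len, end_to_end_latency_s), as returned per request
RequestLatency = Tuple[int, int, float]

def aggregate_tpots(
    results: List[Tuple[Optional[RequestLatency], Optional[float],
                        Optional[List[float]], Optional[Dict[str, int]]]]
) -> List[float]:
    """One TPOT sample per successful streamed request."""
    tpots = []
    for latency, ttft, _itl, _errors in results:
        if latency and ttft:
            _, output_len, request_latency = latency
            if output_len > 1:  # added guard: skip single-token outputs
                tpots.append((request_latency - ttft) / (output_len - 1))
    return tpots
```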
@@ -671,7 +682,7 @@ def get_stats_for_set(name, description, points):
     f'p99_{name}': p99,
   }

-def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, ttfts, errors):
+def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, ttfts, itls, tpots, errors):
   benchmark_result = {}

   print(f"====Result for Model: {model}====")
@@ -707,8 +718,12 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
   benchmark_result['total_tokens'] = int(total_tokens)
   benchmark_result['tokens_per_min'] = tokens_per_min
   ttft_stats = {}
+  itls_stats = {}
+  tpot_stats = {}
   if args.stream_request:
     ttft_stats = get_stats_for_set("TTFT", "Time to First Token (s)", ttfts)
+    itls_stats = get_stats_for_set("ITL", "Inter-Token Latency (s)", itls)
+    tpot_stats = get_stats_for_set("TPOT", "Time Per Output Token (s)", tpots)
   if args.machine_cost:
     print(
         "Cost $/1k tokens:"
Expand All @@ -722,6 +737,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
for prompt_len, output_len, latency in request_latencies
])),
**ttft_stats,
**itls_stats,
# NOTE: The latency below includes requests awaiting time on server side.
# It's not comparable with the model inference latency for batch size 1.
**(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)" ,[1000 * latency for _, _, latency in request_latencies])),
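get_stats_for_set is mostly collapsed in this diff; the visible tail above shows only that it returns a dict keyed like p99_{name}. A rough reconstruction, purely to make the new ITL/TPOT calls concrete; every field other than p99_{name} is a guess at the hidden body, not the file's actual contents:

```python
import numpy as np

def get_stats_for_set(name, description, points):
    # Only the `p99_{name}` key survives in the visible diff; the other
    # fields below are assumptions made for illustration.
    pts = points or [0.0]
    stats = {
        f'avg_{name}': float(np.mean(pts)),
        f'median_{name}': float(np.percentile(pts, 50)),
        f'p90_{name}': float(np.percentile(pts, 90)),
        f'p99_{name}': float(np.percentile(pts, 99)),
    }
    print(f"{description}: " + ", ".join(f"{k}={v:.4f}" for k, v in stats.items()))
    return stats

# Usage mirroring the new calls in print_and_save_result (sample values made up):
itls_stats = get_stats_for_set("ITL", "Inter-Token Latency (s)", [0.018, 0.021, 0.019])
tpot_stats = get_stats_for_set("TPOT", "Time Per Output Token (s)", [0.020, 0.024])
```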
@@ -779,6 +795,8 @@ async def main(args: argparse.Namespace):
   # Summarize results
   combined_latencies = []
   combined_ttfts = []
+  combined_itls = []
+  combined_tpots = []
   combined_errors = {
       "ClientConnectorError": 0,
       "TimeoutError": 0,
@@ -787,15 +805,17 @@ async def main(args: argparse.Namespace):
       "unknown_error": 0,
       "ServerDisconnectedError": 0,
   }
-  for latencies, ttfts, errors in results:
+  for latencies, ttfts, itls, tpots, errors in results:
     combined_latencies.extend(latencies)
     combined_ttfts.extend(ttfts)
+    combined_itls.extend(itls)
+    combined_tpots.extend(tpots)
     for k, v in errors.items():
       combined_errors[k] = combined_errors[k] + v

   benchmark_duration_all_models = time.time() - benchmark_start_time
   if args.save_aggregated_result:
-    print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_ttfts, combined_errors)
+    print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_ttfts, combined_itls, combined_tpots, combined_errors)

 if __name__ == "__main__":
   parser = argparse.ArgumentParser(