
Enhance cache service metrics #13

Merged · 11 commits · Dec 23, 2024
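For context, the diff below assumes a `CacheServiceMetrics` object shaped roughly like the following minimal sketch. It is inferred purely from how `_get_stats` reads and resets the metrics in `llm_engine.py`; the actual class behind this PR may differ.

```python
from typing import List, Tuple


class CacheServiceMetrics:
    """Sketch of the interface _get_stats relies on (inferred, not verbatim)."""

    def __init__(self) -> None:
        # Cumulative hit/total counters, by tokens and by blocks.
        self.hit_tokens: int = 0
        self.total_tokens: int = 0
        self.hit_blocks: int = 0
        self.total_blocks: int = 0
        # Cumulative error counters.
        self.err_query: int = 0
        self.err_async_update_task_queue_full: int = 0
        self.err_update: int = 0
        # Per-interval latency samples, in seconds.
        self.time_query: List[float] = []
        self.time_load: List[float] = []
        self.time_reshape: List[float] = []
        self.time_unload: List[float] = []
        self.time_update: List[float] = []
        # Per-interval async update samples.
        self._time_async_update_queue: List[float] = []
        self._time_async_update_exec: List[float] = []
        self._counter_async_update_updated: List[int] = []

    def get_tokens_hit_rate(self) -> float:
        # Returned as a percentage, matching the "%.2f%%" log format below.
        if self.total_tokens == 0:
            return 0.0
        return self.hit_tokens / self.total_tokens * 100

    def get_blocks_hit_rate(self) -> float:
        if self.total_blocks == 0:
            return 0.0
        return self.hit_blocks / self.total_blocks * 100

    def get_async_metrics(
            self) -> Tuple[List[float], List[float], List[int]]:
        return (self._time_async_update_queue,
                self._time_async_update_exec,
                self._counter_async_update_updated)

    def reset_async_metrics(self) -> None:
        # Reassign new lists so snapshots returned by get_async_metrics()
        # remain valid after the reset.
        self._time_async_update_queue = []
        self._time_async_update_exec = []
        self._counter_async_update_updated = []
```

The reassignment pattern matters: `_get_stats` grabs references to the sample lists and then replaces them with fresh lists rather than clearing in place, so the captured snapshots stay intact for the `Stats` object.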
56 changes: 54 additions & 2 deletions vllm/engine/llm_engine.py
@@ -297,7 +297,7 @@ def __init__(
)
self.log_stats = log_stats
self.step_return_finished_only = step_return_finished_only
- self.cache_service_metrics = CacheServiceMetrics
self.cache_service_metrics = CacheServiceMetrics()

if not self.model_config.skip_tokenizer_init:
self.tokenizer = self._init_tokenizer()
@@ -1815,7 +1815,14 @@ def _get_stats(self,
best_of_requests: List[int] = []
n_requests: List[int] = []
finished_reason_requests: List[str] = []


# Cache Service Metrics
cache_service_tokens_hit_rate: float = 0.0
cache_service_blocks_hit_rate: float = 0.0
cache_service_time_async_update_queue: List[float] = []
cache_service_time_async_update_exec: List[float] = []
cache_service_counter_async_update_updated: List[int] = []

# NOTE: This loop assumes prefill seq_groups are before
# decode seq_groups in scheduled_seq_groups.
if scheduler_outputs is not None:
@@ -1903,6 +1910,32 @@ def _get_stats(self,
spec_decode_metrics = model_output[0].spec_decode_worker_metrics
else:
spec_decode_metrics = None

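# Snapshot external cache service metrics for this stats interval. The
# hit/error counters are cumulative; the time_* lists hold per-interval
# samples and are reset after being captured below.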
if self.cache_service_metrics is not None:
cache_service_hit_tokens = self.cache_service_metrics.hit_tokens
cache_service_total_tokens = self.cache_service_metrics.total_tokens
cache_service_hit_blocks = self.cache_service_metrics.hit_blocks
cache_service_total_blocks = self.cache_service_metrics.total_blocks
cache_service_tokens_hit_rate = self.cache_service_metrics.get_tokens_hit_rate()
cache_service_blocks_hit_rate = self.cache_service_metrics.get_blocks_hit_rate()
cache_service_err_query = self.cache_service_metrics.err_query
cache_service_err_async_update_task_queue_full = self.cache_service_metrics.err_async_update_task_queue_full
cache_service_err_update = self.cache_service_metrics.err_update

cache_service_time_query = self.cache_service_metrics.time_query
cache_service_time_load = self.cache_service_metrics.time_load
cache_service_time_reshape = self.cache_service_metrics.time_reshape
cache_service_time_unload = self.cache_service_metrics.time_unload
cache_service_time_update = self.cache_service_metrics.time_update
(cache_service_time_async_update_queue,
 cache_service_time_async_update_exec,
 cache_service_counter_async_update_updated
 ) = self.cache_service_metrics.get_async_metrics()

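# Reassign fresh lists instead of clearing in place, so the snapshots
# captured above stay valid when passed to Stats.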
self.cache_service_metrics.time_query = []
self.cache_service_metrics.time_load = []
self.cache_service_metrics.time_reshape = []
self.cache_service_metrics.time_unload = []
self.cache_service_metrics.time_update = []
self.cache_service_metrics.reset_async_metrics()


return Stats(
now=now,
@@ -1935,6 +1968,25 @@
best_of_requests=best_of_requests,
n_requests=n_requests,
finished_reason_requests=finished_reason_requests,

# Cache Service
cache_service_hit_tokens=cache_service_hit_tokens,
cache_service_total_tokens=cache_service_total_tokens,
cache_service_hit_blocks=cache_service_hit_blocks,
cache_service_total_blocks=cache_service_total_blocks,
cache_service_tokens_hit_rate=cache_service_tokens_hit_rate,
cache_service_blocks_hit_rate=cache_service_blocks_hit_rate,
cache_service_err_query=cache_service_err_query,
cache_service_err_async_update_task_queue_full=cache_service_err_async_update_task_queue_full,
cache_service_err_update=cache_service_err_update,
cache_service_time_query=cache_service_time_query,
cache_service_time_load=cache_service_time_load,
cache_service_time_reshape=cache_service_time_reshape,
cache_service_time_unload=cache_service_time_unload,
cache_service_time_update=cache_service_time_update,
cache_service_time_async_update_queue=cache_service_time_async_update_queue,
cache_service_time_async_update_exec=cache_service_time_async_update_exec,
cache_service_counter_async_update_updated=cache_service_counter_async_update_updated,
)

def add_lora(self, lora_request: LoRARequest) -> bool:
172 changes: 166 additions & 6 deletions vllm/engine/metrics.py
@@ -190,6 +190,132 @@ def __init__(self, labelnames: List[str], max_model_len: int):
labelnames=labelnames,
multiprocess_mode="sum",
)

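# External cache service metrics: gauges track cumulative counters and
# current hit rates; histograms take per-interval latency samples.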
self.gauge_cache_service_tokens_hit_rate = self._gauge_cls(
name="vllm:cache_service_tokens_hit_rate",
documentation="External cache service tokens hit rate.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_blocks_hit_rate = self._gauge_cls(
name="vllm:cache_service_blocks_hit_rate",
documentation="External cache service blocks hit rate.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_hit_tokens = self._gauge_cls(
name="vllm:cache_service_hit_tokens",
documentation="External cache service hit tokens.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_total_tokens = self._gauge_cls(
name="vllm:cache_service_total_tokens",
documentation="External cache service total tokens.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_hit_blocks = self._gauge_cls(
name="vllm:cache_service_hit_blocks",
documentation="External cache service hit blocks.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_total_blocks = self._gauge_cls(
name="vllm:cache_service_total_blocks",
documentation="External cache service total blocks.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_query = self._gauge_cls(
name="vllm:cache_service_err_query",
documentation="External cache service query errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_async_update_task_queue_full = self._gauge_cls(
name="vllm:cache_service_err_async_update_task_queue_full",
documentation="External cache service async update task queue full errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.gauge_cache_service_err_update = self._gauge_cls(
name="vllm:cache_service_err_update",
documentation="External cache service update errors.",
labelnames=labelnames,
multiprocess_mode="all")

self.histogram_cache_service_time_query_seconds = self._histogram_cls(
name="vllm:cache_service_time_query_seconds",
documentation="Histogram of cache service time query in seconds.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_load_seconds = self._histogram_cls(
name="vllm:cache_service_time_load_seconds",
documentation="Histogram of cache service time load.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_reshape_seconds = self._histogram_cls(
name="vllm:cache_service_time_reshape_seconds",
documentation="Histogram of cache service time update.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_unload_seconds = self._histogram_cls(
name="vllm:cache_service_time_unload_seconds",
documentation="Histogram of cache service time unload.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_update_seconds = self._histogram_cls(
name="vllm:cache_service_time_update_seconds",
documentation="Histogram of cache service time update.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_async_update_queue_seconds = self._histogram_cls(
name="vllm:cache_service_time_async_update_queue_seconds",
documentation="Histogram of cache service async update time in queue.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_time_async_update_exec_seconds = self._histogram_cls(
name="vllm:cache_service_time_async_update_exec_seconds",
documentation="Histogram of cache service async update time in execution.",
labelnames=labelnames,
buckets=[
0.01, 0.025, 0.05, 0.075, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75,
1.0, 2.5
])

self.histogram_cache_service_counter_async_update_updated = self._histogram_cls(
name="vllm:cache_service_counter_async_update_updated",
documentation="Histogram of the number of entries updated per cache service async update.",
labelnames=labelnames,
# Count buckets: this metric is a count, not a latency in seconds.
buckets=[1, 2, 5, 10, 20, 50, 100, 200, 500, 1000])


# end-metrics-definitions
@@ -375,12 +501,10 @@ def log(self, stats: Stats) -> None:
self._format_spec_decode_metrics_str(
self.spec_decode_metrics))

- if self.external_cache_service_metrics is not None:
-     logger.info(
-         "Cache service hit rate: by tokens: %.2f%%, by blocks: %.2f%%",
-         0 if self.external_cache_service_metrics.total_tokens == 0 else self.external_cache_service_metrics.hit_tokens/self.external_cache_service_metrics.total_tokens * 100,
-         0 if self.external_cache_service_metrics.total_blocks == 0 else self.external_cache_service_metrics.hit_blocks/self.external_cache_service_metrics.total_blocks * 100,
-     )
logger.info(
"Cache service hit rate: by tokens: %.2f%%, by blocks: %.2f%%",
stats.cache_service_tokens_hit_rate, stats.cache_service_blocks_hit_rate
)
# Reset tracked stats for next interval.
self.num_prompt_tokens = []
self.num_generation_tokens = []
@@ -482,6 +606,42 @@ def _log_prometheus(self, stats: Stats) -> None:
self._log_histogram(self.metrics.histogram_n_request, stats.n_requests)
self._log_histogram(self.metrics.histogram_best_of_request,
stats.best_of_requests)

# Cache Service
self._log_gauge(self.metrics.gauge_cache_service_hit_tokens,
stats.cache_service_hit_tokens)
self._log_gauge(self.metrics.gauge_cache_service_total_tokens,
stats.cache_service_total_tokens)
self._log_gauge(self.metrics.gauge_cache_service_hit_blocks,
stats.cache_service_hit_blocks)
self._log_gauge(self.metrics.gauge_cache_service_total_blocks,
stats.cache_service_total_blocks)
self._log_gauge(self.metrics.gauge_cache_service_tokens_hit_rate,
stats.cache_service_tokens_hit_rate)
self._log_gauge(self.metrics.gauge_cache_service_blocks_hit_rate,
stats.cache_service_blocks_hit_rate)
self._log_gauge(self.metrics.gauge_cache_service_err_query,
stats.cache_service_err_query)
self._log_gauge(self.metrics.gauge_cache_service_err_async_update_task_queue_full,
stats.cache_service_err_async_update_task_queue_full)
self._log_gauge(self.metrics.gauge_cache_service_err_update,
stats.cache_service_err_update)
self._log_histogram(self.metrics.histogram_cache_service_time_query_seconds,
stats.cache_service_time_query)
self._log_histogram(self.metrics.histogram_cache_service_time_load_seconds,
stats.cache_service_time_load)
self._log_histogram(self.metrics.histogram_cache_service_time_reshape_seconds,
stats.cache_service_time_reshape)
self._log_histogram(self.metrics.histogram_cache_service_time_unload_seconds,
stats.cache_service_time_unload)
self._log_histogram(self.metrics.histogram_cache_service_time_update_seconds,
stats.cache_service_time_update)
self._log_histogram(self.metrics.histogram_cache_service_time_async_update_queue_seconds,
stats.cache_service_time_async_update_queue)
self._log_histogram(self.metrics.histogram_cache_service_time_async_update_exec_seconds,
stats.cache_service_time_async_update_exec)
self._log_histogram(self.metrics.histogram_cache_service_counter_async_update_updated,
stats.cache_service_counter_async_update_updated)

def _log_prometheus_interval(self, prompt_throughput: float,
generation_throughput: float) -> None:
23 changes: 21 additions & 2 deletions vllm/engine/metrics_types.py
@@ -53,7 +53,27 @@ class Stats:
n_requests: List[int]
finished_reason_requests: List[str]

- spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None
# Cache Service
cache_service_hit_tokens: int
cache_service_total_tokens: int
cache_service_hit_blocks: int
cache_service_total_blocks: int
cache_service_tokens_hit_rate: float
cache_service_blocks_hit_rate: float
cache_service_err_query: int
cache_service_err_async_update_task_queue_full: int
cache_service_err_update: int
cache_service_time_query: List[float]
cache_service_time_load: List[float]
cache_service_time_reshape: List[float]
cache_service_time_unload: List[float]
cache_service_time_update: List[float]
cache_service_time_async_update_queue: List[float]
cache_service_time_async_update_exec: List[float]
cache_service_counter_async_update_updated: List[int]


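# spec_decode_metrics keeps its default value, so it must remain after the
# new required cache service fields (dataclass field ordering).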
spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None


class SupportsMetricsInfo(Protocol):
@@ -71,7 +91,6 @@ def __init__(self, local_interval: float) -> None:
self.num_generation_tokens: List[int] = []
self.last_local_log = time.time()
self.local_interval = local_interval
- self.external_cache_service_metrics = CacheServiceMetrics
self.spec_decode_metrics: Optional["SpecDecodeWorkerMetrics"] = None

@abstractmethod