Skip to content

Commit

Permalink
Update python SDK cache support and exception handling (#2979)
Browse files Browse the repository at this point in the history
  • Loading branch information
IridiumOxide authored Nov 29, 2023
1 parent 6901733 commit ba4f26a
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 29 deletions.
12 changes: 3 additions & 9 deletions sdks/aperture-py/aperture_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def start_flow(

span = self.tracer.start_span("Aperture Check", attributes=span_attributes)
stub = FlowControlServiceStub(self.grpc_channel)
error: Optional[Exception] = None
try:
# stub.Check is typed to accept an int, but it actually accepts a float
timeout = typing.cast(int, params.check_timeout.total_seconds())
Expand All @@ -168,6 +169,7 @@ def start_flow(
except grpc.RpcError as e:
self.logger.debug(f"Aperture gRPC call failed: {e.details()}")
response = None
error = e
span.set_attribute(workload_start_timestamp_label, time.monotonic_ns())
return Flow(
fcs_stub=stub,
Expand All @@ -176,6 +178,7 @@ def start_flow(
check_response=response,
ramp_mode=params.ramp_mode,
cache_key=params.result_cache_key,
error=error,
)

def decorate(
Expand All @@ -193,19 +196,10 @@ async def wrapper(*args, **kwargs):
else:
if on_reject:
return on_reject()
raise RejectedFlowException("Flow was rejected")

return wrapper

return decorator

def close(self):
self.otlp_exporter.shutdown()


class ApertureException(Exception):
pass


class RejectedFlowException(ApertureException):
pass
146 changes: 126 additions & 20 deletions sdks/aperture-py/aperture_sdk/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from contextlib import AbstractContextManager
from typing import Optional, TypeVar

import grpc
from aperture_sdk._gen.aperture.flowcontrol.check.v1 import check_pb2
from aperture_sdk._gen.aperture.flowcontrol.check.v1.check_pb2 import (
MISS,
CacheDeleteRequest,
CacheDeleteResponse,
CacheUpsertResponse,
CacheEntry,
CacheUpsertRequest,
)
from aperture_sdk._gen.aperture.flowcontrol.check.v1.check_pb2_grpc import (
FlowControlServiceStub,
Expand All @@ -21,6 +24,7 @@
flow_status_label,
)
from google.protobuf import json_format
from google.protobuf.duration_pb2 import Duration
from opentelemetry import trace


Expand All @@ -47,6 +51,7 @@ def __init__(
check_response: Optional[check_pb2.CheckResponse],
ramp_mode: bool,
cache_key: Optional[str],
error: Optional[Exception],
):
self._fcs_stub = fcs_stub
self._control_point = control_point
Expand All @@ -56,6 +61,7 @@ def __init__(
self._status_code = FlowStatus.OK
self._ended = False
self._ramp_mode = ramp_mode
self._error = error
self.logger = logging.getLogger("aperture-py-sdk-flow")

def should_run(self) -> bool:
Expand Down Expand Up @@ -105,45 +111,64 @@ def end(self) -> None:
)
self._span.end()

def set_result_cache(self, value: str, ttl: datetime.timedelta):
def error(self) -> Optional[Exception]:
return self._error

def set_result_cache(
self, value: str, ttl: datetime.timedelta, **grpc_opts
) -> KeyUpsertResponse:
if not self._cache_key:
return KeyUpsertResponse(ValueError("No cache key"))

cache_upsert_request = {
"controlPoint": self._control_point,
"key": self._cache_key,
"value": value,
"ttl": ttl,
}
cache_upsert_request = CacheUpsertRequest(
control_point=self._control_point,
result_cache_entry=CacheEntry(
key=self._cache_key,
value=bytes(value, "utf-8"),
ttl=Duration().FromTimedelta(ttl),
),
)

res: CacheUpsertResponse = self._fcs_stub.CacheUpsert(cache_upsert_request)
try:
res = self._fcs_stub.CacheUpsert(cache_upsert_request, **grpc_opts)
except grpc.RpcError as e:
self.logger.debug(f"Aperture gRPC call failed: {e.details()}")
return KeyUpsertResponse(e)

if res.result_cache_response == None:
if res.result_cache_response is None:
return KeyUpsertResponse(ValueError("No cache upsert response"))

return KeyUpsertResponse(
convert_cache_error(res.result_cache_response.error),
)

async def delete_result_cache(self):
def delete_result_cache(self, **grpc_opts) -> KeyDeleteResponse:
if not self._cache_key:
raise ValueError("No cache key")
return KeyDeleteResponse(ValueError("No cache key"))

cache_delete_request = {
"controlPoint": self._control_point,
"key": self._cache_key,
}
cache_delete_request = CacheDeleteRequest(
control_point=self._control_point,
result_cache_key=self._cache_key,
)

res: CacheDeleteResponse = self._fcs_stub.CacheDelete(cache_delete_request)
try:
res: CacheDeleteResponse = self._fcs_stub.CacheDelete(
cache_delete_request, **grpc_opts
)
except grpc.RpcError as e:
self.logger.debug(f"Aperture gRPC call failed: {e.details()}")
return KeyDeleteResponse(e)

if res.result_cache_response == None:
if res.result_cache_response is None:
return KeyDeleteResponse(ValueError("No cache delete response"))

return KeyDeleteResponse(
convert_cache_error(res.result_cache_response.error),
)

def result_cache(self):
def result_cache(self) -> KeyLookupResponse:
if self._error is not None:
return KeyLookupResponse(None, MISS, self._error)
if (
not self.check_response
or not self.check_response.cache_lookup_response
Expand All @@ -157,7 +182,88 @@ def result_cache(self):
return KeyLookupResponse(
lookup_response.value,
convert_cache_lookup_status(lookup_response.lookup_status),
None,
convert_cache_error(lookup_response.error),
)

def set_global_cache(
self, key: str, value: str, ttl: datetime.timedelta, **grpc_opts
) -> KeyUpsertResponse:
cache_upsert_request = CacheUpsertRequest(
global_cache_entries={
key: CacheEntry(
value=bytes(value, "utf-8"),
ttl=Duration().FromTimedelta(ttl),
),
},
)

try:
res = self._fcs_stub.CacheUpsert(cache_upsert_request, **grpc_opts)
except grpc.RpcError as e:
self.logger.debug(f"Aperture gRPC call failed: {e.details()}")
return KeyUpsertResponse(e)

responses = res.global_cache_responses
if responses is None:
return KeyUpsertResponse(ValueError("No cache upsert response"))
if key not in responses:
return KeyUpsertResponse(
ValueError("Key missing from global cache response")
)

return KeyUpsertResponse(
convert_cache_error(responses[key].error),
)

def delete_global_cache(self, key: str, **grpc_opts) -> KeyDeleteResponse:
cache_delete_request = CacheDeleteRequest(
global_cache_keys=[key],
)

try:
res: CacheDeleteResponse = self._fcs_stub.CacheDelete(
cache_delete_request, **grpc_opts
)
except grpc.RpcError as e:
self.logger.debug(f"Aperture gRPC call failed: {e.details()}")
return KeyDeleteResponse(e)

delete_responses = res.global_cache_responses

if delete_responses is None:
return KeyDeleteResponse(ValueError("No cache delete response"))
if key not in delete_responses:
return KeyDeleteResponse(
ValueError("Key missing from global cache response")
)

return KeyDeleteResponse(
convert_cache_error(delete_responses[key].error),
)

def global_cache(self, key: str) -> KeyLookupResponse:
if self._error is not None:
return KeyLookupResponse(None, MISS, self._error)
if (
not self.check_response
or not self.check_response.cache_lookup_response
or not self.check_response.cache_lookup_response.global_cache_responses
):
return KeyLookupResponse(
None, MISS, ValueError("No global cache lookup response")
)

lookup_response_map = (
self.check_response.cache_lookup_response.global_cache_responses
)
if key not in lookup_response_map:
return KeyLookupResponse(None, MISS, ValueError("Unknown global cache key"))

lookup_response = lookup_response_map[key]
return KeyLookupResponse(
lookup_response.value,
convert_cache_lookup_status(lookup_response.lookup_status),
convert_cache_error(lookup_response.error),
)

def __enter__(self: TFlow) -> TFlow:
Expand Down

0 comments on commit ba4f26a

Please sign in to comment.