Skip to content

Commit

Permalink
docs(source-exact): add more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
joelluijmes committed Feb 13, 2024
1 parent 7bd71c0 commit 45819df
Showing 1 changed file with 53 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,34 @@


class ExactStream(HttpStream, IncrementalMixin):
"""
Base stream to sync endpoints from Exact, built upon HttpStream.
It supports both full refresh and incremental sync. The cursor field is either `Timestamp` or `Modified` depending
on the endpoint (see `ExactSyncStream` and `ExactOtherStream`). The cursor field is used to get changes since the
last sync.
A `division` is a separate administration within the Exact Online environment. The API requires the division to be
specified in the URL. The stream supports syncing multiple divisions.
For each division, the state is stored separately. The state is a dictionary containing the cursor value for the
last sync. The cursor value is the largest value of the cursor field seen so far.
Exact enforces strict rate limits. The rate limit is 60 requests per minute. The rate limit is enforced by the API
and is not configurable. The stream will automatically wait for 1 minute if the rate limit is exceeded.
In addition, Exact enforces single-use refresh tokens. The stream will automatically refresh the access token when
it has expired.
"""

def __init__(self, config: Mapping[str, Any]):
self._divisions = config["divisions"]
self._base_url = config["base_url"]

self._active_division = self._divisions[0]
"""The current division being synced."""

# State per division, simple dictionary with the format {division: {cursor_field: cursor_value}}
# Note: implemented state properties return the full state (i.e., all divisions)
self._state_per_division = {}
for division in self._divisions:
self._state_per_division[str(division)] = {}
Expand Down Expand Up @@ -124,26 +147,33 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
return [self._parse_item(x) for x in results]

def read_records(self, sync_mode: SyncMode, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[StreamData]:
"""Overridden to change the url_base based on the current division, and to keep track of the cursor."""
"""Implements the actual syncing of a division of the current stream."""
# This function is called per division (each division as returned by `stream_slices`), from `read_full_refresh`
# or `read_incremental` (at the base `Stream`).
#
# It is overridden to update the current division, as this needs to be updated in the `url_base` property.
# It also keeps track of the cursor value for the current division.

division = str(stream_slice["division"])
self._url_base = f"{self._base_url}/api/v1/{division}/"
self._active_division = division

self.logger.info(f"Syncing division {division}...")

# Reset state for full refresh
# Reset state if full refresh
if sync_mode == SyncMode.full_refresh:
self._state_per_division[division] = {}

# Perform the actual sync, and update the latest cursor value
division_state = self._state_per_division[division]
for record in super().read_records(sync_mode=sync_mode, stream_slice=stream_slice, **kwargs):
# Track the largest cursor value
if self.cursor_field and sync_mode == SyncMode.incremental:
cursor_value = record[self.cursor_field]
current_cursor_value = self._state_per_division[division].get(self.cursor_field)
current_cursor_value = cursor_value if not current_cursor_value else current_cursor_value
current_value = division_state.get(self.cursor_field)
updated_value = record[self.cursor_field]

if current_cursor_value:
self._state_per_division[division].update({self.cursor_field: max(cursor_value, current_cursor_value)})
if current_value is None:
division_state[self.cursor_field] = updated_value
else:
division_state[self.cursor_field] = max(current_value, updated_value)

yield record

Expand All @@ -154,7 +184,8 @@ def test_access(self) -> bool:
prepared_request = self._create_prepared_request(
path=self.endpoint,
headers=self._single_refresh_token_authenticator.get_auth_header(),
params={"$top": 0}, # Just want to test if we can access the API, don't care about the results
# Just want to test if we can access the API, don't care about any results. With $top=0 we get no results.
params={"$top": 0},
)

response = self._send_request(prepared_request, {})
Expand All @@ -164,22 +195,23 @@ def test_access(self) -> bool:
return False

response.raise_for_status()

return True
except requests.RequestException:
return False

def _is_token_expired(self, response: requests.Response):
if response.status_code == 401:
error_reason = response.headers.get("WWW-Authenticate", "")
error_reason = unquote(error_reason)
"""Checks if the response is a 401 error because the token is expired."""

if "message expired" in error_reason or "access token expired" in error_reason:
return True
if response.status_code != 401:
return False

error_reason = response.headers.get("WWW-Authenticate", "")
error_reason = unquote(error_reason)

raise RuntimeError(f"Unexpected forbidden error: {error_reason}")
if "message expired" in error_reason or "access token expired" in error_reason:
return True

return False
raise RuntimeError(f"Unexpected forbidden error: {error_reason}")

def _get_param_filter(self, cursor_value: str):
"""Returns the $filter clause for the cursor field."""
Expand Down Expand Up @@ -210,16 +242,16 @@ def _get_param_filter(self, cursor_value: str):

def _parse_item(self, obj: dict):
"""
Parses single result item:
- OData dates (/Date(1672531200000)/) are parsed to iso formatted timestamps 2022-12-12T12:00:00
- int, float and booleans are casted based on the JSON Schema type field
Parses a single result item from an Exact response. It converts the OData date format (e.g., `/Date(1672531200000)/`)
to an ISO formatted timestamp and casts the values to the expected type based on the JSON Schema (`int`, `float` and `bool`).
"""

# Get the first not null type -> i.e., the expected type of the property
property_type_lookup = {k: next(x for x in v["type"] if x != "null") for k, v in self.get_json_schema()["properties"].items()}

regex_timestamp = re.compile(r"^\/Date\((\d+)\)\/$")

# Recursively parse the value
def parse_value(key, value):
if isinstance(value, dict):
return {k: parse_value(k, v) for k, v in value.items()}
Expand Down

0 comments on commit 45819df

Please sign in to comment.