remove custom datapoints in tz retrieve fn
haakonvt committed Feb 20, 2025
1 parent 4a3c0d1 commit 2eec07b
Showing 11 changed files with 18 additions and 1,516 deletions.
2 changes: 1 addition & 1 deletion cognite/client/_api/datapoint_tasks.py
@@ -19,6 +19,7 @@
TypeVar,
cast,
)
from zoneinfo import ZoneInfo

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

@@ -41,7 +42,6 @@
from cognite.client.utils._auxiliary import exactly_one_is_not_none, is_finite, is_unlimited
from cognite.client.utils._text import convert_all_keys_to_snake_case, to_snake_case
from cognite.client.utils._time import (
ZoneInfo,
align_start_and_end_for_granularity,
convert_timezone_to_str,
granularity_to_ms,
147 changes: 1 addition & 146 deletions cognite/client/_api/datapoints.py
@@ -6,7 +6,6 @@
import itertools
import math
import time
import warnings
from abc import ABC, abstractmethod
from collections import Counter, defaultdict
from collections.abc import Callable, Iterable, Iterator, MutableSequence, Sequence
@@ -22,6 +21,7 @@
cast,
overload,
)
from zoneinfo import ZoneInfo

from typing_extensions import Self

@@ -59,13 +59,7 @@
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._importing import import_as_completed, local_import
from cognite.client.utils._time import (
ZoneInfo,
align_large_granularity,
pandas_date_range_tz,
timestamp_to_ms,
to_fixed_utc_intervals,
to_pandas_freq,
validate_timezone,
)
from cognite.client.utils._validation import validate_user_input_dict_with_identifier
from cognite.client.utils.useful_types import SequenceNotStr
Expand Down Expand Up @@ -1233,145 +1227,6 @@ def retrieve_dataframe(
freq = cast(str, granularity).replace("m", "min")
return df.reindex(pd.date_range(start=start, end=end, freq=freq, inclusive="left"))

# TODO: Deprecated, don't add support for new features like instance_id
def retrieve_dataframe_in_tz(
self,
*,
id: int | Sequence[int] | None = None,
external_id: str | SequenceNotStr[str] | None = None,
start: datetime.datetime,
end: datetime.datetime,
aggregates: Aggregate | str | list[Aggregate | str] | None = None,
granularity: str | None = None,
target_unit: str | None = None,
target_unit_system: str | None = None,
ignore_unknown_ids: bool = False,
include_status: bool = False,
ignore_bad_datapoints: bool = True,
treat_uncertain_as_bad: bool = True,
uniform_index: bool = False,
include_aggregate_name: bool = True,
include_granularity_name: bool = False,
column_names: Literal["id", "external_id"] = "external_id",
) -> pd.DataFrame:
"""Get datapoints directly in a pandas dataframe in the same timezone as ``start`` and ``end``.
.. admonition:: Deprecation Warning
This SDK function is deprecated and will be removed in the next major release. Reason: Cognite Data
Fusion now has native support for timezone and calendar-based aggregations. Please consider migrating
already today: The API also supports fixed offsets, yields more accurate results and has better support
for exotic timezones and unusual DST offsets. You can use the normal retrieve methods instead, just
pass 'timezone' as a parameter.
Args:
id (int | Sequence[int] | None): ID or list of IDs.
external_id (str | SequenceNotStr[str] | None): External ID or list of External IDs.
start (datetime.datetime): Inclusive start, must be timezone aware.
end (datetime.datetime): Exclusive end, must be timezone aware and have the same timezone as start.
aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``, ``count_uncertain``, ``discrete_variance``, ``duration_bad``, ``duration_good``, ``duration_uncertain``, ``interpolation``, ``max``, ``min``, ``step_interpolation``, ``sum`` and ``total_variation``. Default: None (raw datapoints returned)
granularity (str | None): The granularity to fetch aggregates at. Can be given as an abbreviation or spelled out for clarity: ``s/second(s)``, ``m/minute(s)``, ``h/hour(s)``, ``d/day(s)``, ``w/week(s)``, ``mo/month(s)``, ``q/quarter(s)``, or ``y/year(s)``. Examples: ``30s``, ``5m``, ``1day``, ``2weeks``. Default: None.
target_unit (str | None): The unit_external_id of the datapoints returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None): The unit system of the datapoints returned. Cannot be used with target_unit.
ignore_unknown_ids (bool): Whether to ignore missing time series rather than raising an exception. Default: False
include_status (bool): Also return the status code, an integer, for each datapoint in the response. Only relevant for raw datapoint queries, not aggregates.
ignore_bad_datapoints (bool): Treat datapoints with a bad status code as if they do not exist. If set to false, raw queries will include bad datapoints in the response, and aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Default: True.
treat_uncertain_as_bad (bool): Treat datapoints with uncertain status codes as bad. If false, treat datapoints with uncertain status codes as good. Used for both raw queries and aggregates. Default: True.
uniform_index (bool): If querying aggregates with a non-calendar granularity, specifying ``uniform_index=True`` will return a dataframe with an index with constant spacing between timestamps decided by granularity all the way from `start` to `end` (missing values will be NaNs). Default: False
include_aggregate_name (bool): Include 'aggregate' in the column name, e.g. `my-ts|average`. Ignored for raw time series. Default: True
include_granularity_name (bool): Include 'granularity' in the column name, e.g. `my-ts|12h`. Added after 'aggregate' when present. Ignored for raw time series. Default: False
column_names (Literal['id', 'external_id']): Use either ids or external ids as column names. Time series missing external id will use id as backup. Default: "external_id"
Returns:
pd.DataFrame: A pandas DataFrame containing the requested time series with a DatetimeIndex localized in the given timezone.
"""
warnings.warn(
(
"This SDK method, `retrieve_dataframe_in_tz`, is deprecated and will be removed in the next major release. "
"Reason: Cognite Data Fusion now has native support for timezone and calendar-based aggregations. Please "
"consider migrating already today: The API also supports fixed offsets, yields more accurate results and "
"have better support for exotic timezones and unusual DST offsets. You can use the normal retrieve methods "
"instead, just pass 'timezone' as a parameter."
),
UserWarning,
)
_, pd = local_import("numpy", "pandas") # Verify that deps are available or raise CogniteImportError

if not exactly_one_is_not_none(id, external_id):
raise ValueError("Either input id(s) or external_id(s)")

if exactly_one_is_not_none(aggregates, granularity):
raise ValueError(
"Got only one of 'aggregates' and 'granularity'. "
"Pass both to get aggregates, or neither to get raw data"
)
tz = validate_timezone(start, end)
if aggregates is None and granularity is None:
# For raw data, we only need to convert the timezone:
return (
# TODO: include_outside_points is missing
self.retrieve_dataframe(
id=id,
external_id=external_id,
start=start,
end=end,
aggregates=aggregates,
granularity=granularity,
target_unit=target_unit,
target_unit_system=target_unit_system,
ignore_unknown_ids=ignore_unknown_ids,
include_status=include_status,
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
uniform_index=uniform_index,
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
column_names=column_names,
limit=None,
)
.tz_localize("utc")
.tz_convert(str(tz))
)
assert isinstance(granularity, str) # mypy

identifiers = IdentifierSequence.load(id, external_id)
if not identifiers.are_unique():
duplicated = find_duplicates(identifiers.as_primitives())
raise ValueError(f"The following identifiers were not unique: {duplicated}")

intervals = to_fixed_utc_intervals(start, end, granularity)
queries = [
{**ident_dct, "aggregates": aggregates, **interval}
for ident_dct, interval in itertools.product(identifiers.as_dicts(), intervals)
]
arrays = self.retrieve_arrays(
limit=None,
ignore_unknown_ids=ignore_unknown_ids,
include_status=include_status,
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
target_unit=target_unit,
target_unit_system=target_unit_system,
**{identifiers[0].name(): queries}, # type: ignore [arg-type]
)
assert isinstance(arrays, DatapointsArrayList) # mypy

arrays.concat_duplicate_ids()
for arr in arrays:
# In case 'include_granularity_name' is used, we don't want '2quarters' to show up as '4343h':
arr.granularity = granularity
df = (
arrays.to_pandas(column_names, include_aggregate_name, include_granularity_name)
.tz_localize("utc")
.tz_convert(str(tz))
)
if uniform_index:
freq = to_pandas_freq(granularity, start)
# TODO: Bug, "small" granularities like s/m/h raise here:
start, end = align_large_granularity(start, end, granularity)
return df.reindex(pandas_date_range_tz(start, end, freq, inclusive="left"))
return df

def retrieve_latest(
self,
id: int | LatestDatapointQuery | list[int | LatestDatapointQuery] | None = None,
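The deprecation notice in the removed retrieve_dataframe_in_tz points callers to the regular retrieve methods with a timezone parameter. A minimal migration sketch, assuming a configured CogniteClient and a hypothetical time series external id "my-ts":

    from datetime import datetime
    from zoneinfo import ZoneInfo

    from cognite.client import CogniteClient

    client = CogniteClient()  # sketch only; assumes credentials are configured elsewhere
    oslo = ZoneInfo("Europe/Oslo")

    # Native timezone and calendar-based aggregation support replaces retrieve_dataframe_in_tz:
    df = client.time_series.data.retrieve_dataframe(
        external_id="my-ts",  # hypothetical external id
        start=datetime(2024, 1, 1, tzinfo=oslo),
        end=datetime(2024, 2, 1, tzinfo=oslo),
        aggregates="average",
        granularity="1d",
        timezone=oslo,  # per the deprecation message: "just pass 'timezone' as a parameter"
    )

For raw datapoints, drop aggregates and granularity; the resulting index should come back localized to the requested timezone.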
4 changes: 2 additions & 2 deletions cognite/client/data_classes/data_modeling/instances.py
@@ -66,7 +66,7 @@
from cognite.client.utils._identifier import InstanceId
from cognite.client.utils._importing import local_import
from cognite.client.utils._text import convert_all_keys_to_snake_case, to_camel_case
from cognite.client.utils._time import convert_data_modelling_timestamp
from cognite.client.utils._time import convert_data_modeling_timestamp
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
@@ -1697,7 +1697,7 @@ def _deserialize_value(value: Any, parameter: inspect.Parameter) -> Any:
return value
annotation = str(parameter.annotation)
if "datetime" in annotation and isinstance(value, str):
return convert_data_modelling_timestamp(value)
return convert_data_modeling_timestamp(value)
elif "date" in annotation and isinstance(value, str):
return date.fromisoformat(value)
elif isinstance(value, dict):
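The only change to instances.py is the renamed helper: convert_data_modelling_timestamp becomes convert_data_modeling_timestamp, with the call site in _deserialize_value updated to match. A small usage sketch (the timestamp string is a hypothetical value in the ISO-like format the data modeling API returns):

    from cognite.client.utils._time import convert_data_modeling_timestamp

    # Hypothetical value; the helper parses a data modeling timestamp string into a datetime:
    dt = convert_data_modeling_timestamp("2024-01-01T12:00:00.000+00:00")
    print(dt.isoformat())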
41 changes: 1 addition & 40 deletions cognite/client/data_classes/datapoints.py
@@ -18,6 +18,7 @@
TypedDict,
overload,
)
from zoneinfo import ZoneInfo

from typing_extensions import NotRequired, Self

@@ -41,7 +42,6 @@
to_snake_case,
)
from cognite.client.utils._time import (
ZoneInfo,
convert_and_isoformat_timestamp,
convert_timezone_to_str,
parse_str_timezone,
@@ -1270,45 +1270,6 @@ def __init__(self, resources: Collection[Any], cognite_client: CogniteClient | N
self._external_id_to_item.update(xid_dct)
self._instance_id_to_item.update(inst_id_dct)

def concat_duplicate_ids(self) -> None:
"""
Concatenates all arrays with duplicated IDs.
Arrays with the same ids are stacked in chronological order.
**Caveat** This method is not guaranteed to preserve the order of the list.
"""
# Rebuilt list instead of removing duplicated one at a time at the cost of O(n).
self.data.clear()

# This implementation takes advantage of the ordering of the duplicated in the __init__ method
has_external_ids = set()
for ext_id, items in self._external_id_to_item.items():
if not isinstance(items, list):
self.data.append(items)
if items.id is not None:
has_external_ids.add(items.id)
continue
concatenated = DatapointsArray.create_from_arrays(*items)
self._external_id_to_item[ext_id] = concatenated
if concatenated.id is not None:
has_external_ids.add(concatenated.id)
self._id_to_item[concatenated.id] = concatenated
self.data.append(concatenated)

if not (only_ids := set(self._id_to_item) - has_external_ids):
return

for id_, items in self._id_to_item.items():
if id_ not in only_ids:
continue
if not isinstance(items, list):
self.data.append(items)
continue
concatenated = DatapointsArray.create_from_arrays(*items)
self._id_to_item[id_] = concatenated
self.data.append(concatenated)

def get( # type: ignore [override]
self,
id: int | None = None,
5 changes: 3 additions & 2 deletions cognite/client/utils/__init__.py
@@ -1,9 +1,10 @@
from __future__ import annotations

from zoneinfo import ZoneInfo

from cognite.client.utils._time import (
MAX_TIMESTAMP_MS,
MIN_TIMESTAMP_MS,
ZoneInfo,
datetime_to_ms,
datetime_to_ms_iso_timestamp,
ms_to_datetime,
@@ -13,7 +14,7 @@
__all__ = [
"MAX_TIMESTAMP_MS",
"MIN_TIMESTAMP_MS",
"ZoneInfo",
"ZoneInfo", # for backwards compat. (when we supported >=3.8)
"datetime_to_ms",
"datetime_to_ms_iso_timestamp",
"ms_to_datetime",
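cognite.client.utils keeps re-exporting ZoneInfo purely for backwards compatibility; after this commit it is simply the standard library class. A quick sketch of the two import paths:

    from zoneinfo import ZoneInfo  # preferred: import straight from the standard library
    from cognite.client.utils import ZoneInfo as LegacyZoneInfo  # kept only for backwards compatibility

    # After this change both names refer to the same stdlib class:
    assert ZoneInfo is LegacyZoneInfo
    print(ZoneInfo("Europe/Oslo"))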
3 changes: 2 additions & 1 deletion cognite/client/utils/_pandas_helpers.py
@@ -8,11 +8,12 @@
from itertools import chain
from numbers import Integral
from typing import TYPE_CHECKING, Any, Literal
from zoneinfo import ZoneInfo

from cognite.client.exceptions import CogniteImportError
from cognite.client.utils._importing import local_import
from cognite.client.utils._text import to_camel_case
from cognite.client.utils._time import TIME_ATTRIBUTES, ZoneInfo
from cognite.client.utils._time import TIME_ATTRIBUTES

if TYPE_CHECKING:
import pandas as pd