Skip to content

Commit

Permalink
python: latest flag for prefetch and copy
Browse files Browse the repository at this point in the history
Signed-off-by: Abhishek Gaikwad <[email protected]>
  • Loading branch information
gaikwadabhishek committed Jan 12, 2024
1 parent 821eb0f commit 4b530a0
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 37 deletions.
3 changes: 1 addition & 2 deletions python/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ spelling-store-unknown-words=no

# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
XXX

# Regular expression of note tags to take in consideration.
#notes-rgx=
Expand Down
3 changes: 1 addition & 2 deletions python/.pylintrc-tests
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ spelling-store-unknown-words=no

# List of note tags to take in consideration, separated by a comma.
notes=FIXME,
XXX,
TODO
XXX

# Regular expression of note tags to take in consideration.
#notes-rgx=
Expand Down
22 changes: 20 additions & 2 deletions python/aistore/sdk/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ def copy(
prepend: str = "",
dry_run: bool = False,
force: bool = False,
latest: bool = False,
sync: bool = False,
) -> str:
"""
Returns job ID that can be used later to check the status of the asynchronous operation.
Expand All @@ -437,6 +439,8 @@ def copy(
dry_run (bool, optional): Determines if the copy should actually
happen or not
force (bool, optional): Override existing destination bucket
latest (bool, optional): GET the latest object version from the associated remote bucket
sync (bool, optional): synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source
Returns:
Job ID (as str) that can be used to check the status of the operation
Expand All @@ -450,7 +454,12 @@ def copy(
requests.ReadTimeout: Timed out receiving response from AIStore
"""
value = CopyBckMsg(
prefix=prefix_filter, prepend=prepend, dry_run=dry_run, force=force
prefix=prefix_filter,
prepend=prepend,
dry_run=dry_run,
force=force,
latest=latest,
sync=sync,
).as_dict()
params = self.qparam.copy()
params[QPARAM_BCK_TO] = to_bck.get_path()
Expand Down Expand Up @@ -646,6 +655,8 @@ def transform(
ext: Dict[str, str] = None,
force: bool = False,
dry_run: bool = False,
latest: bool = False,
sync: bool = False,
) -> str:
"""
Visits all selected objects in the source bucket and for each object, puts the transformed
Expand All @@ -661,6 +672,8 @@ def transform(
(i.e. {"jpg": "txt"})
dry_run (bool, optional): determines if the copy should actually happen or not
force (bool, optional): override existing destination bucket
latest (bool, optional): GET the latest object version from the associated remote bucket
sync (bool, optional): synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source
Returns:
Job ID (as str) that can be used to check the status of the operation
Expand All @@ -669,7 +682,12 @@ def transform(
ext=ext,
transform_msg=TransformBckMsg(etl_name=etl_name, timeout=timeout),
copy_msg=CopyBckMsg(
prefix=prefix_filter, prepend=prepend, force=force, dry_run=dry_run
prefix=prefix_filter,
prepend=prepend,
force=force,
dry_run=dry_run,
latest=latest,
sync=sync,
),
).as_dict()

Expand Down
14 changes: 14 additions & 0 deletions python/aistore/sdk/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,17 @@

# Environment Variables
AIS_SERVER_CRT = "AIS_SERVER_CRT"

# Content Constants
LOREM = (
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod"
" tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim"
" veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea"
" commodo consequat."
)
DUIS = (
"Duis aute irure dolor in reprehenderit in voluptate velit esse cillum"
" dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non"
" proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
" Et harum quidem.."
)
37 changes: 32 additions & 5 deletions python/aistore/sdk/multiobj/object_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
TransformBckMsg,
TCBckMsg,
ArchiveMultiObj,
PrefetchMsg,
)


Expand Down Expand Up @@ -127,11 +128,15 @@ def evict(self):
value=self._obj_collection.get_value(),
).text

def prefetch(self):
def prefetch(self, latest: bool = False, continue_on_error: bool = False):
"""
Prefetches a list or range of objects in a bucket so that they are cached in AIS
NOTE: only Cloud buckets can be prefetched.
Args:
latest (bool, optional): GET the latest object version from the associated remote bucket
continue_on_error (bool, optional): Whether to continue if there is an error prefetching a single object
Raises:
aistore.sdk.errors.AISError: All other types of errors with AIStore
requests.ConnectionError: Connection error
Expand All @@ -145,10 +150,17 @@ def prefetch(self):
"""
self.bck.verify_cloud_bucket()

value = PrefetchMsg(
object_selection=self._obj_collection.get_value(),
continue_on_err=continue_on_error,
latest=latest,
).as_dict()

return self.bck.make_request(
HTTP_METHOD_POST,
ACT_PREFETCH_OBJECTS,
value=self._obj_collection.get_value(),
value=value,
).text

# pylint: disable=too-many-arguments
Expand All @@ -159,6 +171,8 @@ def copy(
continue_on_error: bool = False,
dry_run: bool = False,
force: bool = False,
latest: bool = False,
sync: bool = False,
):
"""
Copies a list or range of objects in a bucket
Expand All @@ -170,6 +184,8 @@ def copy(
dry_run (bool, optional): Skip performing the copy and just log the intended actions
force (bool, optional): Force this job to run over others in case it conflicts
(see "limited coexistence" and xact/xreg/xreg.go)
latest (bool, optional): GET the latest object version from the associated remote bucket
sync (bool, optional): synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source
Raises:
aistore.sdk.errors.AISError: All other types of errors with AIStore
Expand All @@ -191,16 +207,21 @@ def copy(
f"{to_bck.get_path()}",
list(self._obj_collection),
)
copy_msg = CopyBckMsg(prepend=prepend, dry_run=dry_run, force=force)
copy_msg = CopyBckMsg(
prepend=prepend, dry_run=dry_run, force=force, latest=latest, sync=sync
)

value = TCMultiObj(
to_bck=to_bck.as_model(),
tc_msg=TCBckMsg(copy_msg=copy_msg),
object_selection=self._obj_collection.get_value(),
continue_on_err=continue_on_error,
).as_dict()

return self.bck.make_request(
HTTP_METHOD_POST, ACT_COPY_OBJECTS, value=value
HTTP_METHOD_POST,
ACT_COPY_OBJECTS,
value=value,
).text

# pylint: disable=too-many-arguments
Expand All @@ -213,6 +234,8 @@ def transform(
continue_on_error: bool = False,
dry_run: bool = False,
force: bool = False,
latest: bool = False,
sync: bool = False,
):
"""
Performs ETL operation on a list or range of objects in a bucket, placing the results in the destination bucket
Expand All @@ -226,6 +249,8 @@ def transform(
dry_run (bool, optional): Skip performing the transform and just log the intended actions
force (bool, optional): Force this job to run over others in case it conflicts
(see "limited coexistence" and xact/xreg/xreg.go)
latest (bool, optional): GET the latest object version from the associated remote bucket
sync (bool, optional): synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source
Raises:
aistore.sdk.errors.AISError: All other types of errors with AIStore
Expand All @@ -247,7 +272,9 @@ def transform(
list(self._obj_collection),
)

copy_msg = CopyBckMsg(prepend=prepend, dry_run=dry_run, force=force)
copy_msg = CopyBckMsg(
prepend=prepend, dry_run=dry_run, force=force, latest=latest, sync=sync
)
transform_msg = TransformBckMsg(etl_name=etl_name, timeout=timeout)
value = TCMultiObj(
to_bck=to_bck.as_model(),
Expand Down
20 changes: 20 additions & 0 deletions python/aistore/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,13 +389,17 @@ class CopyBckMsg(BaseModel):
prepend: str
dry_run: bool
force: bool
latest: bool
sync: bool

def as_dict(self):
return {
"prefix": self.prefix,
"prepend": self.prepend,
"dry_run": self.dry_run,
"force": self.force,
"latest-ver": self.latest,
"synchronize": self.sync,
}


Expand Down Expand Up @@ -459,6 +463,22 @@ def as_dict(self):
return dict_rep


class PrefetchMsg(BaseModel):
"""
API message structure for prefetching objects from remote buckets.
"""

object_selection: dict
continue_on_err: bool
latest: bool

def as_dict(self):
dict_rep = self.object_selection
dict_rep["coer"] = self.continue_on_err
dict_rep["latest-ver"] = self.latest
return dict_rep


class TCMultiObj(BaseModel):
"""
API message structure for transforming or copying multiple objects between buckets
Expand Down
27 changes: 7 additions & 20 deletions python/tests/integration/sdk/test_bucket_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import requests

from aistore.sdk import ListObjectFlag
from aistore.sdk.const import PROVIDER_AIS, UTF_ENCODING
from aistore.sdk.const import PROVIDER_AIS, UTF_ENCODING, LOREM, DUIS
from aistore.sdk.errors import InvalidBckProvider, AISError, ErrBckNotFound

from tests.integration.sdk.remote_enabled_test import RemoteEnabledTest
Expand Down Expand Up @@ -130,19 +130,6 @@ def test_get_latest_flag(self):
obj_name = random_string()
self.cloud_objects.append(obj_name)

lorem = (
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod"
" tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim"
" veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea"
" commodo consequat."
)
duis = (
"Duis aute irure dolor in reprehenderit in voluptate velit esse cillum"
" dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non"
" proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
" Et harum quidem.."
)

s3_client = boto3.client(
"s3",
region_name=AWS_REGION,
Expand All @@ -152,29 +139,29 @@ def test_get_latest_flag(self):
)

# out-of-band PUT: first version
s3_client.put_object(Bucket=self.bucket.name, Key=obj_name, Body=lorem)
s3_client.put_object(Bucket=self.bucket.name, Key=obj_name, Body=LOREM)

# cold GET, and check
content = self.bucket.object(obj_name).get().read_all()
self.assertEqual(lorem, content.decode("utf-8"))
self.assertEqual(LOREM, content.decode("utf-8"))

# out-of-band PUT: 2nd version (overwrite)
s3_client.put_object(Bucket=self.bucket.name, Key=obj_name, Body=duis)
s3_client.put_object(Bucket=self.bucket.name, Key=obj_name, Body=DUIS)

# warm GET and check (expecting the first version's content)
content = self.bucket.object(obj_name).get().read_all()
self.assertEqual(lorem, content.decode("utf-8"))
self.assertEqual(LOREM, content.decode("utf-8"))

# warm GET with `--latest` flag, content should be updated
content = self.bucket.object(obj_name).get(latest=True).read_all()
self.assertEqual(duis, content.decode("utf-8"))
self.assertEqual(DUIS, content.decode("utf-8"))

# out-of-band DELETE
s3_client.delete_object(Bucket=self.bucket.name, Key=obj_name)

# warm GET must be fine
content = self.bucket.object(obj_name).get().read_all()
self.assertEqual(duis, content.decode("utf-8"))
self.assertEqual(DUIS, content.decode("utf-8"))

# cold GET must result in Error
with self.assertRaises(AISError):
Expand Down
Loading

0 comments on commit 4b530a0

Please sign in to comment.