diff --git a/00_core.ipynb b/00_core.ipynb index bc27813..b26ab2a 100644 --- a/00_core.ipynb +++ b/00_core.ipynb @@ -35,8 +35,6 @@ "outputs": [], "source": [ "#| export\n", - "import logging\n", - "\n", "import coiled\n", "import dask.distributed\n", "import ee\n", @@ -52,25 +50,30 @@ "#| export\n", "class InitEarthEngine(dask.distributed.WorkerPlugin):\n", " def __init__(self, **kwargs):\n", - " logging.info('InitEarthEngine init') # This appears in the notebook output where the cluster is initiated.\n", + " print('InitEarthEngine.init: starting') # This appears in the notebook output where the cluster is initiated.\n", " self.kwargs = kwargs\n", + " print(f'InitEarthEngine.init: kwargs = {kwargs}')\n", "\n", " def setup(self, worker):\n", - " logging.info('InitEarthEngine setup') # This appears in the dask cluster logs.\n", + " # Print statements output to the dask cluster logs (viewable via Coiled dashboard)\n", + " print('InitEarthEngine.setup: starting')\n", + " print('InitEarthEngine.setup: default to using the high volume endpoint')\n", + " self.kwargs.setdefault('opt_url', 'https://earthengine-highvolume.googleapis.com')\n", " import ee\n", + " print(f'InitEarthEngine.setup: ee.Initialize(**{self.kwargs})')\n", " ee.Initialize(**self.kwargs)\n", "\n", "\n", "class ClusterGEE(coiled.Cluster):\n", " def __init__(self, **kwargs):\n", - " logging.debug('ClusterGEE init')\n", + " print('ClusterGEE init')\n", " super().__init__(**kwargs)\n", " # Wait for the workers to start, then send the ADCs\n", " self.wait_for_workers(kwargs['n_workers'])\n", " coiled.credentials.google.send_application_default_credentials(self)\n", "\n", " def get_client(self):\n", - " logging.debug('ClusterGEE get_client')\n", + " print('ClusterGEE get_client')\n", " client = super().get_client()\n", " client.register_plugin(InitEarthEngine())\n", " return client\n", @@ -79,11 +82,11 @@ "# For local development\n", "class LocalClusterGEE(dask.distributed.LocalCluster): \n", " def __init__(self, **kwargs):\n", - " logging.debug('LocalClusterGEE init')\n", + " print('LocalClusterGEE init')\n", " super().__init__(**kwargs)\n", "\n", " def get_client(self):\n", - " logging.debug('LocalClusterGEE get_client')\n", + " print('LocalClusterGEE get_client')\n", " client = super().get_client()\n", " client.register_plugin(InitEarthEngine())\n", " return client" @@ -121,11 +124,13 @@ "try:\n", " credentials, project_id = google.auth.default()\n", "except google.auth.exceptions.DefaultCredentialsError:\n", + " print('Unable to get auth credentials. Initiating auth flow.')\n", " !gcloud auth application-default login\n", " credentials, project_id = google.auth.default()\n", "try:\n", " ee.Initialize(credentials=credentials, project=project_id)\n", "except google.auth.exceptions.RefreshError:\n", + " print('Credentials refresh error. Initiating auth flow.')\n", " !gcloud auth application-default login\n", " credentials, project_id = google.auth.default()\n", "ee.Initialize(credentials=credentials, project=project_id)" @@ -144,28 +149,17 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/tylere/Documents/GitHub/VorGeo/earthengine-dask/.pixi/envs/default/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n", - "Perhaps you already have a cluster running?\n", - "Hosting the HTTP server on port 50794 instead\n", - " warnings.warn(\n" - ] - } - ], + "outputs": [], "source": [ - "# cluster = ClusterGEE(\n", - "# name='test-class-cluster',\n", - "# n_workers=1,\n", - "# worker_cpu=4,\n", - "# spot_policy=\"spot_with_fallback\",\n", - "# region='us-central1',\n", - "# idle_timeout=\"1 hours\",\n", - "# )\n", - "cluster = LocalClusterGEE()" + "cluster = ClusterGEE(\n", + " name='test-class-cluster',\n", + " n_workers=1,\n", + " worker_cpu=4,\n", + " spot_policy=\"spot_with_fallback\",\n", + " region='us-central1',\n", + " idle_timeout=\"1 hours\",\n", + ")\n", + "# cluster = LocalClusterGEE()" ] }, { @@ -179,365 +173,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "
\n", - "
\n", - "

Client

\n", - "

Client-5ece790e-1f77-11ef-99f8-fe11494405b6

\n", - " \n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - "
Connection method: Cluster objectCluster type: __main__.LocalClusterGEE
\n", - " Dashboard: http://127.0.0.1:50794/status\n", - "
\n", - "\n", - " \n", - "\n", - " \n", - "
\n", - "

Cluster Info

\n", - "
\n", - "
\n", - "
\n", - "
\n", - "

LocalClusterGEE

\n", - "

392efe34

\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - "\n", - " \n", - "
\n", - " Dashboard: http://127.0.0.1:50794/status\n", - " \n", - " Workers: 5\n", - "
\n", - " Total threads: 10\n", - " \n", - " Total memory: 64.00 GiB\n", - "
Status: runningUsing processes: True
\n", - "\n", - "
\n", - " \n", - "

Scheduler Info

\n", - "
\n", - "\n", - "
\n", - "
\n", - "
\n", - "
\n", - "

Scheduler

\n", - "

Scheduler-c63afdec-a377-4bef-9451-691b1b171c36

\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
\n", - " Comm: tcp://127.0.0.1:50795\n", - " \n", - " Workers: 5\n", - "
\n", - " Dashboard: http://127.0.0.1:50794/status\n", - " \n", - " Total threads: 10\n", - "
\n", - " Started: Just now\n", - " \n", - " Total memory: 64.00 GiB\n", - "
\n", - "
\n", - "
\n", - "\n", - "
\n", - " \n", - "

Workers

\n", - "
\n", - "\n", - " \n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "

Worker: 0

\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - "\n", - " \n", - "\n", - "
\n", - " Comm: tcp://127.0.0.1:50811\n", - " \n", - " Total threads: 2\n", - "
\n", - " Dashboard: http://127.0.0.1:50817/status\n", - " \n", - " Memory: 12.80 GiB\n", - "
\n", - " Nanny: tcp://127.0.0.1:50798\n", - "
\n", - " Local directory: /var/folders/1s/21_zf6j56vs_t0j6klrw2znm0000gn/T/dask-scratch-space/worker-uhclrxt6\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "

Worker: 1

\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - "\n", - " \n", - "\n", - "
\n", - " Comm: tcp://127.0.0.1:50809\n", - " \n", - " Total threads: 2\n", - "
\n", - " Dashboard: http://127.0.0.1:50815/status\n", - " \n", - " Memory: 12.80 GiB\n", - "
\n", - " Nanny: tcp://127.0.0.1:50800\n", - "
\n", - " Local directory: /var/folders/1s/21_zf6j56vs_t0j6klrw2znm0000gn/T/dask-scratch-space/worker-mxmx0chx\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "

Worker: 2

\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - "\n", - " \n", - "\n", - "
\n", - " Comm: tcp://127.0.0.1:50810\n", - " \n", - " Total threads: 2\n", - "
\n", - " Dashboard: http://127.0.0.1:50816/status\n", - " \n", - " Memory: 12.80 GiB\n", - "
\n", - " Nanny: tcp://127.0.0.1:50802\n", - "
\n", - " Local directory: /var/folders/1s/21_zf6j56vs_t0j6klrw2znm0000gn/T/dask-scratch-space/worker-qf20mh5l\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "

Worker: 3

\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - "\n", - " \n", - "\n", - "
\n", - " Comm: tcp://127.0.0.1:50808\n", - " \n", - " Total threads: 2\n", - "
\n", - " Dashboard: http://127.0.0.1:50814/status\n", - " \n", - " Memory: 12.80 GiB\n", - "
\n", - " Nanny: tcp://127.0.0.1:50804\n", - "
\n", - " Local directory: /var/folders/1s/21_zf6j56vs_t0j6klrw2znm0000gn/T/dask-scratch-space/worker-6kosms5z\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "

Worker: 4

\n", - "
\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "\n", - " \n", - "\n", - " \n", - "\n", - "
\n", - " Comm: tcp://127.0.0.1:50812\n", - " \n", - " Total threads: 2\n", - "
\n", - " Dashboard: http://127.0.0.1:50813/status\n", - " \n", - " Memory: 12.80 GiB\n", - "
\n", - " Nanny: tcp://127.0.0.1:50806\n", - "
\n", - " Local directory: /var/folders/1s/21_zf6j56vs_t0j6klrw2znm0000gn/T/dask-scratch-space/worker-a1wn1vpy\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "\n", - "
\n", - "
\n", - "\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "\n", - "
\n", - "
" - ], - "text/plain": [ - "" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "client = cluster.get_client()\n", "client" @@ -560,29 +196,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Submitting jobs...\n", - "...done\n", - "Gathering results...\n", - "...done\n" - ] - }, - { - "data": { - "text/plain": [ - "[{'country': 'Abyei Area', 'area_km2': 10460, 'mean_elev': 402.5921903247955},\n", - " {'country': 'Zimbabwe', 'area_km2': 391916, 'mean_elev': 973.2955548809969}]" - ] - }, - "execution_count": null, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "# Get a list of countries to analyze.\n", "country_fc = ee.FeatureCollection('USDOS/LSIB_SIMPLE/2017')\n", @@ -630,19 +244,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "ename": "AttributeError", - "evalue": "'LocalClusterGEE' object has no attribute 'shutdown'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[14], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mcluster\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mshutdown\u001b[49m()\n", - "\u001b[0;31mAttributeError\u001b[0m: 'LocalClusterGEE' object has no attribute 'shutdown'" - ] - } - ], + "outputs": [], "source": [ "cluster.shutdown()" ] diff --git a/earthengine_dask/core.py b/earthengine_dask/core.py index a3d124f..fe3b3b5 100644 --- a/earthengine_dask/core.py +++ b/earthengine_dask/core.py @@ -4,8 +4,6 @@ __all__ = ['InitEarthEngine', 'ClusterGEE', 'LocalClusterGEE'] # %% ../00_core.ipynb 3 -import logging - import coiled import dask.distributed import ee @@ -14,25 +12,30 @@ # %% ../00_core.ipynb 4 class InitEarthEngine(dask.distributed.WorkerPlugin): def __init__(self, **kwargs): - logging.info('InitEarthEngine init') # This appears in the notebook output where the cluster is initiated. + print('InitEarthEngine.init: starting') # This appears in the notebook output where the cluster is initiated. self.kwargs = kwargs + print(f'InitEarthEngine.init: kwargs = {kwargs}') def setup(self, worker): - logging.info('InitEarthEngine setup') # This appears in the dask cluster logs. + # Print statements output to the dask cluster logs (viewable via Coiled dashboard) + print('InitEarthEngine.setup: starting') + print('InitEarthEngine.setup: default to using the high volume endpoint') + self.kwargs.setdefault('opt_url', 'https://earthengine-highvolume.googleapis.com') import ee + print(f'InitEarthEngine.setup: ee.Initialize(**{self.kwargs})') ee.Initialize(**self.kwargs) class ClusterGEE(coiled.Cluster): def __init__(self, **kwargs): - logging.debug('ClusterGEE init') + print('ClusterGEE init') super().__init__(**kwargs) # Wait for the workers to start, then send the ADCs self.wait_for_workers(kwargs['n_workers']) coiled.credentials.google.send_application_default_credentials(self) def get_client(self): - logging.debug('ClusterGEE get_client') + print('ClusterGEE get_client') client = super().get_client() client.register_plugin(InitEarthEngine()) return client @@ -41,11 +44,11 @@ def get_client(self): # For local development class LocalClusterGEE(dask.distributed.LocalCluster): def __init__(self, **kwargs): - logging.debug('LocalClusterGEE init') + print('LocalClusterGEE init') super().__init__(**kwargs) def get_client(self): - logging.debug('LocalClusterGEE get_client') + print('LocalClusterGEE get_client') client = super().get_client() client.register_plugin(InitEarthEngine()) return client