From a54f4b1eb119e5aed1db70e086a209b14e39a8d1 Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 7 Jan 2025 12:11:26 +0100 Subject: [PATCH 1/6] first implementation of local udf --- .../process_implementations/__init__.py | 1 + .../process_implementations/udf/udf.py | 26 +++++++++++++ tests/test_udf.py | 38 +++++++++++++++++++ 3 files changed, 65 insertions(+) create mode 100644 openeo_processes_dask/process_implementations/udf/udf.py create mode 100644 tests/test_udf.py diff --git a/openeo_processes_dask/process_implementations/__init__.py b/openeo_processes_dask/process_implementations/__init__.py index 5dd1333..5f8c7dd 100644 --- a/openeo_processes_dask/process_implementations/__init__.py +++ b/openeo_processes_dask/process_implementations/__init__.py @@ -9,6 +9,7 @@ from .logic import * from .math import * from .text import * +from .udf import * try: from .ml import * diff --git a/openeo_processes_dask/process_implementations/udf/udf.py b/openeo_processes_dask/process_implementations/udf/udf.py new file mode 100644 index 0000000..6d77801 --- /dev/null +++ b/openeo_processes_dask/process_implementations/udf/udf.py @@ -0,0 +1,26 @@ +from typing import Optional + +import dask.array as da +import xarray as xr +from openeo.udf import UdfData +from openeo.udf.run_code import run_udf_code +from openeo.udf.xarraydatacube import XarrayDataCube + +from openeo_processes_dask.process_implementations.data_model import RasterCube + +__all__ = ["run_udf"] + + +def run_udf( + data: da.Array, udf: str, runtime: str, context: Optional[dict] = None +) -> RasterCube: + data = XarrayDataCube(xr.DataArray(data)) + data = UdfData(proj={"EPSG": 900913}, datacube_list=[data], user_context=context) + result = run_udf_code(code=udf, data=data) + cubes = result.get_datacube_list() + if len(cubes) != 1: + raise ValueError( + f"The provided UDF should return one datacube, but got: {result}" + ) + result_array: xr.DataArray = cubes[0].array + return result_array diff --git a/tests/test_udf.py b/tests/test_udf.py new file mode 100644 index 0000000..8e52d0b --- /dev/null +++ b/tests/test_udf.py @@ -0,0 +1,38 @@ +import numpy as np +import openeo +import pytest +import xarray as xr + +from openeo_processes_dask.process_implementations.udf.udf import run_udf +from tests.general_checks import general_output_checks +from tests.mockdata import create_fake_rastercube + + +@pytest.mark.parametrize("size", [(6, 5, 4, 4)]) +@pytest.mark.parametrize("dtype", [np.float32]) +def test_run_udf(temporal_interval, bounding_box, random_raster_data): + input_cube = create_fake_rastercube( + data=random_raster_data, + spatial_extent=bounding_box, + temporal_extent=temporal_interval, + bands=["B02", "B03", "B04", "B08"], + backend="dask", + ) + + udf = """ +from openeo.udf import XarrayDataCube +def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: + return cube +""" + + output_cube = run_udf(data=input_cube, udf=udf, runtime="Python") + + general_output_checks( + input_cube=input_cube, + output_cube=output_cube, + verify_attrs=True, + verify_crs=True, + expected_results=input_cube, + ) + + xr.testing.assert_equal(output_cube, input_cube) From ceb64d2ed96569ce1acd53e14c841543308c61e5 Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 7 Jan 2025 12:17:48 +0100 Subject: [PATCH 2/6] add openeo dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index a3868a6..6587477 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ xvec = { version = "0.2.0", optional = true } joblib = { version = ">=1.3.2", optional = true } geoparquet = "^0.0.3" pyarrow = "^15.0.2" +openeo = ">=0.36.0" [tool.poetry.group.dev.dependencies] pytest = "^7.2.0" From 19d8cc17385e50b4204e8c95c8603d65a25f939e Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Wed, 22 Jan 2025 11:36:45 +0100 Subject: [PATCH 3/6] added __init__.py to udf module --- openeo_processes_dask/process_implementations/udf/__init__.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 openeo_processes_dask/process_implementations/udf/__init__.py diff --git a/openeo_processes_dask/process_implementations/udf/__init__.py b/openeo_processes_dask/process_implementations/udf/__init__.py new file mode 100644 index 0000000..e926694 --- /dev/null +++ b/openeo_processes_dask/process_implementations/udf/__init__.py @@ -0,0 +1 @@ +from .udf import * From 00d8a73391e4ba995291f284813d34cc42bcb5c4 Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Wed, 22 Jan 2025 11:41:50 +0100 Subject: [PATCH 4/6] update run_udf unit tests --- .../process_implementations/udf/udf.py | 2 +- tests/test_udf.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openeo_processes_dask/process_implementations/udf/udf.py b/openeo_processes_dask/process_implementations/udf/udf.py index 6d77801..a471e3c 100644 --- a/openeo_processes_dask/process_implementations/udf/udf.py +++ b/openeo_processes_dask/process_implementations/udf/udf.py @@ -15,7 +15,7 @@ def run_udf( data: da.Array, udf: str, runtime: str, context: Optional[dict] = None ) -> RasterCube: data = XarrayDataCube(xr.DataArray(data)) - data = UdfData(proj={"EPSG": 900913}, datacube_list=[data], user_context=context) + data = UdfData(datacube_list=[data], user_context=context) result = run_udf_code(code=udf, data=data) cubes = result.get_datacube_list() if len(cubes) != 1: diff --git a/tests/test_udf.py b/tests/test_udf.py index 8e52d0b..2ee40ae 100644 --- a/tests/test_udf.py +++ b/tests/test_udf.py @@ -20,9 +20,9 @@ def test_run_udf(temporal_interval, bounding_box, random_raster_data): ) udf = """ -from openeo.udf import XarrayDataCube -def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: - return cube +import xarray as xr +def apply_datacube(cube: xr.DataArray, context: dict) -> xr.DataArray: + return cube + 1 """ output_cube = run_udf(data=input_cube, udf=udf, runtime="Python") @@ -32,7 +32,7 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube: output_cube=output_cube, verify_attrs=True, verify_crs=True, - expected_results=input_cube, + expected_results=input_cube + 1, ) - xr.testing.assert_equal(output_cube, input_cube) + xr.testing.assert_equal(output_cube, input_cube + 1) From abd2feaaef235078389ce3ec263a517668450b9f Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Wed, 22 Jan 2025 12:27:27 +0100 Subject: [PATCH 5/6] keep attributes in apply process --- openeo_processes_dask/process_implementations/cubes/apply.py | 1 + 1 file changed, 1 insertion(+) diff --git a/openeo_processes_dask/process_implementations/cubes/apply.py b/openeo_processes_dask/process_implementations/cubes/apply.py index d39b36d..aba8715 100644 --- a/openeo_processes_dask/process_implementations/cubes/apply.py +++ b/openeo_processes_dask/process_implementations/cubes/apply.py @@ -34,6 +34,7 @@ def apply( "positional_parameters": positional_parameters, "named_parameters": named_parameters, }, + keep_attrs=True, ) return result From e33fd4e5bb3280fcebcd1e551f07fa20e8cb9390 Mon Sep 17 00:00:00 2001 From: Vincent Verelst Date: Tue, 28 Jan 2025 10:01:12 +0100 Subject: [PATCH 6/6] update to dask_geopandas 0.4.3 --- .../process_implementations/ml/random_forest.py | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/openeo_processes_dask/process_implementations/ml/random_forest.py b/openeo_processes_dask/process_implementations/ml/random_forest.py index 498b4d7..de248b6 100644 --- a/openeo_processes_dask/process_implementations/ml/random_forest.py +++ b/openeo_processes_dask/process_implementations/ml/random_forest.py @@ -95,7 +95,7 @@ def drop_col(df, keep_var): if isinstance(predictors, gpd.GeoDataFrame): predictors = dask_geopandas.from_geopandas(predictors, npartitions=1) - if isinstance(predictors, dask_geopandas.core.GeoDataFrame): + if isinstance(predictors, dask_geopandas.expr.GeoDataFrame): data_ddf = ( predictors.to_dask_dataframe().reset_index().repartition(npartitions=1) ) diff --git a/pyproject.toml b/pyproject.toml index e626bec..2a57553 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ pandas = { version = ">=2.0.0", optional = true } xarray = { version = ">=2022.11.0,<=2024.3.0", optional = true } dask = {extras = ["array", "dataframe", "distributed"], version = ">=2023.4.0", optional = true} rasterio = { version = "^1.3.4", optional = true } -dask-geopandas = { version = ">=0.2.0,<1", optional = true } +dask-geopandas = { version = "0.4.3", optional = true } xgboost = { version = ">=1.5.1", optional = true } rioxarray = { version = ">=0.12.0,<1", optional = true } openeo-pg-parser-networkx = { version = ">=2024.7", optional = true }