diff --git a/CHANGES.rst b/CHANGES.rst index 25b6ad6d..91ee6e6f 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,11 +6,21 @@ Changelog v0.6.0 (unreleased) ------------------- +Contributors to this version: Travis Logan (:user:`tlogan2000`), Trevor James Smith (:user:`Zeitsperre`). Announcements ^^^^^^^^^^^^^ * `miranda` boilerplate code is now versioned with `cruft `_ and the `Ouranosinc/cookiecutter-pypackage `_ template. +New features +^^^^^^^^^^^^ +* Aggregation operations now support more variables (`hur`, `hurs`, `huss`, `rlds`, `ta`, `tdp`, `ua`, `uas`, `va`, `vas`) +* `RDRDv21` has been added as a dataset to be converted. + +Bug fixes +^^^^^^^^^ +* Transformation docstrings are now only updated when the transformation is actually applied. + Internal changes ^^^^^^^^^^^^^^^^ * `miranda` now has a security policy (`SECURITY.md`) for disclosing sensitive issues using secure communication channels. This has also been added to the documentation. diff --git a/docs/conf.py b/docs/conf.py index 19c8d58f..17606117 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -180,7 +180,6 @@ "navigation_with_keys": True, "source_branch": "main", "source_repository": "https://github.com/Ouranosinc/miranda/", - "top_of_page_button": "edit" if not on_rtd else None, } # Add any paths that contain custom themes here, relative to this directory. 
diff --git a/miranda/convert/__init__.py b/miranda/convert/__init__.py index 71bcca04..2c427170 100644 --- a/miranda/convert/__init__.py +++ b/miranda/convert/__init__.py @@ -6,5 +6,4 @@ from ._aggregation import * from ._data_corrections import * from ._data_definitions import * - -# from ._reconstruction import * +from ._reconstruction import * diff --git a/miranda/convert/_aggregation.py b/miranda/convert/_aggregation.py index aa80e7ea..64a18a42 100644 --- a/miranda/convert/_aggregation.py +++ b/miranda/convert/_aggregation.py @@ -39,25 +39,29 @@ def aggregations_possible(ds: xr.Dataset, freq: str = "day") -> dict[str, set[st offset, meaning = get_time_frequency(ds, minimum_continuous_period="1h") aggregation_legend = dict() - for v in ["tas", "tdps"]: + for v in ["tas", "tdps", "hurs"]: if freq == meaning: if not hasattr(ds, v) and ( hasattr(ds, f"{v}max") and hasattr(ds, f"{v}min") ): aggregation_legend[f"_{v}"] = {"mean"} for variable in ds.data_vars: - if variable in ["tas", "tdps"]: + if variable in ["tas", "ta", "tdps", "tdp", "hurs", "hur"]: aggregation_legend[variable] = {"max", "mean", "min"} + elif variable in ["sfcWind"]: + aggregation_legend[variable] = {"max", "mean"} elif variable in [ "evspsblpot", "hfls", "hfss", - "hur", + "huss", "hus", "pr", "prsn", + "prmod", "ps", "psl", + "rlds", "rsds", "rss", "rlds", @@ -66,6 +70,13 @@ def aggregations_possible(ds: xr.Dataset, freq: str = "day") -> dict[str, set[st "snr", "snw", "swe", + "uas", + "ua", + "vas", + "va", + "40mWind", + "zcrd10000", + "zcrd09944", ]: aggregation_legend[variable] = {"mean"} diff --git a/miranda/convert/_data_corrections.py b/miranda/convert/_data_corrections.py index 03c2697f..83e5f122 100644 --- a/miranda/convert/_data_corrections.py +++ b/miranda/convert/_data_corrections.py @@ -409,18 +409,15 @@ def _transform(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: converted.append(vv) else: raise NotImplementedError(f"Unknown transformation: {trans}") + prev_history = 
d.attrs.get("history", "") + history = f"Transformed variable `{vv}` values using method `{trans}`. {prev_history}" + d_out.attrs.update(dict(history=history)) elif trans is False: logging.info( f"No transformations needed for `{vv}` (Explicitly set to False)." ) continue - prev_history = d.attrs.get("history", "") - history = ( - f"Transformed variable `{vv}` values using method `{trans}`. {prev_history}" - ) - d_out.attrs.update(dict(history=history)) - # Copy unconverted variables for vv in d.data_vars: if vv not in converted: @@ -461,14 +458,14 @@ def _offset_time(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: out["time"] = out.time - np.timedelta64(offset[0], offset[1]) d_out[vv] = out converted.append(vv) + prev_history = d.attrs.get("history", "") + history = f"Offset variable `{vv}` values by `{offset[0]} {offset_meaning}(s). {prev_history}" + d_out.attrs.update(dict(history=history)) elif offs is False: logging.info( f"No time offsetting needed for `{vv}` in `{p}` (Explicitly set to False)." ) continue - prev_history = d.attrs.get("history", "") - history = f"Offset variable `{vv}` values by `{offset[0]} {offset_meaning}(s). {prev_history}" - d_out.attrs.update(dict(history=history)) # Copy unconverted variables for vv in d.data_vars: @@ -488,14 +485,14 @@ def _invert_sign(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: out = d[vv] d_out[out.name] = -out converted.append(vv) + prev_history = d.attrs.get("history", "") + history = f"Inverted sign for variable `{vv}` (switched direction of values). {prev_history}" + d_out.attrs.update(dict(history=history)) elif inv_sign is False: logging.info( f"No sign inversion needed for `{vv}` in `{p}` (Explicitly set to False)." ) continue - prev_history = d.attrs.get("history", "") - history = f"Inverted sign for variable `{vv}` (switched direction of values). 
{prev_history}" - d_out.attrs.update(dict(history=history)) # Copy unconverted variables for vv in d.data_vars: @@ -549,6 +546,9 @@ def _clip_values(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: out = d[vv] d_out[out.name] = out.clip(min_value, max_value) converted.append(vv) + prev_history = d.attrs.get("history", "") + history = f"Clipped variable `{vv}` with `min={min_value}` and `max={max_value}`. {prev_history}" + d_out.attrs.update(dict(history=history)) elif clip_values is False: logging.info( f"No clipping of values needed for `{vv}` in `{p}` (Explicitly set to False)." @@ -558,10 +558,6 @@ def _clip_values(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: logging.info(f"No clipping of values needed for `{vv}` in `{p}`.") continue - prev_history = d.attrs.get("history", "") - history = f"Clipped variable `{vv}` with `min={min_value}` and `max={max_value}`. {prev_history}" - d_out.attrs.update(dict(history=history)) - # Copy unconverted variables for vv in d.data_vars: if vv not in converted: @@ -626,17 +622,18 @@ def _ensure_correct_time(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: raise ValueError(error_msg) logging.info(f"Resampling dataset with time frequency: {freq_found}.") + with xr.set_options(keep_attrs=True): d_out = d.assign_coords( time=d.time.resample(time=freq_found).mean(dim="time").time ) - d_out.time.attrs.update(d.time.attrs) - prev_history = d.attrs.get("history", "") - history = f"Resampled time with `freq={freq_found}`. {prev_history}" - d_out.attrs.update(dict(history=history)) - return d_out + if any(d_out.time != d.time): + prev_history = d.attrs.get("history", "") + history = f"Resampled time with `freq={freq_found}`. 
{prev_history}" + d_out.attrs.update(dict(history=history)) + return d_out return d @@ -665,7 +662,11 @@ def dims_conversion(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: ) if cf_name: rename_dims[dim] = cf_name - d = d.rename(rename_dims) + if rename_dims: + d = d.rename(rename_dims) + prev_history = d.attrs.get("history", "") + history = f"Renamed dimensions ({'; '.join([f'{k} : {i}' for k, i in rename_dims.items()])}). {prev_history}" + d.attrs.update(dict(history=history)) for new in ["lon", "lat"]: if new == "lon" and "lon" in d.coords: if np.any(d.lon > 180): @@ -685,8 +686,14 @@ def dims_conversion(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: if "time" in d.dims and transpose_order: transpose_order.insert(0, "time") transpose_order.extend(list(set(d.dims) - set(transpose_order))) + d = d.transpose(*transpose_order) d = d.sortby(transpose_order) + # add history only when we actually changed something + if any([list(d[v].dims) != transpose_order for v in d.data_vars]): + prev_history = d.attrs.get("history", "") + history = f"Transposed dimension order to {transpose_order}. {prev_history}" + d.attrs.update(dict(history=history)) # Add dimension original name and update attrs dim_descriptions = m["dimensions"] @@ -701,10 +708,6 @@ def dims_conversion(d: xr.Dataset, p: str, m: dict) -> xr.Dataset: if not field.startswith("_"): d[cf_name].attrs.update({field: dim_descriptions[dim][field]}) - prev_history = d.attrs.get("history", "") - history = f"Transposed and renamed dimensions. 
{prev_history}" - d.attrs.update(dict(history=history)) - return d diff --git a/miranda/convert/_data_definitions.py b/miranda/convert/_data_definitions.py index fa51a303..ec692237 100644 --- a/miranda/convert/_data_definitions.py +++ b/miranda/convert/_data_definitions.py @@ -267,7 +267,7 @@ def gather_rdrs( source=path.joinpath(vv), glob_pattern="{variable}_*_{name}_*.{suffix}", suffix=suffix, - recursive=True, + recursive=False, ) files[name][vv] = tmp[name] return files diff --git a/miranda/convert/_reconstruction.py b/miranda/convert/_reconstruction.py index ee5b552c..b3453f02 100644 --- a/miranda/convert/_reconstruction.py +++ b/miranda/convert/_reconstruction.py @@ -142,7 +142,9 @@ def reanalysis_processing( file_name = "_".join([var, time_freq, institute, project]) if domain != "not-specified": file_name = f"{file_name}_{domain}" - + if not chunks: + chunks = dict(time=24 * 10, lon=50, lat=50) + print(chunks) xr_kwargs = dict( chunks=chunks, engine=engine, diff --git a/miranda/convert/data/eccc_rdrs_cf_attrs.json b/miranda/convert/data/eccc_rdrs_cf_attrs.json index d3d985b3..03d1cf66 100644 --- a/miranda/convert/data/eccc_rdrs_cf_attrs.json +++ b/miranda/convert/data/eccc_rdrs_cf_attrs.json @@ -3,7 +3,7 @@ "Conventions": "CF-1.8", "Remarks": "Original variable names are following the convention ___. Variables with level '10000' are at surface level. The height [m] of variables with level '0XXXX' needs to be inferrred using the corresponding fields of geopotential height (GZ_0XXXX-GZ_10000). The variables UUC, VVC, UVC, and WDC are not modelled but inferred from UU and VV for convenience of the users. Precipitation (PR) is reported as 6-hr accumulations for CaPA_fine and CaPA_coarse. Precipitation (PR) are accumulations since beginning of the forecast for GEPS, GDPS, REPS, RDPS, HRDPS, and CaLDAS. 
The re-analysis product RDRS_v2 contains two variables for precipitation: 'RDRS_v2_P_PR0_SFC' is the model precipitation (trial field used by CaPA) and 'RDRS_v2_A_PR0_SFC' is precipitations adjusted with CaPA 24h precipitation (as in 'RDRS_P_PR0_SFC'). Please be aware that the baseflow 'O1' of the current version of WCPS is not reliable during the spring melt period.", "_contact": { - "rdrs-v21": "???" + "rdrs-v21": "" }, "_doi": { "rdrs-v21": "https://doi.org/10.5194/hess-25-4917-2021" @@ -29,6 +29,7 @@ "realm": "atmos", "table_date": "2023-03-23", "table_id": "eccc", + "terms_of_use": "RDRS v2.1 data users should acknowledge The Canadian Surface Prediction Archive (CaSPAr) as the original data source (https://caspar-data.ca/, https://doi.org/10.1175/BAMS-D-19-0143.1)", "type": "reconstruction" }, "dimensions": { @@ -36,7 +37,7 @@ "_ensure_correct_time": { "rdrs-v21": "1H" }, - "_strict_time": false, + "_strict_time": true, "axis": "T", "long_name": "time", "standard_name": "time" @@ -45,31 +46,204 @@ "variables": { "RDRS_v2.1_A_PR0_SFC": { "_cf_variable_name": "pr", - "_corrected_units": false, - "_invert_sign": {}, - "_offset_time": {}, "_transformation": { "rdrs-v21": "amount2rate" }, "cell_methods": "time: mean (interval: 1 hour)", "comments": "Converted from Total Precipitation using a density of 1000 kg/m³.", "long_name": "Precipitation", - "original_long_name": "Total Precipitation", "standard_name": "precipitation_flux", "units": "kg m-2 s-1" }, + "RDRS_v2.1_P_FB_SFC": { + "_cf_variable_name": "rsds", + "cell_methods": "time: mean (interval: 1 hour)", + "long_name": "Surface Downwelling Shortwave Radiation", + "standard_name": "surface_downwelling_shortwave_flux_in_air", + "units": "W m-2" + }, + "RDRS_v2.1_P_FI_SFC": { + "_cf_variable_name": "rlds", + "cell_methods": "time: mean (interval: 1 hour)", + "long_name": "Surface Downwelling Longwave Radiation", + "standard_name": "surface_downwelling_longwave_flux_in_air", + "units": "W m-2" + }, + 
"RDRS_v2.1_P_GZ_09944": { + "_cf_variable_name": "zcrd09944", + "cell_methods": "time: point", + "description": "Corresponding coordinate (09944) on the GEM Charney-Phillips vertical grid (https://doi.org/10.1175/MWR-D-13-00255.1). The height of variables with level 0XXXX can be inferred zcrd0XXXX - zcrd10000.", + "long_name": "Model vertical grid coordinate", + "units": "m" + }, + "RDRS_v2.1_P_GZ_SFC": { + "_cf_variable_name": "zcrd10000", + "cell_methods": "time: point", + "description": "Surface coordinate (10000) on the GEM Charney-Phillips vertical grid (https://doi.org/10.1175/MWR-D-13-00255.1). The height of variables with level 0XXXX can be inferred zcrd0XXXX - zcrd10000.", + "long_name": "Model vertical grid coordinate at the surface", + "units": "m" + }, + "RDRS_v2.1_P_HR_09944": { + "_cf_variable_name": "hur", + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. \nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "40 metre Relative Humidity (height is approximate : see description)", + "standard_name": "relative_humidity", + "units": "%" + }, + "RDRS_v2.1_P_HR_1.5m": { + "_cf_variable_name": "hurs", + "cell_methods": "time: point", + "long_name": "Near-Surface Relative Humidity (1.5m)", + "standard_name": "relative_humidity", + "units": "%" + }, + "RDRS_v2.1_P_HU_09944": { + "_cf_variable_name": "hus", + "_corrected_units": { + "rdrs-v21": "1" + }, + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. 
\nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "40 metre Specific Humidity (height is approximate : see description)", + "standard_name": "specific_humidity", + "units": "1" + }, + "RDRS_v2.1_P_HU_1.5m": { + "_cf_variable_name": "huss", + "_corrected_units": { + "rdrs-v21": "1" + }, + "cell_methods": "time: point", + "long_name": "Near-Surface Specific Humidity (1.5m)", + "standard_name": "specific_humidity", + "units": "1" + }, + "RDRS_v2.1_P_P0_SFC": { + "_cf_variable_name": "ps", + "_corrected_units": { + "rdrs-v21": "mbar" + }, + "cell_methods": "time: point", + "long_name": "Surface Air Pressure", + "standard_name": "surface_air_pressure", + "units": "Pa" + }, + "RDRS_v2.1_P_PN_SFC": { + "_cf_variable_name": "psl", + "cell_methods": "time: point", + "_corrected_units": { + "rdrs-v21": "mbar" + }, + "long_name": "Sea Level Pressure", + "standard_name": "air_pressure_at_sea_level", + "units": "Pa" + }, + "RDRS_v2.1_P_PR0_SFC": { + "_cf_variable_name": "prmod", + "_transformation": { + "rdrs-v21": "amount2rate" + }, + "cell_methods": "time: mean (interval: 1 hour)", + "comments": "Converted from Total Precipitation using a density of 1000 kg/m³.", + "description": "This field differs from the 'pr' variable. It is the model background field subsequently used by CaPA to produce 'pr'.", + "long_name": "Precipitation", + "standard_name": "precipitation_flux", + "units": "kg m-2 s-1" + }, + "RDRS_v2.1_P_TD_09944": { + "_cf_variable_name": "tdp", + "_corrected_units": { + "rdrs-v21": "degC" + }, + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. 
\nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "40 metre dewpoint temperature (height is approximate : see description)", + "standard_name": "dew_point_temperature", + "units": "K" + }, + "RDRS_v2.1_P_TD_1.5m": { + "_cf_variable_name": "tdps", + "_corrected_units": { + "rdrs-v21": "degC" + }, + "cell_methods": "time: point", + "long_name": "1.5 metre dewpoint temperature", + "standard_name": "dew_point_temperature", + "units": "K" + }, + "RDRS_v2.1_P_TT_09944": { + "_cf_variable_name": "ta", + "_corrected_units": { + "rdrs-v21": "degC" + }, + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. \nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "40 metre temperature. Height is approximate, see description.", + "standard_name": "air_temperature", + "units": "K" + }, "RDRS_v2.1_P_TT_1.5m": { "_cf_variable_name": "tas", "_corrected_units": { "rdrs-v21": "degC" }, - "_invert_sign": {}, - "_offset_time": {}, - "_transformation": {}, "cell_methods": "time: point", "long_name": "1.5 metre temperature", "standard_name": "air_temperature", "units": "K" + }, + "RDRS_v2.1_P_UUC_09944": { + "_cf_variable_name": "ua", + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. \nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "Eastward Wind (40m). 
Height is approximate, see description.", + "standard_name": "eastward_wind", + "units": "m s-1" + }, + "RDRS_v2.1_P_UUC_10m": { + "_cf_variable_name": "uas", + "cell_methods": "time: point", + "long_name": "Eastward Near-Surface Wind (10m)", + "standard_name": "eastward_wind", + "units": "m s-1" + }, + "RDRS_v2.1_P_UVC_09944": { + "_cf_variable_name": "40mWind", + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. \nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "Wind Speed (40m). Height is approximate, see description.", + "standard_name": "wind_speed", + "units": "m s-1" + }, + "RDRS_v2.1_P_UVC_10m": { + "_cf_variable_name": "sfcWind", + "cell_methods": "time: point", + "long_name": "Near-Surface Wind Speed (10m)", + "standard_name": "wind_speed", + "units": "m s-1" + }, + "RDRS_v2.1_P_VVC_09944": { + "_cf_variable_name": "va", + "cell_methods": "time: point", + "description": "The approximate 40 metre level is more specifically 99.44% of the atmosphere based on pressure elevation, where 100% is the surface. \nThe true height needs to be inferred using the corresponding fields on the Charney-Phillips vertical grid (zcrd09944 - zcrd10000) (https://doi.org/10.1175/MWR-D-13-00255.1)", + "long_name": "Northward Wind (40m). 
Height is approximate, see description.", + "standard_name": "northward_wind", + "units": "m s-1" + }, + "RDRS_v2.1_P_VVC_10m": { + "_cf_variable_name": "vas", + "cell_methods": "time: point", + "long_name": "Northward Near-Surface Wind (10m)", + "standard_name": "northward_wind", + "units": "m s-1" + }, + "RDRS_v2.1_P_WDC_10m": { + "_cf_variable_name": "winddir", + "cell_methods": "time: point", + "long_name": "Near-Surface Meteorological Wind Direction (10m)", + "standard_name": "wind_from_direction", + "units": "deg" } } } diff --git a/miranda/convert/eccc_rdrs.py b/miranda/convert/eccc_rdrs.py index 628c216a..afc34f84 100644 --- a/miranda/convert/eccc_rdrs.py +++ b/miranda/convert/eccc_rdrs.py @@ -38,6 +38,7 @@ def convert_rdrs( output_format: str = "zarr", working_folder: str | os.PathLike | None = None, overwrite: bool = False, + cfvariable_list: list | None = None, **dask_kwargs, ) -> None: r"""Convert RDRS dataset. @@ -50,6 +51,7 @@ def convert_rdrs( output_format : {"netcdf", "zarr"} working_folder : str or os.PathLike, optional overwrite : bool + cfvariable_list : list, optional \*\*dask_kwargs Returns @@ -58,6 +60,12 @@ def convert_rdrs( """ # TODO: This setup configuration is near-universally portable. Should we consider applying it to all conversions? 
var_attrs = load_json_data_mappings(project=project)["variables"] + if cfvariable_list: + var_attrs = { + v: var_attrs[v] + for v in var_attrs + if var_attrs[v]["_cf_variable_name"] in cfvariable_list + } freq_dict = dict(h="hr", d="day") if isinstance(input_folder, str): @@ -114,7 +122,7 @@ def convert_rdrs( output_folder=output_folder.joinpath(out_freq), temp_folder=working_folder, output_format=output_format, - overwrite=False, + overwrite=overwrite, chunks=chunks, **dask_kwargs, ) diff --git a/miranda/io/_output.py b/miranda/io/_output.py index af610cc0..c6ae925f 100644 --- a/miranda/io/_output.py +++ b/miranda/io/_output.py @@ -138,7 +138,7 @@ def write_dataset_dict( if not outpath.exists() or overwrite: outpath.parent.mkdir(parents=True, exist_ok=True) if outpath.exists(): - shutil.rmtree(outfile) + shutil.rmtree(outpath) tmp_path = None if temp_folder: @@ -202,6 +202,8 @@ def concat_rechunk_zarr( out_zarr = output_folder.joinpath(f"{out_stem}_{start_year}_{end_year}.zarr") if not out_zarr.exists() or overwrite: + if out_zarr.exists(): + shutil.rmtree(out_zarr) # maketemp files 1 zarr per 4 years years = [y for y in range(int(start_year), int(end_year) + 1)] years = [years[x : x + 4] for x in range(0, len(years), 4)] @@ -232,7 +234,6 @@ def concat_rechunk_zarr( tmp_zarr = tmp_folder.joinpath( f"{out_zarr.stem.split(f'_{start_year}_')[0]}_{year[0]}-{year[-1]}.zarr", ) - tmp_zarr.parent.mkdir(exist_ok=True, parents=True) logging.info(f"Writing year {year} to {tmp_zarr.as_posix()}.") job = delayed_write( @@ -246,17 +247,22 @@ def concat_rechunk_zarr( with Client(**dask_kwargs): dask.compute(job) - # get tmp zarrs - list_zarr = sorted(list(tmp_folder.glob("*zarr"))) - ds = xr.open_mfdataset(list_zarr, engine="zarr") - # FIXME: Client is only needed for computation. Should be elsewhere. 
+ # write to final + tmpzarrlist = sorted(list(tmp_zarr.parent.glob("*.zarr"))) + del ds + ds = xr.open_mfdataset( + tmpzarrlist, engine="zarr", parallel=True, combine="by_coords" + ) + zarr_kwargs = None job = delayed_write( ds=ds, outfile=out_zarr, output_format="zarr", target_chunks=chunks, - overwrite=overwrite, - ) # kwargs=zarr_kwargs) + overwrite=False, + encode=False, + kwargs=zarr_kwargs, + ) with Client(**dask_kwargs): dask.compute(job) shutil.rmtree(tmp_folder) diff --git a/miranda/io/utils.py b/miranda/io/utils.py index 4b97a5c8..63bb46f9 100644 --- a/miranda/io/utils.py +++ b/miranda/io/utils.py @@ -155,7 +155,9 @@ def delayed_write( outfile: str | os.PathLike, output_format: str, overwrite: bool, + encode: bool = True, target_chunks: dict | None = None, + kwargs: dict | None = None, ) -> dask.delayed: """Stage a Dataset writing job using `dask.delayed` objects. @@ -166,13 +168,16 @@ def delayed_write( target_chunks : dict output_format : {"netcdf", "zarr"} overwrite : bool + encode : bool + kwargs : dict Returns ------- dask.delayed.delayed """ # Set correct chunks in encoding options - kwargs = dict() + if not kwargs: + kwargs = dict() kwargs["encoding"] = dict() try: for name, da in ds.data_vars.items(): @@ -194,15 +199,17 @@ def delayed_write( kwargs["mode"] = "a" elif output_format == "zarr": ds = ds.chunk(target_chunks) - kwargs["encoding"][name] = { - "chunks": chunks, - "compressor": zarr.Blosc(), - } + # if encode: + if "append_dim" not in kwargs.keys(): + kwargs["encoding"][name] = { + "chunks": chunks, + } kwargs["compute"] = False if overwrite: kwargs["mode"] = "w" if kwargs["encoding"]: - kwargs["encoding"]["time"] = {"dtype": "int32"} + if "append_dim" not in kwargs.keys(): + kwargs["encoding"]["time"] = {"dtype": "int32"} except KeyError: logging.error("Unable to encode chunks. 
Verify dataset.") @@ -302,7 +309,8 @@ def get_chunks_on_disk(file: os.PathLike | str) -> dict: for v in ds.variables: chunks[v] = dict() for ii, dim in enumerate(ds[v].dimensions): - chunks[v][dim] = ds[v].chunking()[ii] + if ds[v].chunking(): + chunks[v][dim] = ds[v].chunking()[ii] elif file.suffix.lower() == "zarr" and file.is_dir(): with zarr.open(file, "r") as ds: # noqa for v in ds.arrays(): diff --git a/miranda/validators.py b/miranda/validators.py index 893864d9..52e58c28 100644 --- a/miranda/validators.py +++ b/miranda/validators.py @@ -79,12 +79,12 @@ Optional("member"): str, Optional("variable"): str, Optional("timedelta"): Or(pd.Timedelta, NaTType, "NaT"), - Optional("date"): Or(Regex(BASIC_DT_VALIDATION, flags=re.I), "fx"), + Optional("date"): Or(Regex(BASIC_DT_VALIDATION, flags=int(re.I)), "fx"), Optional("date_start"): Or( - Regex(DATE_VALIDATION, flags=re.I), NaTType, "NaT" + Regex(DATE_VALIDATION, flags=int(re.I)), NaTType, "NaT" ), Optional("date_end"): Or( - Regex(DATE_VALIDATION, flags=re.I), NaTType, "NaT" + Regex(DATE_VALIDATION, flags=int(re.I)), NaTType, "NaT" ), Optional("processing_level"): Or(*PROCESSING_LEVELS), "format": Or("netcdf", "zarr"), diff --git a/templates/eccc_rdrs_processing.py b/templates/eccc_rdrs_processing.py index c7563157..1d68da11 100644 --- a/templates/eccc_rdrs_processing.py +++ b/templates/eccc_rdrs_processing.py @@ -6,14 +6,16 @@ def main(): + vars_to_process = ["tas", "pr"] + home = Path("~").expanduser() dask_dir = home.joinpath("tmpout", "dask") dask_dir.mkdir(parents=True, exist_ok=True) dask_kwargs = dict( - n_workers=8, - threads_per_worker=4, - memory_limit="7GB", - dashboard_address=8998, + n_workers=6, + threads_per_worker=6, + memory_limit="12GB", + dashboard_address=8991, local_directory=dask_dir, silence_logs=logging.ERROR, ) @@ -25,6 +27,8 @@ def main(): output_folder=Path(home).joinpath("RDRS_v21", "tmp/ECCC/RDRS_v21/NAM"), output_format="zarr", working_folder=Path(home).joinpath("tmpout", "rdrs"), + 
cfvariable_list=vars_to_process, + overwrite=False, **dask_kwargs, ) @@ -36,13 +40,16 @@ def main(): overwrite=False, year_start=None, year_end=None, - process_variables=None, + process_variables=vars_to_process, **dask_kwargs, ) + # # for freq in ["day", "1hr"]: infolder = Path(home).joinpath("RDRS_v21", f"tmp/ECCC/RDRS_v21/NAM/{freq}") - for variable in [v for v in infolder.glob("*") if v.is_dir()]: + for variable in [ + v for v in infolder.glob("*") if v.is_dir() and v.name in vars_to_process + ]: concat_rechunk_zarr( project=project, freq=freq, @@ -51,6 +58,7 @@ def main(): "RDRS_v21", f"converted/ECCC/RDRS_v21/NAM/{freq}/{variable.name}" ), overwrite=False, + **dask_kwargs, )