From 946b0c8b2aad4e886412ff9d69db3e076abb20c1 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Wed, 23 Oct 2024 18:28:25 +0200 Subject: [PATCH 1/7] dask and asv_runner to the asv.conf file and bench example with dask --- benchmarks/asv.conf.json | 3 + benchmarks/benchmarks/preprocessing_counts.py | 2 +- .../benchmarks/preprocessing_counts_dask.py | 209 ++++++++++++++++++ 3 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 benchmarks/benchmarks/preprocessing_counts_dask.py diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json index 98192b3725..b134476efa 100644 --- a/benchmarks/asv.conf.json +++ b/benchmarks/asv.conf.json @@ -86,6 +86,9 @@ "pooch": [""], "scikit-image": [""], // "scikit-misc": [""], + "scikit-learn": [""], + "pip+asv_runner": [""], + "dask": [""] }, // Combinations of libraries/python versions can be excluded/included diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index 587b441e1c..aaa56d755b 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,7 +32,7 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc68k_reduced", "pbmc3k"], + ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py new file mode 100644 index 0000000000..4af1e9eeee --- /dev/null +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -0,0 +1,209 @@ +""" +This module will benchmark preprocessing operations in Scanpy that run on counts, +with both Dask and non-Dask implementations. +API documentation: https://scanpy.readthedocs.io/en/stable/api/preprocessing.html +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import dask.array as dd +from dask.distributed import Client, LocalCluster +import scanpy as sc + +from ._utils import get_count_dataset + +if TYPE_CHECKING: + from anndata import AnnData + from ._utils import Dataset, KeyCount + +# Setup variables + +adata: AnnData +batch_key: str | None + + +def setup(dataset: Dataset, layer: KeyCount, *_): + """Setup global variables before each benchmark.""" + global adata, batch_key + adata, batch_key = get_count_dataset(dataset, layer=layer) + assert "log1p" not in adata.uns + + +# Dask Setup for Dask-based benchmarks +def setup_dask_cluster(): + """Set up a local Dask cluster for benchmarking.""" + cluster = LocalCluster(n_workers=6, threads_per_worker=2) + client = Client(cluster) + return client + + +# ASV suite + +params: tuple[list[Dataset], list[KeyCount]] = ( + ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc68k_reduced", "pbmc3k"], + ["counts", "counts-off-axis"], +) +param_names = ["dataset", "layer"] + +# ### Non-Dask Benchmarks ### + +# def time_filter_cells(*_): +# sc.pp.filter_cells(adata, min_genes=100) + + +# def peakmem_filter_cells(*_): +# sc.pp.filter_cells(adata, min_genes=100) + + +# def time_filter_genes(*_): +# sc.pp.filter_genes(adata, min_cells=3) + + +# def peakmem_filter_genes(*_): +# sc.pp.filter_genes(adata, min_cells=3) + + +# def time_scrublet(*_): +# sc.pp.scrublet(adata, batch_key=batch_key) + + +# def peakmem_scrublet(*_): +# sc.pp.scrublet(adata, batch_key=batch_key) + + +### Dask-Based Benchmarks ### + +def time_filter_cells_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + adata.X = adata.X.persist() + client.rebalance() + sc.pp.filter_cells(adata, min_genes=100) + finally: + client.close() + + +def peakmem_filter_cells_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_cells(adata, min_genes=100) + finally: + client.close() + + +def time_filter_genes_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_genes(adata, min_cells=3) + finally: + client.close() + + +def peakmem_filter_genes_dask(*_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.filter_genes(adata, min_cells=3) + finally: + client.close() + + +### Suite for Dask and Non-Dask Operations ### + +class FastSuite: + """Suite for fast preprocessing operations.""" + + params: tuple[list[Dataset], list[KeyCount]] = ( + ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced"],#, "bmmc", "lung93k"], + ["counts", "counts-off-axis"], + ) + param_names = ["dataset", "layer"] + + ### Non-Dask Versions ### + + def time_calculate_qc_metrics(self, *_): + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + def peakmem_calculate_qc_metrics(self, *_): + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + def time_normalize_total(self, *_): + sc.pp.normalize_total(adata, target_sum=1e4) + + def peakmem_normalize_total(self, *_): + sc.pp.normalize_total(adata, target_sum=1e4) + + def time_log1p(self, *_): + adata.uns.pop("log1p", None) + sc.pp.log1p(adata) + + def peakmem_log1p(self, *_): + adata.uns.pop("log1p", None) + sc.pp.log1p(adata) + + ### Dask Versions ### + + def time_calculate_qc_metrics_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + finally: + client.close() + + def peakmem_calculate_qc_metrics_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + finally: + client.close() + + def time_normalize_total_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.normalize_total(adata, target_sum=1e4) + finally: + client.close() + + def peakmem_normalize_total_dask(self, *_): + client = setup_dask_cluster() + try: + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.normalize_total(adata, target_sum=1e4) + finally: + client.close() + + def time_log1p_dask(self, *_): + client = setup_dask_cluster() + try: + adata.uns.pop("log1p", None) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.log1p(adata) + finally: + client.close() + + def peakmem_log1p_dask(self, *_): + client = setup_dask_cluster() + try: + adata.uns.pop("log1p", None) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + sc.pp.log1p(adata) + finally: + client.close() From d932e593af2c235f8d5a04a660c5429e30c3ec87 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 24 Oct 2024 17:27:01 +0200 Subject: [PATCH 2/7] peakmem_log1p better w/ Dask --- benchmarks/benchmarks/preprocessing_counts.py | 6 +- .../benchmarks/preprocessing_counts_dask.py | 78 ++++--------------- 2 files changed, 18 insertions(+), 66 deletions(-) diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index aaa56d755b..41dc686f5a 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,7 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -78,7 +79,8 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 4af1e9eeee..6af1f14865 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -34,7 +34,7 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # Dask Setup for Dask-based benchmarks def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" - cluster = LocalCluster(n_workers=6, threads_per_worker=2) + cluster = LocalCluster(n_workers=4, threads_per_worker=2) client = Client(cluster) return client @@ -42,38 +42,12 @@ def setup_dask_cluster(): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - # ["pbmc68k_reduced", "pbmc3k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] -# ### Non-Dask Benchmarks ### - -# def time_filter_cells(*_): -# sc.pp.filter_cells(adata, min_genes=100) - - -# def peakmem_filter_cells(*_): -# sc.pp.filter_cells(adata, min_genes=100) - - -# def time_filter_genes(*_): -# sc.pp.filter_genes(adata, min_cells=3) - - -# def peakmem_filter_genes(*_): -# sc.pp.filter_genes(adata, min_cells=3) - - -# def time_scrublet(*_): -# sc.pp.scrublet(adata, batch_key=batch_key) - - -# def peakmem_scrublet(*_): -# sc.pp.scrublet(adata, batch_key=batch_key) - - ### Dask-Based Benchmarks ### def time_filter_cells_dask(*_): @@ -90,7 +64,7 @@ def time_filter_cells_dask(*_): def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.filter_cells(adata, min_genes=100) finally: client.close() @@ -120,54 +94,30 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - # ["pbmc3k", "pbmc68k_reduced"],#, "bmmc", "lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["lung93k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] - ### Non-Dask Versions ### - - def time_calculate_qc_metrics(self, *_): - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) - - def peakmem_calculate_qc_metrics(self, *_): - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) - - def time_normalize_total(self, *_): - sc.pp.normalize_total(adata, target_sum=1e4) - - def peakmem_normalize_total(self, *_): - sc.pp.normalize_total(adata, target_sum=1e4) - - def time_log1p(self, *_): - adata.uns.pop("log1p", None) - sc.pp.log1p(adata) - - def peakmem_log1p(self, *_): - adata.uns.pop("log1p", None) - sc.pp.log1p(adata) - ### Dask Versions ### - def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + print(f"Dask Array Shape: {adata.X.shape}") + print(f"Dask Array Type: {type(adata.X)}") sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) finally: client.close() + def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) @@ -177,7 +127,7 @@ def peakmem_calculate_qc_metrics_dask(self, *_): def time_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.normalize_total(adata, target_sum=1e4) finally: client.close() @@ -203,7 +153,7 @@ def peakmem_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100)) sc.pp.log1p(adata) finally: - client.close() + client.close() \ No newline at end of file From 1909f4c3d4f478c4cfcf406899029e184fefdef0 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 24 Oct 2024 18:20:02 +0200 Subject: [PATCH 3/7] draft benchmarks with dask --- .../benchmarks/preprocessing_counts_dask.py | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 6af1f14865..34ff869012 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -11,6 +11,7 @@ import dask.array as dd from dask.distributed import Client, LocalCluster import scanpy as sc +from scipy import sparse from ._utils import get_count_dataset @@ -50,17 +51,32 @@ def setup_dask_cluster(): ### Dask-Based Benchmarks ### +# def time_filter_cells_dask(*_): +# client = setup_dask_cluster() +# try: +# adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) +# adata.X = adata.X.persist() +# client.rebalance() +# sc.pp.filter_cells(adata, min_genes=100) +# finally: +# client.close() + def time_filter_cells_dask(*_): client = setup_dask_cluster() try: adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.persist() - client.rebalance() + adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert to sparse chunks + + # If Scanpy requires dense arrays, convert to dense + adata.X = adata.X.map_blocks(lambda x: x.toarray(), dtype=adata.X.dtype, meta=np.array([])) + + # Rechunk after conversion + adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + sc.pp.filter_cells(adata, min_genes=100) finally: client.close() - def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: @@ -104,15 +120,31 @@ class FastSuite: def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: + # Use sparse chunks adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - print(f"Dask Array Shape: {adata.X.shape}") - print(f"Dask Array Type: {type(adata.X)}") + adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert dense to sparse chunks + + # Rechunk if necessary + adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) finally: client.close() + # def time_calculate_qc_metrics_dask(self, *_): + # client = setup_dask_cluster() + # try: + # adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + # print(f"Dask Array Shape: {adata.X.shape}") + # print(f"Dask Array Type: {type(adata.X)}") + # sc.pp.calculate_qc_metrics( + # adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + # ) + # finally: + # client.close() + def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() From 5494efd93d9ef073f2aacadbaf365af8de6713aa Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 7 Nov 2024 16:13:50 +0100 Subject: [PATCH 4/7] add larger synthetic dataset --- benchmarks/benchmarks/_utils.py | 38 ++++++++++ benchmarks/benchmarks/preprocessing_counts.py | 8 +- .../benchmarks/preprocessing_counts_dask.py | 76 ++++--------------- 3 files changed, 56 insertions(+), 66 deletions(-) diff --git a/benchmarks/benchmarks/_utils.py b/benchmarks/benchmarks/_utils.py index 810ace74fd..e6833aeaad 100644 --- a/benchmarks/benchmarks/_utils.py +++ b/benchmarks/benchmarks/_utils.py @@ -117,6 +117,42 @@ def lung93k() -> AnnData: return _lung93k().copy() +@cache +def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: + """ + Generate a synthetic dataset suitable for Dask testing. + + Parameters: + n_obs: int + Number of observations (rows, typically cells). + n_vars: int + Number of variables (columns, typically genes). + density: float + Fraction of non-zero entries in the sparse matrix. + + Returns: + AnnData + The synthetic dataset. + """ + + X = sparse.random(n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42) + obs = {"obs_names": [f"cell_{i}" for i in range(n_obs)]} + var = {"var_names": [f"gene_{j}" for j in range(n_vars)]} + adata = anndata.AnnData(X=X, obs=obs, var=var) + adata.layers["counts"] = X.copy() + sc.pp.log1p(adata) + adata.var["mt"] = adata.var_names.str.startswith("MT-") + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) + + return adata + + +def large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: + return _large_synthetic_dataset(n_obs, n_vars, density).copy() + + def to_off_axis(x: np.ndarray | sparse.csr_matrix) -> np.ndarray | sparse.csc_matrix: if isinstance(x, sparse.csr_matrix): return x.tocsc() @@ -138,6 +174,8 @@ def _get_dataset_raw(dataset: Dataset) -> tuple[AnnData, str | None]: adata, batch_key = bmmc(400), "sample" case "lung93k": adata, batch_key = lung93k(), "PatientNumber" + case "large_synthetic": + adata, batch_key = large_synthetic_dataset(), None case _: msg = f"Unknown dataset {dataset}" raise AssertionError(msg) diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index 41dc686f5a..6521d258f0 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,8 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["lung93k"], - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], + ["pbmc3k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -79,8 +79,8 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], + ["pbmc3k"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 34ff869012..ab91d15993 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -1,13 +1,6 @@ -""" -This module will benchmark preprocessing operations in Scanpy that run on counts, -with both Dask and non-Dask implementations. -API documentation: https://scanpy.readthedocs.io/en/stable/api/preprocessing.html -""" - from __future__ import annotations from typing import TYPE_CHECKING - import dask.array as dd from dask.distributed import Client, LocalCluster import scanpy as sc @@ -19,12 +12,10 @@ from anndata import AnnData from ._utils import Dataset, KeyCount -# Setup variables - +# Setup global variables adata: AnnData batch_key: str | None - def setup(dataset: Dataset, layer: KeyCount, *_): """Setup global variables before each benchmark.""" global adata, batch_key @@ -32,7 +23,6 @@ def setup(dataset: Dataset, layer: KeyCount, *_): assert "log1p" not in adata.uns -# Dask Setup for Dask-based benchmarks def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" cluster = LocalCluster(n_workers=4, threads_per_worker=2) @@ -41,42 +31,24 @@ def setup_dask_cluster(): # ASV suite - params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + ["pbmc3k"], # Extend with larger datasets as needed ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] ### Dask-Based Benchmarks ### -# def time_filter_cells_dask(*_): -# client = setup_dask_cluster() -# try: -# adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) -# adata.X = adata.X.persist() -# client.rebalance() -# sc.pp.filter_cells(adata, min_genes=100) -# finally: -# client.close() - def time_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert to sparse chunks - - # If Scanpy requires dense arrays, convert to dense - adata.X = adata.X.map_blocks(lambda x: x.toarray(), dtype=adata.X.dtype, meta=np.array([])) - - # Rechunk after conversion - adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) - + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + adata.X = adata.X.map_blocks(sparse.csr_matrix) # Ensure sparse chunks sc.pp.filter_cells(adata, min_genes=100) finally: client.close() + def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: @@ -89,7 +61,7 @@ def peakmem_filter_cells_dask(*_): def time_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.filter_genes(adata, min_cells=3) finally: client.close() @@ -98,58 +70,38 @@ def time_filter_genes_dask(*_): def peakmem_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.filter_genes(adata, min_cells=3) finally: client.close() -### Suite for Dask and Non-Dask Operations ### +### General Dask and Non-Dask Preprocessing Benchmarks ### class FastSuite: - """Suite for fast preprocessing operations.""" + """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k"], - ["lung93k"], + ["pbmc3k"], # Extend as needed with larger datasets ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] - ### Dask Versions ### def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - # Use sparse chunks adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) # Convert dense to sparse chunks - - # Rechunk if necessary - adata.X = adata.X.rechunk((adata.X.shape[0] // 50, adata.X.shape[1] // 50)) - + adata.X = adata.X.map_blocks(sparse.csr_matrix) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) finally: client.close() - # def time_calculate_qc_metrics_dask(self, *_): - # client = setup_dask_cluster() - # try: - # adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - # print(f"Dask Array Shape: {adata.X.shape}") - # print(f"Dask Array Type: {type(adata.X)}") - # sc.pp.calculate_qc_metrics( - # adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - # ) - # finally: - # client.close() - - def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) sc.pp.calculate_qc_metrics( adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True ) @@ -176,7 +128,7 @@ def time_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) sc.pp.log1p(adata) finally: client.close() @@ -188,4 +140,4 @@ def peakmem_log1p_dask(self, *_): adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100)) sc.pp.log1p(adata) finally: - client.close() \ No newline at end of file + client.close() From ca9b8f671ae4403a89d775707498c7422e0384a3 Mon Sep 17 00:00:00 2001 From: mikelkou Date: Thu, 7 Nov 2024 16:15:36 +0100 Subject: [PATCH 5/7] Update preprocessing_counts_dask.py --- benchmarks/benchmarks/preprocessing_counts_dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index ab91d15993..c28f984d00 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -32,7 +32,7 @@ def setup_dask_cluster(): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k"], # Extend with larger datasets as needed + ["pbmc68k_reduced"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -82,7 +82,7 @@ class FastSuite: """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k"], # Extend as needed with larger datasets + ["pbmc68k_reduced"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] From 0482e7514d6530e362cacd2b743a2d7744177503 Mon Sep 17 00:00:00 2001 From: Mikaela Koutrouli Date: Tue, 4 Feb 2025 17:10:43 -0800 Subject: [PATCH 6/7] Changes by Mikaela --- benchmarks/asv.conf.json | 5 +- benchmarks/benchmarks/_utils.py | 19 +++++ benchmarks/benchmarks/preprocessing_counts.py | 8 +- .../benchmarks/preprocessing_counts_dask.py | 74 +++++++++++++------ 4 files changed, 78 insertions(+), 28 deletions(-) diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json index b134476efa..e2e260a7ec 100644 --- a/benchmarks/asv.conf.json +++ b/benchmarks/asv.conf.json @@ -2,6 +2,8 @@ // The version of the config file format. Do not change, unless // you know what you are doing. "version": 1, + "timeout": 1200, + "install_timeout": 1200, // The name of the project being benchmarked "project": "scanpy", @@ -43,11 +45,12 @@ // If missing or the empty string, the tool will be automatically // determined by looking for tools on the PATH environment // variable. + "timeout": 1200, // Timeout for each benchmark in seconds "environment_type": "conda", // timeout in seconds for installing any dependencies in environment // defaults to 10 min - //"install_timeout": 600, + "install_timeout": 1200, // the base URL to show a commit for the project. "show_commit_url": "https://github.com/scverse/scanpy/commit/", diff --git a/benchmarks/benchmarks/_utils.py b/benchmarks/benchmarks/_utils.py index e6833aeaad..1601f73dfe 100644 --- a/benchmarks/benchmarks/_utils.py +++ b/benchmarks/benchmarks/_utils.py @@ -117,6 +117,23 @@ def lung93k() -> AnnData: return _lung93k().copy() + +@cache +def _musmus_11m() -> AnnData: + # Define the path to the dataset + path = "/sc/arion/projects/psychAD/mikaela/scanpy/scanpy/benchmarks/data/MusMus_4M_cells_cellxgene.h5ad" + adata = sc.read_h5ad(path) + #assert isinstance(adata.X, sparse.csr_matrix) + # Add counts layer + #adata.layers["counts"] = adata.X.astype(np.int32, copy=True) + sc.pp.log1p(adata) + return adata + + +def musmus_11m() -> AnnData: + return _musmus_11m().copy() + + @cache def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: """ @@ -176,6 +193,8 @@ def _get_dataset_raw(dataset: Dataset) -> tuple[AnnData, str | None]: adata, batch_key = lung93k(), "PatientNumber" case "large_synthetic": adata, batch_key = large_synthetic_dataset(), None + case "musmus_11m": + adata, batch_key = musmus_11m(), None case _: msg = f"Unknown dataset {dataset}" raise AssertionError(msg) diff --git a/benchmarks/benchmarks/preprocessing_counts.py b/benchmarks/benchmarks/preprocessing_counts.py index 6521d258f0..99c3a6e682 100644 --- a/benchmarks/benchmarks/preprocessing_counts.py +++ b/benchmarks/benchmarks/preprocessing_counts.py @@ -32,8 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_): # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc3k"], - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], + ["pbmc3k", "pbmc68k_reduced", "bmmc", "musmus_11m"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic", "musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -79,8 +79,8 @@ class FastSuite: """Suite for fast preprocessing operations.""" params: tuple[list[Dataset], list[KeyCount]] = ( - # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"], - ["pbmc3k"], + # ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic", "musmus_11m"], + ["pbmc3k", "pbmc68k_reduced", "bmmc", "musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index c28f984d00..4f9086f283 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -23,36 +23,57 @@ def setup(dataset: Dataset, layer: KeyCount, *_): assert "log1p" not in adata.uns +#def setup_dask_cluster(): +# """Set up a local Dask cluster for benchmarking.""" +# cluster = LocalCluster(n_workers=4, threads_per_worker=2) +# client = Client(cluster) +# return client + def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" - cluster = LocalCluster(n_workers=4, threads_per_worker=2) + cluster = LocalCluster(n_workers=5, + threads_per_worker=2, + memory_limit="60GB", + timeout="1200s") client = Client(cluster) return client # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc68k_reduced"], + ["musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] ### Dask-Based Benchmarks ### - def time_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) # Ensure sparse chunks + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks).persist() sc.pp.filter_cells(adata, min_genes=100) + assert adata.n_obs > 0 # Ensure cells are retained finally: client.close() +#def time_filter_cells_dask(*_): +# client = setup_dask_cluster() +# try: +# # Compute optimal chunks based on Dask cluster +# optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) +# adata.X = dd.from_array(adata.X, chunks=optimal_chunks) +# adata.X = adata.X.persist() # Persist to avoid recomputation +# sc.pp.filter_cells(adata, min_genes=100) +# finally: +# client.close() + def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.filter_cells(adata, min_genes=100) finally: client.close() @@ -61,7 +82,9 @@ def peakmem_filter_cells_dask(*_): def time_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) + adata.X = adata.X.persist() sc.pp.filter_genes(adata, min_cells=3) finally: client.close() @@ -70,7 +93,8 @@ def time_filter_genes_dask(*_): def peakmem_filter_genes_dask(*_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.filter_genes(adata, min_cells=3) finally: client.close() @@ -82,7 +106,7 @@ class FastSuite: """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["pbmc68k_reduced"], + ["musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -90,36 +114,37 @@ class FastSuite: def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - adata.X = adata.X.map_blocks(sparse.csr_matrix) - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) + adata.X = adata.X.persist() + sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True) finally: client.close() def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10)) - sc.pp.calculate_qc_metrics( - adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True - ) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) + sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True) finally: client.close() def time_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) - sc.pp.normalize_total(adata, target_sum=1e4) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) + adata.X = adata.X.map_blocks(lambda x: x / x.sum(axis=1), dtype=float) # Optimize normalization + adata.X = adata.X.persist() finally: client.close() def peakmem_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1])) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.normalize_total(adata, target_sum=1e4) finally: client.close() @@ -128,7 +153,9 @@ def time_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50)) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) + adata.X = adata.X.persist() sc.pp.log1p(adata) finally: client.close() @@ -137,7 +164,8 @@ def peakmem_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100)) + optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.log1p(adata) finally: client.close() From 6149b8b8a2d3272198d9845325dc109d1616d5a7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 5 Feb 2025 01:11:02 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- benchmarks/asv.conf.json | 8 +- benchmarks/benchmarks/_utils.py | 19 ++-- .../benchmarks/preprocessing_counts_dask.py | 86 ++++++++++++++----- 3 files changed, 80 insertions(+), 33 deletions(-) diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json index e2e260a7ec..222d69b021 100644 --- a/benchmarks/asv.conf.json +++ b/benchmarks/asv.conf.json @@ -45,7 +45,7 @@ // If missing or the empty string, the tool will be automatically // determined by looking for tools on the PATH environment // variable. - "timeout": 1200, // Timeout for each benchmark in seconds + "timeout": 1200, // Timeout for each benchmark in seconds "environment_type": "conda", // timeout in seconds for installing any dependencies in environment @@ -89,9 +89,9 @@ "pooch": [""], "scikit-image": [""], // "scikit-misc": [""], - "scikit-learn": [""], - "pip+asv_runner": [""], - "dask": [""] + "scikit-learn": [""], + "pip+asv_runner": [""], + "dask": [""], }, // Combinations of libraries/python versions can be excluded/included diff --git a/benchmarks/benchmarks/_utils.py b/benchmarks/benchmarks/_utils.py index 1601f73dfe..d0b7e2547a 100644 --- a/benchmarks/benchmarks/_utils.py +++ b/benchmarks/benchmarks/_utils.py @@ -117,15 +117,14 @@ def lung93k() -> AnnData: return _lung93k().copy() - @cache def _musmus_11m() -> AnnData: # Define the path to the dataset path = "/sc/arion/projects/psychAD/mikaela/scanpy/scanpy/benchmarks/data/MusMus_4M_cells_cellxgene.h5ad" adata = sc.read_h5ad(path) - #assert isinstance(adata.X, sparse.csr_matrix) + # assert isinstance(adata.X, sparse.csr_matrix) # Add counts layer - #adata.layers["counts"] = adata.X.astype(np.int32, copy=True) + # adata.layers["counts"] = adata.X.astype(np.int32, copy=True) sc.pp.log1p(adata) return adata @@ -135,10 +134,12 @@ def musmus_11m() -> AnnData: @cache -def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: +def _large_synthetic_dataset( + n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01 +) -> AnnData: """ Generate a synthetic dataset suitable for Dask testing. - + Parameters: n_obs: int Number of observations (rows, typically cells). @@ -152,7 +153,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: The synthetic dataset. """ - X = sparse.random(n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42) + X = sparse.random( + n_obs, n_vars, density=density, format="csr", dtype=np.float32, random_state=42 + ) obs = {"obs_names": [f"cell_{i}" for i in range(n_obs)]} var = {"var_names": [f"gene_{j}" for j in range(n_vars)]} adata = anndata.AnnData(X=X, obs=obs, var=var) @@ -166,7 +169,9 @@ def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: return adata -def large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData: +def large_synthetic_dataset( + n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01 +) -> AnnData: return _large_synthetic_dataset(n_obs, n_vars, density).copy() diff --git a/benchmarks/benchmarks/preprocessing_counts_dask.py b/benchmarks/benchmarks/preprocessing_counts_dask.py index 4f9086f283..cb6d45ea03 100644 --- a/benchmarks/benchmarks/preprocessing_counts_dask.py +++ b/benchmarks/benchmarks/preprocessing_counts_dask.py @@ -1,21 +1,24 @@ from __future__ import annotations from typing import TYPE_CHECKING + import dask.array as dd from dask.distributed import Client, LocalCluster + import scanpy as sc -from scipy import sparse from ._utils import get_count_dataset if TYPE_CHECKING: from anndata import AnnData + from ._utils import Dataset, KeyCount # Setup global variables adata: AnnData batch_key: str | None + def setup(dataset: Dataset, layer: KeyCount, *_): """Setup global variables before each benchmark.""" global adata, batch_key @@ -23,41 +26,46 @@ def setup(dataset: Dataset, layer: KeyCount, *_): assert "log1p" not in adata.uns -#def setup_dask_cluster(): +# def setup_dask_cluster(): # """Set up a local Dask cluster for benchmarking.""" # cluster = LocalCluster(n_workers=4, threads_per_worker=2) # client = Client(cluster) # return client + def setup_dask_cluster(): """Set up a local Dask cluster for benchmarking.""" - cluster = LocalCluster(n_workers=5, - threads_per_worker=2, - memory_limit="60GB", - timeout="1200s") + cluster = LocalCluster( + n_workers=5, threads_per_worker=2, memory_limit="60GB", timeout="1200s" + ) client = Client(cluster) return client # ASV suite params: tuple[list[Dataset], list[KeyCount]] = ( - ["musmus_11m"], + ["musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] + ### Dask-Based Benchmarks ### def time_filter_cells_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks).persist() sc.pp.filter_cells(adata, min_genes=100) assert adata.n_obs > 0 # Ensure cells are retained finally: client.close() -#def time_filter_cells_dask(*_): + +# def time_filter_cells_dask(*_): # client = setup_dask_cluster() # try: # # Compute optimal chunks based on Dask cluster @@ -72,7 +80,10 @@ def time_filter_cells_dask(*_): def peakmem_filter_cells_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.filter_cells(adata, min_genes=100) finally: @@ -82,7 +93,10 @@ def peakmem_filter_cells_dask(*_): def time_filter_genes_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) adata.X = adata.X.persist() sc.pp.filter_genes(adata, min_cells=3) @@ -93,7 +107,10 @@ def time_filter_genes_dask(*_): def peakmem_filter_genes_dask(*_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.filter_genes(adata, min_cells=3) finally: @@ -102,11 +119,12 @@ def peakmem_filter_genes_dask(*_): ### General Dask and Non-Dask Preprocessing Benchmarks ### + class FastSuite: """Suite for benchmarking preprocessing operations with Dask.""" params: tuple[list[Dataset], list[KeyCount]] = ( - ["musmus_11m"], + ["musmus_11m"], ["counts", "counts-off-axis"], ) param_names = ["dataset", "layer"] @@ -114,28 +132,43 @@ class FastSuite: def time_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) adata.X = adata.X.persist() - sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) finally: client.close() def peakmem_calculate_qc_metrics_dask(self, *_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) - sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True) + sc.pp.calculate_qc_metrics( + adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True + ) finally: client.close() def time_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) - adata.X = adata.X.map_blocks(lambda x: x / x.sum(axis=1), dtype=float) # Optimize normalization + adata.X = adata.X.map_blocks( + lambda x: x / x.sum(axis=1), dtype=float + ) # Optimize normalization adata.X = adata.X.persist() finally: client.close() @@ -143,7 +176,10 @@ def time_normalize_total_dask(self, *_): def peakmem_normalize_total_dask(self, *_): client = setup_dask_cluster() try: - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.normalize_total(adata, target_sum=1e4) finally: @@ -153,7 +189,10 @@ def time_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) adata.X = adata.X.persist() sc.pp.log1p(adata) @@ -164,7 +203,10 @@ def peakmem_log1p_dask(self, *_): client = setup_dask_cluster() try: adata.uns.pop("log1p", None) - optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1]) + optimal_chunks = ( + adata.X.shape[0] // (4 * len(client.nthreads())), + adata.X.shape[1], + ) adata.X = dd.from_array(adata.X, chunks=optimal_chunks) sc.pp.log1p(adata) finally: