Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify scale #3351

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions docs/release-notes/3351.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix zappy compatibility for clip_array {smaller}`P Angerer`
218 changes: 104 additions & 114 deletions src/scanpy/preprocessing/_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,32 @@
da = None

if TYPE_CHECKING:
from numpy.typing import NDArray
from typing import TypeVar

from numpy.typing import ArrayLike, NDArray

from .._utils import _CSMatrix

_A = TypeVar("_A", bound=_CSMatrix | np.ndarray | DaskArray)

@njit
def _scale_sparse_numba(indptr, indices, data, *, std, mask_obs, clip):
    # Scale the stored entries of a CSR-layout matrix in place: for every row
    # selected by `mask_obs`, divide each stored value by the std of its
    # column (`indices[j]` is the column index of entry `j`).
    # `clip` doubles as flag and bound: when truthy, scaled values are also
    # capped from above at `clip`. NOTE(review): no lower bound is applied —
    # presumably only used with non-negative data / zero_center=False; confirm.
    for i in numba.prange(len(indptr) - 1):
        if mask_obs[i]:
            for j in range(indptr[i], indptr[i + 1]):
                if clip:
                    data[j] = min(clip, data[j] / std[indices[j]])
                else:
                    data[j] /= std[indices[j]]

@singledispatch
def clip(x: ArrayLike | _A, *, max_value: float, zero_center: bool = True) -> _A:
    """Clip ``x`` at ``max_value``.

    Default implementation for dense array-likes: delegates to ``clip_array``.
    Sparse and dask containers are handled by the registered overloads.
    """
    clipped = clip_array(x, max_value=max_value, zero_center=zero_center)
    return clipped


@clip.register(csr_matrix)
@clip.register(csc_matrix)
def _(x: _CSMatrix, *, max_value: float, zero_center: bool = True) -> _CSMatrix:
    """Sparse overload: clip only the explicitly stored entries, in place."""
    stored = x.data
    x.data = clip(stored, max_value=max_value, zero_center=zero_center)
    return x


@clip.register(DaskArray)
def _(x: DaskArray, *, max_value: float, zero_center: bool = True) -> DaskArray:
    # Dask overload: apply `clip` independently to every chunk. `dtype` and
    # `meta` are forwarded so the result keeps the input's dtype and chunk
    # container type (dense vs. sparse `_meta`).
    return x.map_blocks(
        clip, max_value=max_value, zero_center=zero_center, dtype=x.dtype, meta=x._meta
    )


@njit
Expand All @@ -65,27 +77,19 @@
return X


def clip_set(x: _CSMatrix, *, max_value: float, zero_center: bool = True) -> _CSMatrix:
    """Clip a sparse matrix to ``[-max_value, max_value]`` via masked assignment.

    Works on a copy (the input is left untouched). When ``zero_center`` is
    False only the upper bound is applied.
    """
    clipped = x.copy()
    over = clipped > max_value
    clipped[over] = max_value
    if zero_center:
        under = clipped < -max_value
        clipped[under] = -max_value
    return clipped


@renamed_arg("X", "data", pos_0=True)
@old_positionals("zero_center", "max_value", "copy", "layer", "obsm")
@singledispatch
def scale(
data: AnnData | _CSMatrix | np.ndarray | DaskArray,
data: AnnData | _A,
*,
zero_center: bool = True,
max_value: float | None = None,
copy: bool = False,
layer: str | None = None,
obsm: str | None = None,
mask_obs: NDArray[np.bool_] | str | None = None,
) -> AnnData | _CSMatrix | np.ndarray | DaskArray | None:
) -> AnnData | _A | None:
"""\
Scale data to unit variance and zero mean.

Expand Down Expand Up @@ -144,155 +148,141 @@

@scale.register(np.ndarray)
@scale.register(DaskArray)
@scale.register(csc_matrix)
@scale.register(csr_matrix)
def scale_array(
X: np.ndarray | DaskArray,
x: _A,
*,
zero_center: bool = True,
max_value: float | None = None,
copy: bool = False,
return_mean_std: bool = False,
mask_obs: NDArray[np.bool_] | None = None,
) -> (
np.ndarray
| DaskArray
_A
| tuple[
np.ndarray | DaskArray, NDArray[np.float64] | DaskArray, NDArray[np.float64]
_A,
NDArray[np.float64] | DaskArray,
NDArray[np.float64],
]
):
if copy:
X = X.copy()
mask_obs = _check_mask(X, mask_obs, "obs")
if mask_obs is not None:
scale_rv = scale_array(
X[mask_obs, :],
zero_center=zero_center,
max_value=max_value,
copy=False,
return_mean_std=return_mean_std,
mask_obs=None,
)

if return_mean_std:
X[mask_obs, :], mean, std = scale_rv
return X, mean, std
else:
X[mask_obs, :] = scale_rv
return X
x = x.copy()

if not zero_center and max_value is not None:
logg.info( # Be careful of what? This should be more specific
"... be careful when using `max_value` without `zero_center`."
)

if np.issubdtype(X.dtype, np.integer):
if np.issubdtype(x.dtype, np.integer):
logg.info(
"... as scaling leads to float results, integer "
"input is cast to float, returning copy."
)
X = X.astype(float)
x = x.astype(float)

mean, var = _get_mean_var(X)
mask_obs = _check_mask(x, mask_obs, "obs")
if mask_obs is not None:
return scale_array_masked(
x,
mask_obs,
zero_center=zero_center,
max_value=max_value,
return_mean_std=return_mean_std,
)

mean, var = _get_mean_var(x)
std = np.sqrt(var)
std[std == 0] = 1
if zero_center:
if isinstance(X, DaskArray) and issparse(X._meta):
if isinstance(x, csr_matrix | csc_matrix) or (
isinstance(x, DaskArray) and issparse(x._meta)
):
warnings.warn(
"zero-center being used with `DaskArray` sparse chunks. "
"This can be bad if you have large chunks or intend to eventually read the whole data into memory.",
UserWarning,
)
X -= mean
x -= mean

X = axis_mul_or_truediv(
X,
x = axis_mul_or_truediv(
x,
std,
op=truediv,
out=X if isinstance(X, np.ndarray) or issparse(X) else None,
out=x if isinstance(x, np.ndarray | csr_matrix | csc_matrix) else None,
axis=1,
)

# do the clipping
if max_value is not None:
logg.debug(f"... clipping at max_value {max_value}")
if isinstance(X, DaskArray):
clip = clip_set if issparse(X._meta) else clip_array
X = X.map_blocks(clip, max_value=max_value, zero_center=zero_center)
elif issparse(X):
X.data = clip_array(X.data, max_value=max_value, zero_center=False)
else:
X = clip_array(X, max_value=max_value, zero_center=zero_center)
x = clip(x, max_value=max_value, zero_center=zero_center)
if return_mean_std:
return X, mean, std
return x, mean, std
else:
return X
return x


@scale.register(csr_matrix)
@scale.register(csc_matrix)
def scale_sparse(
X: _CSMatrix,
def scale_array_masked(
x: _A,
mask_obs: NDArray[np.bool_],
*,
zero_center: bool = True,
max_value: float | None = None,
copy: bool = False,
return_mean_std: bool = False,
mask_obs: NDArray[np.bool_] | None = None,
) -> np.ndarray | tuple[np.ndarray, NDArray[np.float64], NDArray[np.float64]]:
# need to add the following here to make inplace logic work
if zero_center:
logg.info(
"... as `zero_center=True`, sparse input is "
"densified and may lead to large memory consumption"
)
X = X.toarray()
copy = False # Since the data has been copied
return scale_array(
X,
zero_center=zero_center,
copy=copy,
max_value=max_value,
return_mean_std=return_mean_std,
) -> (
_A
| tuple[
_A,
NDArray[np.float64] | DaskArray,
NDArray[np.float64],
]
):
if isinstance(x, csr_matrix | csc_matrix) and not zero_center:
if isinstance(x, csc_matrix):
x = x.tocsr()
mean, var = _get_mean_var(x[mask_obs, :])
std = np.sqrt(var)
std[std == 0] = 1

scale_and_clip_csr(
x.indptr,
x.indices,
x.data,
std=std,
mask_obs=mask_obs,
max_value=max_value,
)
elif mask_obs is None:
return scale_array(
X,
else:
x[mask_obs, :], mean, std = scale_array(
x[mask_obs, :],
zero_center=zero_center,
copy=copy,
max_value=max_value,
return_mean_std=return_mean_std,
mask_obs=mask_obs,
return_mean_std=True,
)
else:
if isinstance(X, csc_matrix):
X = X.tocsr()
elif copy:
X = X.copy()

if mask_obs is not None:
mask_obs = _check_mask(X, mask_obs, "obs")

mean, var = _get_mean_var(X[mask_obs, :])

std = np.sqrt(var)
std[std == 0] = 1

if max_value is None:
max_value = 0

_scale_sparse_numba(
X.indptr,
X.indices,
X.data,
std=std.astype(X.dtype),
mask_obs=mask_obs,
clip=max_value,
)

if return_mean_std:
return X, mean, std
return x, mean, std
else:
return X
return x


@njit
def scale_and_clip_csr(
    indptr: NDArray[np.integer],
    indices: NDArray[np.integer],
    data: NDArray[np.floating],
    *,
    std: NDArray[np.floating],
    mask_obs: NDArray[np.bool_],
    max_value: float | None,
) -> None:
    """Scale a CSR matrix's stored values in place, optionally clipping.

    For every row selected by ``mask_obs``, each stored entry is divided by
    the std of its column (``indices[j]`` is the column of entry ``j``).
    When ``max_value`` is not None the scaled value is additionally capped
    from above at ``max_value``.
    NOTE(review): only an upper clip is applied (no ``-max_value`` bound) —
    presumably this kernel is reached only with ``zero_center=False``; confirm.
    """
    for i in numba.prange(len(indptr) - 1):
        if mask_obs[i]:
            for j in range(indptr[i], indptr[i + 1]):
                if max_value is not None:
                    data[j] = min(max_value, data[j] / std[indices[j]])
                else:
                    data[j] /= std[indices[j]]

Check warning on line 285 in src/scanpy/preprocessing/_scale.py

View check run for this annotation

Codecov / codecov/patch

src/scanpy/preprocessing/_scale.py#L285

Added line #L285 was not covered by tests


@scale.register(AnnData)
Expand Down
Loading