testing

Buffer

zarr.testing.buffer

NDBufferUsingTestNDArrayLike

Bases: NDBuffer

Example of a custom NDBuffer that handles TestNDArrayLike

Source code in zarr/testing/buffer.py
class NDBufferUsingTestNDArrayLike(cpu.NDBuffer):
    """Example of a custom NDBuffer that handles MyNDArrayLike"""

    @classmethod
    def create(
        cls,
        *,
        shape: Iterable[int],
        dtype: npt.DTypeLike,
        order: Literal["C", "F"] = "C",
        fill_value: Any | None = None,
    ) -> Self:
        """Overwrite `NDBuffer.create` to create an TestNDArrayLike instance"""
        ret = cls(TestNDArrayLike(shape=shape, dtype=dtype, order=order))
        if fill_value is not None:
            ret.fill(fill_value)
        return ret

    @classmethod
    def empty(
        cls,
        shape: tuple[int, ...],
        dtype: npt.DTypeLike,
        order: Literal["C", "F"] = "C",
    ) -> Self:
        return super(cpu.NDBuffer, cls).empty(shape=shape, dtype=dtype, order=order)
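
For illustration, a minimal usage sketch (assuming the import path shown in this module's documentation):

from zarr.testing.buffer import NDBufferUsingTestNDArrayLike, TestNDArrayLike

# `create` allocates a TestNDArrayLike and optionally fills it with a value.
buf = NDBufferUsingTestNDArrayLike.create(shape=(2, 3), dtype="float64", fill_value=1.5)
assert isinstance(buf.as_ndarray_like(), TestNDArrayLike)
assert buf.as_numpy_array().sum() == 9.0  # 6 elements * 1.5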

all_equal

all_equal(other: Any, equal_nan: bool = True) -> bool

Compare to other using np.array_equal.

Source code in zarr/core/buffer/core.py
def all_equal(self, other: Any, equal_nan: bool = True) -> bool:
    """Compare to `other` using np.array_equal."""
    if other is None:
        # Handle None fill_value for Zarr V2
        return False
    # Handle positive and negative zero by comparing bit patterns:
    if (
        np.asarray(other).dtype.kind == "f"
        and other == 0.0
        and self._data.dtype.kind not in ("U", "S", "T", "O", "V")
    ):
        _data, other = np.broadcast_arrays(self._data, np.asarray(other, self._data.dtype))
        void_dtype = "V" + str(_data.dtype.itemsize)
        return np.array_equal(_data.view(void_dtype), other.view(void_dtype))
    # use array_equal to obtain equal_nan=True functionality
    # Since fill-value is a scalar, isn't there a faster path than allocating a new array for fill value
    # every single time we have to write data?
    _data, other = np.broadcast_arrays(self._data, other)
    return np.array_equal(
        self._data,
        other,
        equal_nan=equal_nan
        if self._data.dtype.kind not in ("U", "S", "T", "O", "V")
        else False,
    )
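
The bit-pattern comparison matters because np.array_equal treats +0.0 and -0.0 as equal by value, so a fill value of -0.0 would otherwise be conflated with +0.0. A standalone sketch of the trick used above:

import numpy as np

a = np.array([0.0, -0.0])
z = np.zeros(2)
print(np.array_equal(a, z))  # True: +0.0 and -0.0 compare equal by value
void = "V" + str(a.dtype.itemsize)  # reinterpret each float64 as an opaque 8-byte blob
print(np.array_equal(a.view(void), z.view(void)))  # False: -0.0 sets the sign bit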

as_ndarray_like

as_ndarray_like() -> NDArrayLike

Returns the underlying array (host or device memory) of this buffer

This will never copy data.

Returns:

  • The underlying array such as a NumPy or CuPy array.
Source code in zarr/core/buffer/core.py
def as_ndarray_like(self) -> NDArrayLike:
    """Returns the underlying array (host or device memory) of this buffer

    This will never copy data.

    Returns
    -------
        The underlying array such as a NumPy or CuPy array.
    """
    return self._data

as_numpy_array

as_numpy_array() -> NDArray[Any]

Returns the buffer as a NumPy array (host memory).

Warnings

Might have to copy data; consider using .as_ndarray_like() instead.

Returns:

  • NumPy array of this buffer (might be a data copy)
Source code in zarr/core/buffer/cpu.py
def as_numpy_array(self) -> npt.NDArray[Any]:
    """Returns the buffer as a NumPy array (host memory).

    Warnings
    --------
    Might have to copy data, consider using `.as_ndarray_like()` instead.

    Returns
    -------
        NumPy array of this buffer (might be a data copy)
    """
    return np.asanyarray(self._data)

as_scalar

as_scalar() -> ScalarType

Returns the buffer as a scalar value

Source code in zarr/core/buffer/core.py
def as_scalar(self) -> ScalarType:
    """Returns the buffer as a scalar value"""
    if self._data.size != 1:
        raise ValueError("Buffer does not contain a single scalar value")
    return cast("ScalarType", self.as_numpy_array()[()])

create classmethod

create(
    *,
    shape: Iterable[int],
    dtype: DTypeLike,
    order: Literal["C", "F"] = "C",
    fill_value: Any | None = None,
) -> Self

Override NDBuffer.create to create a TestNDArrayLike instance

Source code in zarr/testing/buffer.py
@classmethod
def create(
    cls,
    *,
    shape: Iterable[int],
    dtype: npt.DTypeLike,
    order: Literal["C", "F"] = "C",
    fill_value: Any | None = None,
) -> Self:
    """Overwrite `NDBuffer.create` to create an TestNDArrayLike instance"""
    ret = cls(TestNDArrayLike(shape=shape, dtype=dtype, order=order))
    if fill_value is not None:
        ret.fill(fill_value)
    return ret

empty classmethod

empty(
    shape: tuple[int, ...],
    dtype: DTypeLike,
    order: Literal["C", "F"] = "C",
) -> Self

Create an empty buffer with the given shape, dtype, and order.

This method can be faster than NDBuffer.create because it doesn't have to initialize the memory used by the underlying ndarray-like object.

Parameters:

  • shape (tuple[int, ...]) –

    The shape of the buffer and its underlying ndarray-like object

  • dtype (DTypeLike) –

    The datatype of the buffer and its underlying ndarray-like object

  • order (Literal['C', 'F'], default: 'C' ) –

    Whether to store multi-dimensional data in row-major (C-style) or column-major (Fortran-style) order in memory.

Returns:

  • buffer

    New buffer representing a new ndarray_like object with empty data.

See Also

NDBuffer.create : Create a new buffer with some initial fill value.

Source code in zarr/testing/buffer.py
@classmethod
def empty(
    cls,
    shape: tuple[int, ...],
    dtype: npt.DTypeLike,
    order: Literal["C", "F"] = "C",
) -> Self:
    return super(cpu.NDBuffer, cls).empty(shape=shape, dtype=dtype, order=order)
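
Unlike create, empty leaves the underlying memory uninitialized, so callers must overwrite every element before reading. A brief sketch of that contract:

from zarr.testing.buffer import NDBufferUsingTestNDArrayLike

buf = NDBufferUsingTestNDArrayLike.empty(shape=(4,), dtype="uint8")
arr = buf.as_numpy_array()  # no copy for ndarray subclasses
arr[:] = 0  # contents are undefined until every element is written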

from_ndarray_like classmethod

from_ndarray_like(ndarray_like: NDArrayLike) -> Self

Create a new buffer of a ndarray-like object

Parameters:

  • ndarray_like (NDArrayLike) –

    ndarray-like object

Returns:

  • New buffer representing `ndarray_like`
Source code in zarr/core/buffer/core.py
@classmethod
def from_ndarray_like(cls, ndarray_like: NDArrayLike) -> Self:
    """Create a new buffer of a ndarray-like object

    Parameters
    ----------
    ndarray_like
        ndarray-like object

    Returns
    -------
        New buffer representing `ndarray_like`
    """
    return cls(ndarray_like)

from_numpy_array classmethod

from_numpy_array(array_like: ArrayLike) -> Self

Create a new buffer of a NumPy array-like object

Parameters:

  • array_like (ArrayLike) –

    Object that can be coerced into a Numpy array

Returns:

  • New buffer representing `array_like`
Source code in zarr/core/buffer/cpu.py
@classmethod
def from_numpy_array(cls, array_like: npt.ArrayLike) -> Self:
    return cls.from_ndarray_like(np.asanyarray(array_like))

StoreExpectingTestBuffer

Bases: MemoryStore

Example of a custom Store that expects TestBuffer for all its non-metadata values

We assume that keys containing "json" are metadata

Source code in zarr/testing/buffer.py
class StoreExpectingTestBuffer(MemoryStore):
    """Example of a custom Store that expect MyBuffer for all its non-metadata

    We assume that keys containing "json" is metadata
    """

    async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
        if "json" not in key:
            assert isinstance(value, TestBuffer)
        await super().set(key, value, byte_range)

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: tuple[int, int | None] | None = None,
    ) -> Buffer | None:
        if "json" not in key:
            assert prototype.buffer is TestBuffer
        ret = await super().get(key=key, prototype=prototype, byte_range=byte_range)
        if ret is not None:
            assert isinstance(ret, prototype.buffer)
        return ret
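
A sketch of how this store pairs with the testing buffer classes; the BufferPrototype constructor with buffer/nd_buffer fields follows zarr-python 3, and the exact import paths are assumptions:

import asyncio
from zarr.core.buffer import BufferPrototype
from zarr.testing.buffer import (
    NDBufferUsingTestNDArrayLike,
    StoreExpectingTestBuffer,
    TestBuffer,
)

proto = BufferPrototype(buffer=TestBuffer, nd_buffer=NDBufferUsingTestNDArrayLike)

async def roundtrip() -> None:
    store = StoreExpectingTestBuffer()
    # Non-metadata keys must carry TestBuffer instances:
    await store.set("chunk/0", TestBuffer.from_bytes(b"\x00\x01"))
    buf = await store.get("chunk/0", prototype=proto)
    assert buf is not None and buf.to_bytes() == b"\x00\x01"

asyncio.run(roundtrip())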

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = True

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always false.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_memory.py
def __eq__(self, other: object) -> bool:
    return (
        isinstance(other, type(self))
        and self._store_dict == other._store_dict
        and self.read_only == other.read_only
    )

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_memory.py
async def clear(self) -> None:
    # docstring inherited
    self._store_dict.clear()

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store

Parameters:

  • key (str) –
Source code in zarr/storage/_memory.py
async def delete(self, key: str) -> None:
    # docstring inherited
    self._check_writable()
    try:
        del self._store_dict[key]
    except KeyError:
        logger.debug("Key %s does not exist.", key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/abc/store.py
async def delete_dir(self, prefix: str) -> None:
    """
    Remove all keys and prefixes in the store that begin with a given prefix.
    """
    if not self.supports_deletes:
        raise NotImplementedError
    if not self.supports_listing:
        raise NotImplementedError
    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for key in self.list_prefix(prefix):
        await self.delete(key)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool –

    True if the key exists, False otherwise.

Source code in zarr/storage/_memory.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    return key in self._store_dict

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: tuple[int, int | None] | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None –

    The value associated with the key, or None if the key does not exist.

Source code in zarr/testing/buffer.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: tuple[int, int | None] | None = None,
) -> Buffer | None:
    if "json" not in key:
        assert prototype.buffer is TestBuffer
    ret = await super().get(key=key, prototype=prototype, byte_range=byte_range)
    if ret is not None:
        assert isinstance(ret, prototype.buffer)
    return ret
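
The three ByteRequest forms described above can be exercised directly against a MemoryStore; a sketch, with import paths assumed from zarr-python 3:

import asyncio
from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer.core import default_buffer_prototype
from zarr.core.buffer.cpu import Buffer
from zarr.storage import MemoryStore

async def demo() -> None:
    store = MemoryStore()
    await store.set("k", Buffer.from_bytes(b"0123456789"))
    proto = default_buffer_prototype()
    r = await store.get("k", proto, byte_range=RangeByteRequest(2, 5))  # bytes [2, 5)
    o = await store.get("k", proto, byte_range=OffsetByteRequest(7))  # everything from offset 7
    s = await store.get("k", proto, byte_range=SuffixByteRequest(3))  # last 3 bytes
    assert (r.to_bytes(), o.to_bytes(), s.to_bytes()) == (b"234", b"789", b"789")

asyncio.run(demo())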

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns:

  • List of values, in the order of the key_ranges; may contain None for missing keys
Source code in zarr/storage/_memory.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited

    # All the key-range arguments go with the same prototype
    async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None:
        return await self.get(key, prototype=prototype, byte_range=byte_range)

    return await concurrent_map(key_ranges, _get, limit=None)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored
Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix and calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)
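
Usage is a single call; a small self-contained sketch against a MemoryStore:

import asyncio
from zarr.core.buffer.cpu import Buffer
from zarr.storage import MemoryStore

async def demo() -> None:
    store = MemoryStore()
    await store.set("arr/c/0", Buffer.from_bytes(b"abc"))
    await store.set("arr/c/1", Buffer.from_bytes(b"de"))
    print(await store.getsize_prefix("arr"))  # 5 = 3 + 2 bytes

asyncio.run(demo())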

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str] –

    All keys in the store.

Source code in zarr/storage/_memory.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    for key in self._store_dict:
        yield key

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    Keys and pseudo-directory names that are children of the prefix.

Source code in zarr/storage/_memory.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    prefix = prefix.rstrip("/")

    if prefix == "":
        keys_unique = {k.split("/")[0] for k in self._store_dict}
    else:
        # Our dictionary doesn't contain directory markers, but we want to include
        # a pseudo directory when there's a nested item and we're listing an
        # intermediate level.
        keys_unique = {
            key.removeprefix(prefix + "/").split("/")[0]
            for key in self._store_dict
            if key.startswith(prefix + "/") and key != prefix
        }

    for key in keys_unique:
        yield key
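
The pseudo-directory behaviour described in the comment above can be seen directly; a sketch:

import asyncio
from zarr.core.buffer.cpu import Buffer
from zarr.storage import MemoryStore

async def demo() -> None:
    store = MemoryStore()
    await store.set("g/arr/zarr.json", Buffer.from_bytes(b"{}"))
    await store.set("g/zarr.json", Buffer.from_bytes(b"{}"))
    # "arr" is reported even though no key named "g/arr" exists:
    print(sorted([k async for k in store.list_dir("g")]))  # ['arr', 'zarr.json']

asyncio.run(demo())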

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    All keys in the store that begin with the given prefix.

Source code in zarr/storage/_memory.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    # note: we materialize all dict keys into a list here so we can mutate the dict in-place (e.g. in delete_prefix)
    for key in list(self._store_dict):
        if key.startswith(prefix):
            yield key

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store

set async

set(
    key: str,
    value: Buffer,
    byte_range: tuple[int, int] | None = None,
) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –

  • value (Buffer) –
Source code in zarr/testing/buffer.py
async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
    if "json" not in key:
        assert isinstance(value, TestBuffer)
    await super().set(key, value, byte_range)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –

  • value (Buffer) –
Source code in zarr/storage/_memory.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    self._check_writable()
    await self._ensure_open()
    self._store_dict.setdefault(key, value)

with_read_only

with_read_only(read_only: bool = False) -> MemoryStore

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read-only attribute.
Source code in zarr/storage/_memory.py
def with_read_only(self, read_only: bool = False) -> MemoryStore:
    # docstring inherited
    return type(self)(
        store_dict=self._store_dict,
        read_only=read_only,
    )
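
A sketch of the read-only view; note that the exact exception type raised on write is an assumption here:

import asyncio
from zarr.core.buffer.cpu import Buffer
from zarr.storage import MemoryStore

async def demo() -> None:
    rw = MemoryStore()
    await rw.set("k", Buffer.from_bytes(b"v"))
    ro = rw.with_read_only(read_only=True)
    assert ro.read_only
    try:
        await ro.set("k2", Buffer.from_bytes(b"v"))
    except ValueError:  # assumed error type for writes to a read-only store
        print("writes are rejected on the read-only view")

asyncio.run(demo())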

TestBuffer

Bases: Buffer

Example of a custom Buffer that handles ArrayLike

Source code in zarr/testing/buffer.py
class TestBuffer(cpu.Buffer):
    """Example of a custom Buffer that handles ArrayLike"""

    __test__ = False

__add__

__add__(other: Buffer) -> Self

Concatenate two buffers

Source code in zarr/core/buffer/cpu.py
def __add__(self, other: core.Buffer) -> Self:
    """Concatenate two buffers"""

    other_array = other.as_array_like()
    assert other_array.dtype == np.dtype("B")
    return self.__class__(
        np.concatenate((np.asanyarray(self._data), np.asanyarray(other_array)))
    )

as_array_like

as_array_like() -> ArrayLike

Returns the underlying array (host or device memory) of this buffer

This will never copy data.

Returns:

  • The underlying 1d array such as a NumPy or CuPy array.
Source code in zarr/core/buffer/core.py
def as_array_like(self) -> ArrayLike:
    """Returns the underlying array (host or device memory) of this buffer

    This will never copy data.

    Returns
    -------
        The underlying 1d array such as a NumPy or CuPy array.
    """
    return self._data

as_buffer_like

as_buffer_like() -> BytesLike

Returns the buffer as an object that implements the Python buffer protocol.

Notes

Might have to copy data, since the implementation uses .as_numpy_array().

Returns:

  • An object that implements the Python buffer protocol
Source code in zarr/core/buffer/core.py
def as_buffer_like(self) -> BytesLike:
    """Returns the buffer as an object that implements the Python buffer protocol.

    Notes
    -----
    Might have to copy data, since the implementation uses `.as_numpy_array()`.

    Returns
    -------
        An object that implements the Python buffer protocol
    """
    return memoryview(self.as_numpy_array())  # type: ignore[arg-type]

as_numpy_array

as_numpy_array() -> NDArray[Any]

Returns the buffer as a NumPy array (host memory).

Notes

Might have to copy data; consider using .as_array_like() instead.

Returns:

  • NumPy array of this buffer (might be a data copy)
Source code in zarr/core/buffer/cpu.py
def as_numpy_array(self) -> npt.NDArray[Any]:
    """Returns the buffer as a NumPy array (host memory).

    Notes
    -----
    Might have to copy data, consider using `.as_array_like()` instead.

    Returns
    -------
        NumPy array of this buffer (might be a data copy)
    """
    return np.asanyarray(self._data)

create_zero_length classmethod

create_zero_length() -> Self

Create an empty buffer with length zero

Returns:

  • New empty 0-length buffer
Source code in zarr/core/buffer/cpu.py
@classmethod
def create_zero_length(cls) -> Self:
    return cls(np.array([], dtype="B"))

from_array_like classmethod

from_array_like(array_like: ArrayLike) -> Self

Create a new buffer of an array-like object

Parameters:

  • array_like (ArrayLike) –

    array-like object that must be 1-dim, contiguous, and byte dtype.

Returns:

  • New buffer representing `array_like`
Source code in zarr/core/buffer/core.py
@classmethod
def from_array_like(cls, array_like: ArrayLike) -> Self:
    """Create a new buffer of an array-like object

    Parameters
    ----------
    array_like
        array-like object that must be 1-dim, contiguous, and byte dtype.

    Returns
    -------
        New buffer representing `array_like`
    """
    return cls(array_like)

from_buffer classmethod

from_buffer(buffer: Buffer) -> Self

Create a new buffer of an existing Buffer

This is useful if you want to ensure that an existing buffer is of the correct subclass of Buffer. E.g., MemoryStore uses this to return a buffer instance of the subclass specified by its BufferPrototype argument.

Typically, this only copies data if the data has to be moved between memory types, such as from host to device memory.

Parameters:

  • buffer (Buffer) –

    buffer object.

Returns:

  • A new buffer representing the content of the input buffer
Notes

Subclasses of Buffer must override this method to implement more optimal conversions that avoid copies where possible

Source code in zarr/core/buffer/cpu.py
@classmethod
def from_buffer(cls, buffer: core.Buffer) -> Self:
    """Create a new buffer of an existing Buffer

    This is useful if you want to ensure that an existing buffer is
    of the correct subclass of Buffer. E.g., MemoryStore uses this
    to return a buffer instance of the subclass specified by its
    BufferPrototype argument.

    Typically, this only copies data if the data has to be moved between
    memory types, such as from host to device memory.

    Parameters
    ----------
    buffer
        buffer object.

    Returns
    -------
        A new buffer representing the content of the input buffer

    Notes
    -----
    Subclasses of `Buffer` must override this method to implement
    more optimal conversions that avoid copies where possible
    """
    return cls.from_array_like(buffer.as_numpy_array())

from_bytes classmethod

from_bytes(bytes_like: BytesLike) -> Self

Create a new buffer of a bytes-like object (host memory)

Parameters:

  • bytes_like (BytesLike) –

    bytes-like object

Returns:

  • New buffer representing `bytes_like`
Source code in zarr/core/buffer/cpu.py
@classmethod
def from_bytes(cls, bytes_like: BytesLike) -> Self:
    """Create a new buffer of a bytes-like object (host memory)

    Parameters
    ----------
    bytes_like
        bytes-like object

    Returns
    -------
        New buffer representing `bytes_like`
    """
    return cls.from_array_like(np.frombuffer(bytes_like, dtype="B"))

to_bytes

to_bytes() -> bytes

Returns the buffer as bytes (host memory).

Warnings

Will always copy data; only use this method for small buffers such as metadata buffers. If possible, use .as_numpy_array() or .as_array_like() instead.

Returns:

  • `bytes` of this buffer (data copy)
Source code in zarr/core/buffer/core.py
def to_bytes(self) -> bytes:
    """Returns the buffer as `bytes` (host memory).

    Warnings
    --------
    Will always copy data, only use this method for small buffers such as metadata
    buffers. If possible, use `.as_numpy_array()` or `.as_array_like()` instead.

    Returns
    -------
        `bytes` of this buffer (data copy)
    """
    return bytes(self.as_numpy_array())

TestNDArrayLike

Bases: ndarray

An example of a ndarray-like class

Source code in zarr/testing/buffer.py
class TestNDArrayLike(np.ndarray):
    """An example of a ndarray-like class"""

    __test__ = False

Stateful

zarr.testing.stateful

SyncStoreWrapper

Bases: SyncMixin

Source code in zarr/testing/stateful.py
class SyncStoreWrapper(zarr.core.sync.SyncMixin):
    def __init__(self, store: Store) -> None:
        """Synchronous Store wrapper

        This class holds synchronous methods that map to async methods of Store classes.
        The synchronous wrapper is needed because hypothesis' stateful testing infra does
        not support asyncio so we redefine sync versions of the Store API.
        https://github.com/HypothesisWorks/hypothesis/issues/3712#issuecomment-1668999041
        """
        self.store = store

    @property
    def read_only(self) -> bool:
        return self.store.read_only

    def set(self, key: str, data_buffer: Buffer) -> None:
        return self._sync(self.store.set(key, data_buffer))

    def list(self) -> builtins.list[str]:
        return self._sync_iter(self.store.list())

    def get(self, key: str, prototype: BufferPrototype) -> Buffer | None:
        return self._sync(self.store.get(key, prototype=prototype))

    def get_partial_values(
        self, key_ranges: builtins.list[Any], prototype: BufferPrototype
    ) -> builtins.list[Buffer | None]:
        return self._sync(self.store.get_partial_values(prototype=prototype, key_ranges=key_ranges))

    def delete(self, path: str) -> None:
        return self._sync(self.store.delete(path))

    def is_empty(self, prefix: str) -> bool:
        return self._sync(self.store.is_empty(prefix=prefix))

    def clear(self) -> None:
        return self._sync(self.store.clear())

    def exists(self, key: str) -> bool:
        return self._sync(self.store.exists(key))

    def list_dir(self, prefix: str) -> None:
        raise NotImplementedError

    def list_prefix(self, prefix: str) -> None:
        raise NotImplementedError

    @property
    def supports_listing(self) -> bool:
        return self.store.supports_listing

    @property
    def supports_writes(self) -> bool:
        return self.store.supports_writes

    @property
    def supports_deletes(self) -> bool:
        return self.store.supports_deletes

__init__

__init__(store: Store) -> None

Synchronous Store wrapper

This class holds synchronous methods that map to async methods of Store classes. The synchronous wrapper is needed because hypothesis' stateful testing infra does not support asyncio, so we redefine sync versions of the Store API. https://github.com/HypothesisWorks/hypothesis/issues/3712#issuecomment-1668999041

Source code in zarr/testing/stateful.py
def __init__(self, store: Store) -> None:
    """Synchronous Store wrapper

    This class holds synchronous methods that map to async methods of Store classes.
    The synchronous wrapper is needed because hypothesis' stateful testing infra does
    not support asyncio so we redefine sync versions of the Store API.
    https://github.com/HypothesisWorks/hypothesis/issues/3712#issuecomment-1668999041
    """
    self.store = store
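
A short usage sketch: the wrapper lets non-async test code (such as hypothesis rules) call the async Store API directly. Import paths follow the sources shown on this page:

from zarr.core.buffer import cpu, default_buffer_prototype
from zarr.storage import MemoryStore
from zarr.testing.stateful import SyncStoreWrapper

sync_store = SyncStoreWrapper(MemoryStore())
sync_store.set("k", cpu.Buffer.from_bytes(b"v"))  # no await needed
buf = sync_store.get("k", default_buffer_prototype())
assert buf is not None and buf.to_bytes() == b"v"
assert sync_store.list() == ["k"]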

ZarrHierarchyStateMachine

Bases: SyncMixin, RuleBasedStateMachine

This state machine models operations that modify a zarr store's hierarchy. That is, user actions that modify arrays/groups as well as list operations. It is intended to be used to test external stores, and compares their results to a MemoryStore that is assumed to be perfect.

Source code in zarr/testing/stateful.py
class ZarrHierarchyStateMachine(SyncMixin, RuleBasedStateMachine):
    """
    This state machine models operations that modify a zarr store's
    hierarchy. That is, user actions that modify arrays/groups as well
    as list operations. It is intended to be used to test external stores, and
    compares their results to a MemoryStore that is assumed to be perfect.
    """

    def __init__(self, store: Store) -> None:
        super().__init__()

        self.store = store

        self.model = MemoryStore()
        zarr.group(store=self.model)

        # Track state of the hierarchy, these should contain fully qualified paths
        self.all_groups: set[str] = set()
        self.all_arrays: set[str] = set()

    @initialize()
    def init_store(self) -> None:
        # This lets us reuse the fixture provided store.
        self._sync(self.store.clear())
        zarr.group(store=self.store)

    def can_add(self, path: str) -> bool:
        return path not in self.all_groups and path not in self.all_arrays

    # -------------------- store operations -----------------------
    @rule(name=node_names, data=st.data())
    def add_group(self, name: str, data: DataObject) -> None:
        # Handle possible case-insensitive file systems (e.g. MacOS)
        if isinstance(self.store, LocalStore):
            name = name.lower()
        if self.all_groups:
            parent = data.draw(st.sampled_from(sorted(self.all_groups)), label="Group parent")
        else:
            parent = ""
        path = f"{parent}/{name}".lstrip("/")
        assume(self.can_add(path))
        note(f"Adding group: path='{path}'")
        self.all_groups.add(path)
        zarr.group(store=self.store, path=path)
        zarr.group(store=self.model, path=path)

    @rule(data=st.data(), name=node_names, array_and_chunks=np_array_and_chunks())
    def add_array(
        self,
        data: DataObject,
        name: str,
        array_and_chunks: tuple[np.ndarray[Any, Any], tuple[int, ...]],
    ) -> None:
        # Handle possible case-insensitive file systems (e.g. MacOS)
        if isinstance(self.store, LocalStore):
            name = name.lower()
        array, chunks = array_and_chunks
        fill_value = data.draw(npst.from_dtype(array.dtype))
        if self.all_groups:
            parent = data.draw(st.sampled_from(sorted(self.all_groups)), label="Array parent")
        else:
            parent = ""
        # TODO: support creating deeper paths
        # TODO: support overwriting potentially by just skipping `self.can_add`
        path = f"{parent}/{name}".lstrip("/")
        assume(self.can_add(path))
        note(f"Adding array:  path='{path}'  shape={array.shape}  chunks={chunks}")
        for store in [self.store, self.model]:
            zarr.array(
                array,
                chunks=chunks,
                path=path,
                store=store,
                fill_value=fill_value,
                zarr_format=3,
                dimension_names=data.draw(
                    dimension_names(ndim=array.ndim), label="dimension names"
                ),
                # Chose bytes codec to avoid wasting time compressing the data being written
                codecs=[BytesCodec()],
            )
        self.all_arrays.add(path)

    @rule()
    @with_frequency(0.25)
    def clear(self) -> None:
        note("clearing")
        import zarr

        self._sync(self.store.clear())
        self._sync(self.model.clear())

        assert self._sync(self.store.is_empty("/"))
        assert self._sync(self.model.is_empty("/"))

        self.all_groups.clear()
        self.all_arrays.clear()

        zarr.group(store=self.store)
        zarr.group(store=self.model)

        # TODO: MemoryStore is broken?
        # assert not self._sync(self.store.is_empty("/"))
        # assert not self._sync(self.model.is_empty("/"))

    def draw_directory(self, data: DataObject) -> str:
        group_st = st.sampled_from(sorted(self.all_groups)) if self.all_groups else st.nothing()
        array_st = st.sampled_from(sorted(self.all_arrays)) if self.all_arrays else st.nothing()
        array_or_group = data.draw(st.one_of(group_st, array_st))
        if data.draw(st.booleans()) and array_or_group in self.all_arrays:
            arr = zarr.open_array(path=array_or_group, store=self.model)
            path = data.draw(
                st.one_of(
                    st.sampled_from([array_or_group]),
                    chunk_paths(ndim=arr.ndim, numblocks=arr.cdata_shape).map(
                        lambda x: f"{array_or_group}/c/"
                    ),
                )
            )
        else:
            path = array_or_group
        return path

    @precondition(lambda self: bool(self.all_groups))
    @rule(data=st.data())
    def check_list_dir(self, data: DataObject) -> None:
        path = self.draw_directory(data)
        note(f"list_dir for {path=!r}")
        # Consider .list_dir("path/to/array") for an array with a single chunk.
        # The MemoryStore model will return `"c", "zarr.json"` only if the chunk exists
        # If that chunk was deleted, then `"c"` is not returned.
        # LocalStore will not have this behaviour :/
        # There are similar consistency issues with delete_dir("/path/to/array/c/0/0")
        assume(not isinstance(self.store, LocalStore))
        model_ls = sorted(self._sync_iter(self.model.list_dir(path)))
        store_ls = sorted(self._sync_iter(self.store.list_dir(path)))
        assert model_ls == store_ls, (model_ls, store_ls)

    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def delete_chunk(self, data: DataObject) -> None:
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        arr = zarr.open_array(path=array, store=self.model)
        chunk_path = data.draw(chunk_paths(ndim=arr.ndim, numblocks=arr.cdata_shape, subset=False))
        path = f"{array}/c/{chunk_path}"
        note(f"deleting chunk {path=!r}")
        self._sync(self.model.delete(path))
        self._sync(self.store.delete(path))

    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def check_array(self, data: DataObject) -> None:
        path = data.draw(st.sampled_from(sorted(self.all_arrays)))
        actual = zarr.open_array(self.store, path=path)[:]
        expected = zarr.open_array(self.model, path=path)[:]
        np.testing.assert_equal(actual, expected)

    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def overwrite_array_basic_indexing(self, data: DataObject) -> None:
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        slicer = data.draw(basic_indices(shape=model_array.shape))
        note(f"overwriting array with basic indexer: {slicer=}")
        new_data = data.draw(
            npst.arrays(shape=np.shape(model_array[slicer]), dtype=model_array.dtype)
        )
        model_array[slicer] = new_data
        store_array[slicer] = new_data

    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def overwrite_array_orthogonal_indexing(self, data: DataObject) -> None:
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        indexer, _ = data.draw(orthogonal_indices(shape=model_array.shape))
        note(f"overwriting array orthogonal {indexer=}")
        new_data = data.draw(
            npst.arrays(shape=model_array.oindex[indexer].shape, dtype=model_array.dtype)  # type: ignore[union-attr]
        )
        model_array.oindex[indexer] = new_data
        store_array.oindex[indexer] = new_data

    @precondition(lambda self: bool(self.all_arrays))
    @rule(data=st.data())
    def resize_array(self, data: DataObject) -> None:
        array = data.draw(st.sampled_from(sorted(self.all_arrays)))
        model_array = zarr.open_array(path=array, store=self.model)
        store_array = zarr.open_array(path=array, store=self.store)
        ndim = model_array.ndim
        new_shape = tuple(
            0 if oldsize == 0 else newsize
            for newsize, oldsize in zip(
                data.draw(npst.array_shapes(max_dims=ndim, min_dims=ndim, min_side=0)),
                model_array.shape,
                strict=True,
            )
        )

        note(f"resizing array from {model_array.shape} to {new_shape}")
        model_array.resize(new_shape)
        store_array.resize(new_shape)

    @precondition(lambda self: bool(self.all_arrays) or bool(self.all_groups))
    @rule(data=st.data())
    def delete_dir(self, data: DataObject) -> None:
        path = self.draw_directory(data)
        note(f"delete_dir with {path=!r}")
        self._sync(self.model.delete_dir(path))
        self._sync(self.store.delete_dir(path))

        matches = set()
        for node in self.all_groups | self.all_arrays:
            if node.startswith(path):
                matches.add(node)
        self.all_groups = self.all_groups - matches
        self.all_arrays = self.all_arrays - matches

    # @precondition(lambda self: bool(self.all_groups))
    # @precondition(lambda self: bool(self.all_arrays))
    # @rule(data=st.data())
    # def move_array(self, data):
    #     array_path = data.draw(st.sampled_from(self.all_arrays), label="Array move source")
    #     to_group = data.draw(st.sampled_from(self.all_groups), label="Array move destination")

    #     # fixme renaming to self?
    #     array_name = os.path.basename(array_path)
    #     assume(self.model.can_add(to_group, array_name))
    #     new_path = f"{to_group}/{array_name}".lstrip("/")
    #     note(f"moving array '{array_path}' -> '{new_path}'")
    #     self.model.rename(array_path, new_path)
    #     self.repo.store.rename(array_path, new_path)

    # @precondition(lambda self: len(self.all_groups) >= 2)
    # @rule(data=st.data())
    # def move_group(self, data):
    #     from_group = data.draw(st.sampled_from(self.all_groups), label="Group move source")
    #     to_group = data.draw(st.sampled_from(self.all_groups), label="Group move destination")
    #     assume(not to_group.startswith(from_group))

    #     from_group_name = os.path.basename(from_group)
    #     assume(self.model.can_add(to_group, from_group_name))
    #     # fixme renaming to self?
    #     new_path = f"{to_group}/{from_group_name}".lstrip("/")
    #     note(f"moving group '{from_group}' -> '{new_path}'")
    #     self.model.rename(from_group, new_path)
    #     self.repo.store.rename(from_group, new_path)

    @precondition(lambda self: self.store.supports_deletes)
    @precondition(lambda self: len(self.all_arrays) >= 1)
    @rule(data=st.data())
    def delete_array_using_del(self, data: DataObject) -> None:
        array_path = data.draw(
            st.sampled_from(sorted(self.all_arrays)), label="Array deletion target"
        )
        prefix, array_name = split_prefix_name(array_path)
        note(f"Deleting array '{array_path}' ({prefix=!r}, {array_name=!r}) using del")
        for store in [self.model, self.store]:
            group = zarr.open_group(path=prefix, store=store)
            group[array_name]  # check that it exists
            del group[array_name]
        self.all_arrays.remove(array_path)

    @precondition(lambda self: self.store.supports_deletes)
    @precondition(lambda self: len(self.all_groups) >= 2)  # fixme don't delete root
    @rule(data=st.data())
    def delete_group_using_del(self, data: DataObject) -> None:
        # ensure that we don't include the root group in the list of member names that we try
        # to delete
        member_names = tuple(filter(lambda v: "/" in v, sorted(self.all_groups)))
        group_path = data.draw(st.sampled_from(member_names), label="Group deletion target")
        prefix, group_name = split_prefix_name(group_path)
        note(f"Deleting group '{group_path=!r}', {prefix=!r}, {group_name=!r} using delete")
        members = zarr.open_group(store=self.model, path=group_path).members(max_depth=None)
        for _, obj in members:
            if isinstance(obj, Array):
                self.all_arrays.remove(obj.path)
            else:
                self.all_groups.remove(obj.path)
        for store in [self.store, self.model]:
            group = zarr.open_group(store=store, path=prefix)
            group[group_name]  # check that it exists
            del group[group_name]
        if group_path != "/":
            # The root group is always present
            self.all_groups.remove(group_path)

    # # --------------- assertions -----------------
    # def check_group_arrays(self, group):
    #     # note(f"Checking arrays of '{group}'")
    #     g1 = self.model.get_group(group)
    #     g2 = zarr.open_group(path=group, mode="r", store=self.repo.store)
    #     model_arrays = sorted(g1.arrays(), key=itemgetter(0))
    #     our_arrays = sorted(g2.arrays(), key=itemgetter(0))
    #     for (n1, a1), (n2, a2) in zip_longest(model_arrays, our_arrays):
    #         assert n1 == n2
    #         assert_array_equal(a1, a2)

    # def check_subgroups(self, group_path):
    #     g1 = self.model.get_group(group_path)
    #     g2 = zarr.open_group(path=group_path, mode="r", store=self.repo.store)
    #     g1_children = [name for (name, _) in g1.groups()]
    #     g2_children = [name for (name, _) in g2.groups()]
    #     # note(f"Checking {len(g1_children)} subgroups of group '{group_path}'")
    #     assert g1_children == g2_children

    # def check_list_prefix_from_group(self, group):
    #     prefix = f"meta/root/{group}"
    #     model_list = sorted(self.model.list_prefix(prefix))
    #     al_list = sorted(self.repo.store.list_prefix(prefix))
    #     # note(f"Checking {len(model_list)} keys under '{prefix}'")
    #     assert model_list == al_list

    #     prefix = f"data/root/{group}"
    #     model_list = sorted(self.model.list_prefix(prefix))
    #     al_list = sorted(self.repo.store.list_prefix(prefix))
    #     # note(f"Checking {len(model_list)} keys under '{prefix}'")
    #     assert model_list == al_list

    # @precondition(lambda self: self.model.is_persistent_session())
    # @rule(data=st.data())
    # def check_group_path(self, data):
    #     t0 = time.time()
    #     group = data.draw(st.sampled_from(self.all_groups))
    #     self.check_list_prefix_from_group(group)
    #     self.check_subgroups(group)
    #     self.check_group_arrays(group)
    #     t1 = time.time()
    #     note(f"Checks took {t1 - t0} sec.")
    @invariant()
    def check_list_prefix_from_root(self) -> None:
        model_list = self._sync_iter(self.model.list_prefix(""))
        store_list = self._sync_iter(self.store.list_prefix(""))
        note(f"Checking {len(model_list)} expected keys vs {len(store_list)} actual keys")
        assert sorted(model_list) == sorted(store_list), (
            sorted(model_list),
            sorted(store_list),
        )

        # check that our internal state matches that of the store and model
        assert all(f"{path}/zarr.json" in model_list for path in self.all_groups | self.all_arrays)
        assert all(f"{path}/zarr.json" in store_list for path in self.all_groups | self.all_arrays)
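
Driving the state machine uses hypothesis' standard entry point; a sketch in which the store under test is illustrative:

from hypothesis.stateful import run_state_machine_as_test
from zarr.storage import MemoryStore
from zarr.testing.stateful import ZarrHierarchyStateMachine

def test_zarr_hierarchy() -> None:
    store = MemoryStore()  # substitute the external store you want to validate
    run_state_machine_as_test(lambda: ZarrHierarchyStateMachine(store))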

ZarrStoreStateMachine

Bases: RuleBasedStateMachine

" Zarr store state machine

This is a subclass of a Hypothesis RuleBasedStateMachine.
It is testing a framework to ensure that the state of a Zarr store matches
an expected state after a set of random operations. It contains a store
(currently, a Zarr MemoryStore) and a model, a simplified version of a
zarr store (in this case, a dict). It also contains rules which represent
actions that can be applied to a zarr store. Rules apply an action to both
the store and the model, and invariants assert that the state of the model
is equal to the state of the store. Hypothesis then generates sequences of
rules, running invariants after each rule. It raises an error if a sequence
produces discontinuity between state of the model and state of the store
(ie. an invariant is violated).
https://hypothesis.readthedocs.io/en/latest/stateful.html
Source code in zarr/testing/stateful.py
class ZarrStoreStateMachine(RuleBasedStateMachine):
    """
    Zarr store state machine

    This is a subclass of a Hypothesis RuleBasedStateMachine.
    It is a testing framework to ensure that the state of a Zarr store matches
    an expected state after a set of random operations. It contains a store
    (currently, a Zarr MemoryStore) and a model, a simplified version of a
    zarr store (in this case, a dict). It also contains rules which represent
    actions that can be applied to a zarr store. Rules apply an action to both
    the store and the model, and invariants assert that the state of the model
    is equal to the state of the store. Hypothesis then generates sequences of
    rules, running invariants after each rule. It raises an error if a sequence
    produces a discrepancy between the state of the model and the state of the
    store (i.e. an invariant is violated).
    https://hypothesis.readthedocs.io/en/latest/stateful.html
    """

    def __init__(self, store: Store) -> None:
        super().__init__()
        self.model: dict[str, Buffer] = {}
        self.store = SyncStoreWrapper(store)
        self.prototype = default_buffer_prototype()

    @initialize()
    def init_store(self) -> None:
        self.store.clear()

    @rule(key=zarr_keys(), data=st.binary(min_size=0, max_size=MAX_BINARY_SIZE))
    def set(self, key: str, data: bytes) -> None:
        note(f"(set) Setting {key!r} with {data!r}")
        assert not self.store.read_only
        data_buf = cpu.Buffer.from_bytes(data)
        self.store.set(key, data_buf)
        self.model[key] = data_buf

    @precondition(lambda self: len(self.model.keys()) > 0)
    @rule(key=zarr_keys(), data=st.data())
    def get(self, key: str, data: DataObject) -> None:
        key = data.draw(
            st.sampled_from(sorted(self.model.keys()))
        )  # hypothesis wants to sample from sorted list
        note("(get)")
        store_value = self.store.get(key, self.prototype)
        # to bytes here necessary because data_buf set to model in set()
        assert self.model[key] == store_value

    @rule(key=zarr_keys(), data=st.data())
    def get_invalid_zarr_keys(self, key: str, data: DataObject) -> None:
        note("(get_invalid)")
        assume(key not in self.model)
        assert self.store.get(key, self.prototype) is None

    @precondition(lambda self: len(self.model.keys()) > 0)
    @rule(data=st.data())
    def get_partial_values(self, data: DataObject) -> None:
        key_range = data.draw(
            key_ranges(keys=st.sampled_from(sorted(self.model.keys())), max_size=MAX_BINARY_SIZE)
        )
        note(f"(get partial) {key_range=}")
        obs_maybe = self.store.get_partial_values(key_range, self.prototype)
        observed = []

        for obs in obs_maybe:
            assert obs is not None
            observed.append(obs.to_bytes())

        model_vals_ls = []

        for key, byte_range in key_range:
            start = byte_range.start
            stop = byte_range.end
            model_vals_ls.append(self.model[key][start:stop])

        assert all(
            obs == exp.to_bytes() for obs, exp in zip(observed, model_vals_ls, strict=True)
        ), (
            observed,
            model_vals_ls,
        )

    @precondition(lambda self: self.store.supports_deletes)
    @precondition(lambda self: len(self.model.keys()) > 0)
    @rule(data=st.data())
    def delete(self, data: DataObject) -> None:
        key = data.draw(st.sampled_from(sorted(self.model.keys())))
        note(f"(delete) Deleting {key=}")

        self.store.delete(key)
        del self.model[key]

    @rule()
    def clear(self) -> None:
        assert not self.store.read_only
        note("(clear)")
        self.store.clear()
        self.model.clear()

        assert self.store.is_empty("")

        assert len(self.model.keys()) == len(list(self.store.list())) == 0

    @rule()
    # Local store can be non-empty when there are subdirectories but no files
    @precondition(lambda self: not isinstance(self.store.store, LocalStore))
    def is_empty(self) -> None:
        note("(is_empty)")

        # make sure they either both are or both aren't empty (same state)
        assert self.store.is_empty("") == (not self.model)

    @rule(key=zarr_keys())
    def exists(self, key: str) -> None:
        note("(exists)")

        assert self.store.exists(key) == (key in self.model)

    @invariant()
    def check_paths_equal(self) -> None:
        note("Checking that paths are equal")
        paths = sorted(self.store.list())

        assert sorted(self.model.keys()) == paths

    @invariant()
    def check_vals_equal(self) -> None:
        note("Checking values equal")
        for key, val in self.model.items():
            store_item = self.store.get(key, self.prototype)
            assert val == store_item

    @invariant()
    def check_num_zarr_keys_equal(self) -> None:
        note("check num zarr_keys equal")

        assert len(self.model) == len(list(self.store.list()))

    @invariant()
    def check_zarr_keys(self) -> None:
        keys = list(self.store.list())

        if not keys:
            assert self.store.is_empty("") is True

        else:
            assert self.store.is_empty("") is False

            for key in keys:
                assert self.store.exists(key) is True
        note("checking keys / exists / empty")

with_frequency

with_frequency(frequency: float) -> Callable[[F], F]

This needs to be deterministic for hypothesis replaying

Source code in zarr/testing/stateful.py
def with_frequency(frequency: float) -> Callable[[F], F]:
    """This needs to be deterministic for hypothesis replaying"""

    def decorator(func: F) -> F:
        counter_attr = f"__{func.__name__}_counter"

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)

        @precondition
        def frequency_check(f: Any) -> Any:
            if not hasattr(f, counter_attr):
                setattr(f, counter_attr, 0)

            current_count = getattr(f, counter_attr) + 1
            setattr(f, counter_attr, current_count)

            return (current_count * frequency) % 1.0 >= (1.0 - frequency)

        return cast(F, frequency_check(wrapper))

    return decorator
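
Usage sketch: gate a rule so it fires on roughly one call in four; the deterministic counter (rather than a random draw) is what keeps hypothesis replay and shrinking stable:

from hypothesis.stateful import RuleBasedStateMachine, rule
from zarr.testing.stateful import with_frequency

class Machine(RuleBasedStateMachine):
    @rule()
    @with_frequency(0.25)
    def rare_action(self) -> None:
        ...  # runs on about 25% of the occasions hypothesis selects it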

Store

zarr.testing.store

LatencyStore

Bases: WrapperStore[Store]

A wrapper class that takes any store instance in its constructor and adds latency to the set and get methods. This can be used for performance testing.

Source code in zarr/testing/store.py
class LatencyStore(WrapperStore[Store]):
    """
    A wrapper class that takes any store instance in its constructor and
    adds latency to the `set` and `get` methods. This can be used for
    performance testing.
    """

    get_latency: float
    set_latency: float

    def __init__(self, cls: Store, *, get_latency: float = 0, set_latency: float = 0) -> None:
        self.get_latency = float(get_latency)
        self.set_latency = float(set_latency)
        self._store = cls

    async def set(self, key: str, value: Buffer) -> None:
        """
        Add latency to the ``set`` method.

        Calls ``asyncio.sleep(self.set_latency)`` before invoking the wrapped ``set`` method.

        Parameters
        ----------
        key : str
            The key to set
        value : Buffer
            The value to set

        Returns
        -------
        None
        """
        await asyncio.sleep(self.set_latency)
        await self._store.set(key, value)

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        """
        Add latency to the ``get`` method.

        Calls ``asyncio.sleep(self.get_latency)`` before invoking the wrapped ``get`` method.

        Parameters
        ----------
        key : str
            The key to get
        prototype : BufferPrototype
            The BufferPrototype to use.
        byte_range : ByteRequest, optional
            An optional byte range.

        Returns
        -------
        buffer : Buffer or None
        """
        await asyncio.sleep(self.get_latency)
        return await self._store.get(key, prototype=prototype, byte_range=byte_range)
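
Usage sketch (latencies are in seconds):

import asyncio
import time
from zarr.core.buffer.cpu import Buffer
from zarr.storage import MemoryStore
from zarr.testing.store import LatencyStore

async def demo() -> None:
    store = LatencyStore(MemoryStore(), get_latency=0.05, set_latency=0.1)
    t0 = time.perf_counter()
    await store.set("k", Buffer.from_bytes(b"v"))
    print(f"set took ~{time.perf_counter() - t0:.2f}s")  # at least 0.1s

asyncio.run(demo())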

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes property

supports_deletes: bool

Does the store support deletes?

supports_listing property

supports_listing: bool

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always False.

supports_writes property

supports_writes: bool

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/storage/_wrapper.py
def __enter__(self) -> Self:
    return type(self)(self._store.__enter__())

__eq__

__eq__(value: object) -> bool

Equality comparison.

Source code in zarr/storage/_wrapper.py
def __eq__(self, value: object) -> bool:
    return type(self) is type(value) and self._store.__eq__(value._store)  # type: ignore[attr-defined]

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    return self._store.__exit__(exc_type, exc_value, traceback)

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_wrapper.py
async def clear(self) -> None:
    return await self._store.clear()

close

close() -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def close(self) -> None:
    self._store.close()

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –
Source code in zarr/storage/_wrapper.py
async def delete(self, key: str) -> None:
    await self._store.delete(key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_wrapper.py
async def delete_dir(self, prefix: str) -> None:
    return await self._store.delete_dir(prefix)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool

Source code in zarr/storage/_wrapper.py
async def exists(self, key: str) -> bool:
    return await self._store.exists(key)

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Add latency to the get method.

Calls asyncio.sleep(self.get_latency) before invoking the wrapped get method.

Parameters:

  • key (str) –

    The key to get

  • prototype (BufferPrototype) –

    The BufferPrototype to use.

  • byte_range (ByteRequest, default: None ) –

    An optional byte range.

Returns:

  • buffer ( Buffer or None ) –
Source code in zarr/testing/store.py
async def get(
    self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Buffer | None:
    """
    Add latency to the ``get`` method.

    Calls ``asyncio.sleep(self.get_latency)`` before invoking the wrapped ``get`` method.

    Parameters
    ----------
    key : str
        The key to get
    prototype : BufferPrototype
        The BufferPrototype to use.
    byte_range : ByteRequest, optional
        An optional byte range.

    Returns
    -------
    buffer : Buffer or None
    """
    await asyncio.sleep(self.get_latency)
    return await self._store.get(key, prototype=prototype, byte_range=byte_range)

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns:

  • list of values, in the order of the key_ranges; may contain None for missing keys
Source code in zarr/storage/_wrapper.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    return await self._store.get_partial_values(prototype, key_ranges)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError – When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)
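
As the note to implementers says, backends with cheap metadata access can avoid reading the value. A hedged, illustrative sketch of the idea for a filesystem-backed key (not the implementation of any particular store):

import asyncio
from pathlib import Path

async def getsize(root: Path, key: str) -> int:
    # Stat the file instead of reading it; mirror the Store.getsize
    # contract by raising FileNotFoundError for missing keys.
    path = root / key
    try:
        stat = await asyncio.to_thread(path.stat)
    except FileNotFoundError:
        raise FileNotFoundError(key) from None
    return stat.st_size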

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored
Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix and calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/storage/_wrapper.py
async def is_empty(self, prefix: str) -> bool:
    return await self._store.is_empty(prefix)

list

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str]

Source code in zarr/storage/_wrapper.py
def list(self) -> AsyncIterator[str]:
    return self._store.list()

list_dir

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes with a given prefix and which do not contain the character "/" after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

Source code in zarr/storage/_wrapper.py
def list_dir(self, prefix: str) -> AsyncIterator[str]:
    return self._store.list_dir(prefix)

list_prefix

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

Source code in zarr/storage/_wrapper.py
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    return self._store.list_prefix(prefix)

open async classmethod

open(
    store_cls: type[T_Store], *args: Any, **kwargs: Any
) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/storage/_wrapper.py
@classmethod
async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
    store = store_cls(*args, **kwargs)
    await store._open()
    return cls(store=store)

set async

set(key: str, value: Buffer) -> None

Add latency to the set method.

Calls asyncio.sleep(self.set_latency) before invoking the wrapped set method.

Parameters:

  • key (str) –

    The key to set

  • value (Buffer) –

    The value to set

Returns:

  • None
Source code in zarr/testing/store.py
async def set(self, key: str, value: Buffer) -> None:
    """
    Add latency to the ``set`` method.

    Calls ``asyncio.sleep(self.set_latency)`` before invoking the wrapped ``set`` method.

    Parameters
    ----------
    key : str
        The key to set
    value : Buffer
        The value to set

    Returns
    -------
    None
    """
    await asyncio.sleep(self.set_latency)
    await self._store.set(key, value)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –

  • value (Buffer) –

Source code in zarr/storage/_wrapper.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    return await self._store.set_if_not_exists(key, value)

with_read_only

with_read_only(read_only: bool = False) -> Store

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read only attribute.
Source code in zarr/abc/store.py
def with_read_only(self, read_only: bool = False) -> Store:
    """
    Return a new store with a new read_only setting.

    The new store points to the same location with the specified new read_only state.
    The returned Store is not automatically opened, and this store is
    not automatically closed.

    Parameters
    ----------
    read_only
        If True, the store will be created in read-only mode. Defaults to False.

    Returns
    -------
        A new store of the same type with the new read only attribute.
    """
    raise NotImplementedError(
        f"with_read_only is not implemented for the {type(self)} store type."
    )
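
A usage sketch, assuming a store type that implements this method (the built-in MemoryStore is one such store):

from zarr.storage import MemoryStore

# A read-only view and an independent writable handle on the same data.
store = MemoryStore(read_only=True)
writer = store.with_read_only(read_only=False)

assert store.read_only
assert not writer.read_only  # same backing location, new read_only state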

StoreTests

Bases: Generic[S, B]

Source code in zarr/testing/store.py
class StoreTests(Generic[S, B]):
    store_cls: type[S]
    buffer_cls: type[B]

    @abstractmethod
    async def set(self, store: S, key: str, value: Buffer) -> None:
        """
        Insert a value into a storage backend, with a specific key.
        This should not use any store methods. Bypassing the store methods allows them to be
        tested.
        """
        ...

    @abstractmethod
    async def get(self, store: S, key: str) -> Buffer:
        """
        Retrieve a value from a storage backend, by key.
        This should not use any store methods. Bypassing the store methods allows them to be
        tested.
        """
        ...

    @abstractmethod
    @pytest.fixture
    def store_kwargs(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Kwargs for instantiating a store"""
        ...

    @abstractmethod
    def test_store_repr(self, store: S) -> None: ...

    @abstractmethod
    def test_store_supports_writes(self, store: S) -> None: ...

    def test_store_supports_partial_writes(self, store: S) -> None:
        assert not store.supports_partial_writes

    @abstractmethod
    def test_store_supports_listing(self, store: S) -> None: ...

    @pytest.fixture
    def open_kwargs(self, store_kwargs: dict[str, Any]) -> dict[str, Any]:
        return store_kwargs

    @pytest.fixture
    async def store(self, open_kwargs: dict[str, Any]) -> Store:
        return await self.store_cls.open(**open_kwargs)

    @pytest.fixture
    async def store_not_open(self, store_kwargs: dict[str, Any]) -> Store:
        return self.store_cls(**store_kwargs)

    def test_store_type(self, store: S) -> None:
        assert isinstance(store, Store)
        assert isinstance(store, self.store_cls)

    def test_store_eq(self, store: S, store_kwargs: dict[str, Any]) -> None:
        # check self equality
        assert store == store

        # check store equality with same inputs
        # asserting this is important for being able to compare (de)serialized stores
        store2 = self.store_cls(**store_kwargs)
        assert store == store2

    async def test_serializable_store(self, store: S) -> None:
        new_store: S = pickle.loads(pickle.dumps(store))
        assert new_store == store
        assert new_store.read_only == store.read_only
        # quickly roundtrip data to a key to test that new store works
        data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
        key = "foo"
        await store.set(key, data_buf)
        observed = await store.get(key, prototype=default_buffer_prototype())
        assert_bytes_equal(observed, data_buf)

    def test_store_read_only(self, store: S) -> None:
        assert not store.read_only

        with pytest.raises(AttributeError):
            store.read_only = False  # type: ignore[misc]

    @pytest.mark.parametrize("read_only", [True, False])
    async def test_store_open_read_only(self, open_kwargs: dict[str, Any], read_only: bool) -> None:
        open_kwargs["read_only"] = read_only
        store = await self.store_cls.open(**open_kwargs)
        assert store._is_open
        assert store.read_only == read_only

    async def test_store_context_manager(self, open_kwargs: dict[str, Any]) -> None:
        # Test that the context manager closes the store
        with await self.store_cls.open(**open_kwargs) as store:
            assert store._is_open
            # Test trying to open an already open store
            with pytest.raises(ValueError, match="store is already open"):
                await store._open()
        assert not store._is_open

    async def test_read_only_store_raises(self, open_kwargs: dict[str, Any]) -> None:
        kwargs = {**open_kwargs, "read_only": True}
        store = await self.store_cls.open(**kwargs)
        assert store.read_only

        # set
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await store.set("foo", self.buffer_cls.from_bytes(b"bar"))

        # delete
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await store.delete("foo")

    async def test_with_read_only_store(self, open_kwargs: dict[str, Any]) -> None:
        kwargs = {**open_kwargs, "read_only": True}
        store = await self.store_cls.open(**kwargs)
        assert store.read_only

        # Test that you cannot write to a read-only store
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await store.set("foo", self.buffer_cls.from_bytes(b"bar"))

        # Check if the store implements with_read_only
        try:
            writer = store.with_read_only(read_only=False)
        except NotImplementedError:
            # Test that stores that do not implement with_read_only raise NotImplementedError with the correct message
            with pytest.raises(
                NotImplementedError,
                match=f"with_read_only is not implemented for the {type(store)} store type.",
            ):
                store.with_read_only(read_only=False)
            return

        # Test that you can write to a new store copy
        assert not writer._is_open
        assert not writer.read_only
        await writer.set("foo", self.buffer_cls.from_bytes(b"bar"))
        await writer.delete("foo")

        # Test that you cannot write to the original store
        assert store.read_only
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await store.set("foo", self.buffer_cls.from_bytes(b"bar"))
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await store.delete("foo")

        # Test that you cannot write to a read-only store copy
        reader = store.with_read_only(read_only=True)
        assert reader.read_only
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await reader.set("foo", self.buffer_cls.from_bytes(b"bar"))
        with pytest.raises(
            ValueError, match="store was opened in read-only mode and does not support writing"
        ):
            await reader.delete("foo")

    @pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
    @pytest.mark.parametrize(
        ("data", "byte_range"),
        [
            (b"\x01\x02\x03\x04", None),
            (b"\x01\x02\x03\x04", RangeByteRequest(1, 4)),
            (b"\x01\x02\x03\x04", OffsetByteRequest(1)),
            (b"\x01\x02\x03\x04", SuffixByteRequest(1)),
            (b"", None),
        ],
    )
    async def test_get(self, store: S, key: str, data: bytes, byte_range: ByteRequest) -> None:
        """
        Ensure that data can be read from the store using the store.get method.
        """
        data_buf = self.buffer_cls.from_bytes(data)
        await self.set(store, key, data_buf)
        observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range)
        start, stop = _normalize_byte_range_index(data_buf, byte_range=byte_range)
        expected = data_buf[start:stop]
        assert_bytes_equal(observed, expected)

    async def test_get_not_open(self, store_not_open: S) -> None:
        """
        Ensure that data can be read from the store that isn't yet open using the store.get method.
        """
        assert not store_not_open._is_open
        data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
        key = "c/0"
        await self.set(store_not_open, key, data_buf)
        observed = await store_not_open.get(key, prototype=default_buffer_prototype())
        assert_bytes_equal(observed, data_buf)

    async def test_get_raises(self, store: S) -> None:
        """
        Ensure that a ValueError is raised for invalid byte range syntax
        """
        data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
        await self.set(store, "c/0", data_buf)
        with pytest.raises((ValueError, TypeError), match=r"Unexpected byte_range, got.*"):
            await store.get("c/0", prototype=default_buffer_prototype(), byte_range=(0, 2))  # type: ignore[arg-type]

    async def test_get_many(self, store: S) -> None:
        """
        Ensure that multiple keys can be retrieved at once with the _get_many method.
        """
        keys = tuple(map(str, range(10)))
        values = tuple(f"{k}".encode() for k in keys)
        for k, v in zip(keys, values, strict=False):
            await self.set(store, k, self.buffer_cls.from_bytes(v))
        observed_buffers = await _collect_aiterator(
            store._get_many(
                zip(
                    keys,
                    (default_buffer_prototype(),) * len(keys),
                    (None,) * len(keys),
                    strict=False,
                )
            )
        )
        observed_kvs = sorted(((k, b.to_bytes()) for k, b in observed_buffers))  # type: ignore[union-attr]
        expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
        assert observed_kvs == expected_kvs

    @pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
    @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
    async def test_getsize(self, store: S, key: str, data: bytes) -> None:
        """
        Test the result of store.getsize().
        """
        data_buf = self.buffer_cls.from_bytes(data)
        expected = len(data_buf)
        await self.set(store, key, data_buf)
        observed = await store.getsize(key)
        assert observed == expected

    async def test_getsize_prefix(self, store: S) -> None:
        """
        Test the result of store.getsize_prefix().
        """
        data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
        keys = ["c/0/0", "c/0/1", "c/1/0", "c/1/1"]
        keys_values = [(k, data_buf) for k in keys]
        await store._set_many(keys_values)
        expected = len(data_buf) * len(keys)
        observed = await store.getsize_prefix("c")
        assert observed == expected

    async def test_getsize_raises(self, store: S) -> None:
        """
        Test that getsize() raises a FileNotFoundError if the key doesn't exist.
        """
        with pytest.raises(FileNotFoundError):
            await store.getsize("c/1000")

    @pytest.mark.parametrize("key", ["zarr.json", "c/0", "foo/c/0.0", "foo/0/0"])
    @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
    async def test_set(self, store: S, key: str, data: bytes) -> None:
        """
        Ensure that data can be written to the store using the store.set method.
        """
        assert not store.read_only
        data_buf = self.buffer_cls.from_bytes(data)
        await store.set(key, data_buf)
        observed = await self.get(store, key)
        assert_bytes_equal(observed, data_buf)

    async def test_set_not_open(self, store_not_open: S) -> None:
        """
        Ensure that data can be written to the store that's not yet open using the store.set method.
        """
        assert not store_not_open._is_open
        data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
        key = "c/0"
        await store_not_open.set(key, data_buf)
        observed = await self.get(store_not_open, key)
        assert_bytes_equal(observed, data_buf)

    async def test_set_many(self, store: S) -> None:
        """
        Test that a dict of key : value pairs can be inserted into the store via the
        `_set_many` method.
        """
        keys = ["zarr.json", "c/0", "foo/c/0.0", "foo/0/0"]
        data_buf = [self.buffer_cls.from_bytes(k.encode()) for k in keys]
        store_dict = dict(zip(keys, data_buf, strict=True))
        await store._set_many(store_dict.items())
        for k, v in store_dict.items():
            assert (await self.get(store, k)).to_bytes() == v.to_bytes()

    @pytest.mark.parametrize(
        "key_ranges",
        [
            [],
            [("zarr.json", RangeByteRequest(0, 2))],
            [("c/0", RangeByteRequest(0, 2)), ("zarr.json", None)],
            [
                ("c/0/0", RangeByteRequest(0, 2)),
                ("c/0/1", SuffixByteRequest(2)),
                ("c/0/2", OffsetByteRequest(2)),
            ],
        ],
    )
    async def test_get_partial_values(
        self, store: S, key_ranges: list[tuple[str, ByteRequest]]
    ) -> None:
        # put all of the data
        for key, _ in key_ranges:
            await self.set(store, key, self.buffer_cls.from_bytes(bytes(key, encoding="utf-8")))

        # read back just part of it
        observed_maybe = await store.get_partial_values(
            prototype=default_buffer_prototype(), key_ranges=key_ranges
        )

        observed: list[Buffer] = []
        expected: list[Buffer] = []

        for obs in observed_maybe:
            assert obs is not None
            observed.append(obs)

        for idx in range(len(observed)):
            key, byte_range = key_ranges[idx]
            result = await store.get(
                key, prototype=default_buffer_prototype(), byte_range=byte_range
            )
            assert result is not None
            expected.append(result)

        assert all(
            obs.to_bytes() == exp.to_bytes() for obs, exp in zip(observed, expected, strict=True)
        )

    async def test_exists(self, store: S) -> None:
        assert not await store.exists("foo")
        await store.set("foo/zarr.json", self.buffer_cls.from_bytes(b"bar"))
        assert await store.exists("foo/zarr.json")

    async def test_delete(self, store: S) -> None:
        if not store.supports_deletes:
            pytest.skip("store does not support deletes")
        await store.set("foo/zarr.json", self.buffer_cls.from_bytes(b"bar"))
        assert await store.exists("foo/zarr.json")
        await store.delete("foo/zarr.json")
        assert not await store.exists("foo/zarr.json")

    async def test_delete_dir(self, store: S) -> None:
        if not store.supports_deletes:
            pytest.skip("store does not support deletes")
        await store.set("zarr.json", self.buffer_cls.from_bytes(b"root"))
        await store.set("foo-bar/zarr.json", self.buffer_cls.from_bytes(b"root"))
        await store.set("foo/zarr.json", self.buffer_cls.from_bytes(b"bar"))
        await store.set("foo/c/0", self.buffer_cls.from_bytes(b"chunk"))
        await store.delete_dir("foo")
        assert await store.exists("zarr.json")
        assert await store.exists("foo-bar/zarr.json")
        assert not await store.exists("foo/zarr.json")
        assert not await store.exists("foo/c/0")

    async def test_delete_nonexistent_key_does_not_raise(self, store: S) -> None:
        if not store.supports_deletes:
            pytest.skip("store does not support deletes")
        await store.delete("nonexistent_key")

    async def test_is_empty(self, store: S) -> None:
        assert await store.is_empty("")
        await self.set(
            store, "foo/bar", self.buffer_cls.from_bytes(bytes("something", encoding="utf-8"))
        )
        assert not await store.is_empty("")
        assert await store.is_empty("fo")
        assert not await store.is_empty("foo/")
        assert not await store.is_empty("foo")
        assert await store.is_empty("spam/")

    async def test_clear(self, store: S) -> None:
        await self.set(
            store, "key", self.buffer_cls.from_bytes(bytes("something", encoding="utf-8"))
        )
        await store.clear()
        assert await store.is_empty("")

    async def test_list(self, store: S) -> None:
        assert await _collect_aiterator(store.list()) == ()
        prefix = "foo"
        data = self.buffer_cls.from_bytes(b"")
        store_dict = {
            prefix + "/zarr.json": data,
            **{prefix + f"/c/{idx}": data for idx in range(10)},
        }
        await store._set_many(store_dict.items())
        expected_sorted = sorted(store_dict.keys())
        observed = await _collect_aiterator(store.list())
        observed_sorted = sorted(observed)
        assert observed_sorted == expected_sorted

    async def test_list_prefix(self, store: S) -> None:
        """
        Test that the `list_prefix` method works as intended. Given a prefix, it should return
        all the keys in storage that start with this prefix.
        """
        prefixes = ("", "a/", "a/b/", "a/b/c/")
        data = self.buffer_cls.from_bytes(b"")
        fname = "zarr.json"
        store_dict = {p + fname: data for p in prefixes}

        await store._set_many(store_dict.items())

        for prefix in prefixes:
            observed = tuple(sorted(await _collect_aiterator(store.list_prefix(prefix))))
            expected: tuple[str, ...] = ()
            for key in store_dict:
                if key.startswith(prefix):
                    expected += (key,)
            expected = tuple(sorted(expected))
            assert observed == expected

    async def test_list_empty_path(self, store: S) -> None:
        """
        Verify that list and list_prefix work correctly when path is an empty string,
        i.e. no unwanted replacement occurs.
        """
        data = self.buffer_cls.from_bytes(b"")
        store_dict = {
            "foo/bar/zarr.json": data,
            "foo/bar/c/1": data,
            "foo/baz/c/0": data,
        }
        await store._set_many(store_dict.items())

        # Test list()
        observed_list = await _collect_aiterator(store.list())
        observed_list_sorted = sorted(observed_list)
        expected_list_sorted = sorted(store_dict.keys())
        assert observed_list_sorted == expected_list_sorted

        # Test list_prefix() with an empty prefix
        observed_prefix_empty = await _collect_aiterator(store.list_prefix(""))
        observed_prefix_empty_sorted = sorted(observed_prefix_empty)
        expected_prefix_empty_sorted = sorted(store_dict.keys())
        assert observed_prefix_empty_sorted == expected_prefix_empty_sorted

        # Test list_prefix() with a non-empty prefix
        observed_prefix = await _collect_aiterator(store.list_prefix("foo/bar/"))
        observed_prefix_sorted = sorted(observed_prefix)
        expected_prefix_sorted = sorted(k for k in store_dict if k.startswith("foo/bar/"))
        assert observed_prefix_sorted == expected_prefix_sorted

    async def test_list_dir(self, store: S) -> None:
        root = "foo"
        store_dict = {
            root + "/zarr.json": self.buffer_cls.from_bytes(b"bar"),
            root + "/c/1": self.buffer_cls.from_bytes(b"\x01"),
        }

        assert await _collect_aiterator(store.list_dir("")) == ()
        assert await _collect_aiterator(store.list_dir(root)) == ()

        await store._set_many(store_dict.items())

        keys_observed = await _collect_aiterator(store.list_dir(root))
        keys_expected = {k.removeprefix(root + "/").split("/")[0] for k in store_dict}

        assert sorted(keys_observed) == sorted(keys_expected)

        keys_observed = await _collect_aiterator(store.list_dir(root + "/"))
        assert sorted(keys_expected) == sorted(keys_observed)

    async def test_set_if_not_exists(self, store: S) -> None:
        key = "k"
        data_buf = self.buffer_cls.from_bytes(b"0000")
        await self.set(store, key, data_buf)

        new = self.buffer_cls.from_bytes(b"1111")
        await store.set_if_not_exists("k", new)  # no error

        result = await store.get(key, default_buffer_prototype())
        assert result == data_buf

        await store.set_if_not_exists("k2", new)  # no error

        result = await store.get("k2", default_buffer_prototype())
        assert result == new
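
To test a custom store, subclass StoreTests, point store_cls and buffer_cls at your classes, and implement the abstract hooks so they bypass the store's own methods. A sketch for the built-in in-memory store, assuming it keeps its data in a _store_dict attribute:

from typing import Any

import pytest

from zarr.core.buffer import Buffer, cpu
from zarr.storage import MemoryStore
from zarr.testing.store import StoreTests

class TestMemoryStore(StoreTests[MemoryStore, cpu.Buffer]):
    store_cls = MemoryStore
    buffer_cls = cpu.Buffer

    async def set(self, store: MemoryStore, key: str, value: Buffer) -> None:
        # Write into the backing dict directly, bypassing Store.set.
        store._store_dict[key] = value

    async def get(self, store: MemoryStore, key: str) -> Buffer:
        # Read from the backing dict directly, bypassing Store.get.
        return store._store_dict[key]

    @pytest.fixture
    def store_kwargs(self) -> dict[str, Any]:
        return {"read_only": False}

    def test_store_repr(self, store: MemoryStore) -> None:
        assert "MemoryStore" in repr(store)

    def test_store_supports_writes(self, store: MemoryStore) -> None:
        assert store.supports_writes

    def test_store_supports_listing(self, store: MemoryStore) -> None:
        assert store.supports_listing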

get abstractmethod async

get(store: S, key: str) -> Buffer

Retrieve a value from a storage backend, by key. This should not use any store methods. Bypassing the store methods allows them to be tested.

Source code in zarr/testing/store.py
@abstractmethod
async def get(self, store: S, key: str) -> Buffer:
    """
    Retrieve a value from a storage backend, by key.
    This should not use any store methods. Bypassing the store methods allows them to be
    tested.
    """
    ...

set abstractmethod async

set(store: S, key: str, value: Buffer) -> None

Insert a value into a storage backend, with a specific key. This should not use any store methods. Bypassing the store methods allows them to be tested.

Source code in zarr/testing/store.py
@abstractmethod
async def set(self, store: S, key: str, value: Buffer) -> None:
    """
    Insert a value into a storage backend, with a specific key.
    This should not use any store methods. Bypassing the store methods allows them to be
    tested.
    """
    ...

store_kwargs abstractmethod

store_kwargs(*args: Any, **kwargs: Any) -> dict[str, Any]

Kwargs for instantiating a store

Source code in zarr/testing/store.py
@abstractmethod
@pytest.fixture
def store_kwargs(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Kwargs for instantiating a store"""
    ...

test_get async

test_get(
    store: S,
    key: str,
    data: bytes,
    byte_range: ByteRequest,
) -> None

Ensure that data can be read from the store using the store.get method.

Source code in zarr/testing/store.py
@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
@pytest.mark.parametrize(
    ("data", "byte_range"),
    [
        (b"\x01\x02\x03\x04", None),
        (b"\x01\x02\x03\x04", RangeByteRequest(1, 4)),
        (b"\x01\x02\x03\x04", OffsetByteRequest(1)),
        (b"\x01\x02\x03\x04", SuffixByteRequest(1)),
        (b"", None),
    ],
)
async def test_get(self, store: S, key: str, data: bytes, byte_range: ByteRequest) -> None:
    """
    Ensure that data can be read from the store using the store.get method.
    """
    data_buf = self.buffer_cls.from_bytes(data)
    await self.set(store, key, data_buf)
    observed = await store.get(key, prototype=default_buffer_prototype(), byte_range=byte_range)
    start, stop = _normalize_byte_range_index(data_buf, byte_range=byte_range)
    expected = data_buf[start:stop]
    assert_bytes_equal(observed, expected)

test_get_many async

test_get_many(store: S) -> None

Ensure that multiple keys can be retrieved at once with the _get_many method.

Source code in zarr/testing/store.py
async def test_get_many(self, store: S) -> None:
    """
    Ensure that multiple keys can be retrieved at once with the _get_many method.
    """
    keys = tuple(map(str, range(10)))
    values = tuple(f"{k}".encode() for k in keys)
    for k, v in zip(keys, values, strict=False):
        await self.set(store, k, self.buffer_cls.from_bytes(v))
    observed_buffers = await _collect_aiterator(
        store._get_many(
            zip(
                keys,
                (default_buffer_prototype(),) * len(keys),
                (None,) * len(keys),
                strict=False,
            )
        )
    )
    observed_kvs = sorted(((k, b.to_bytes()) for k, b in observed_buffers))  # type: ignore[union-attr]
    expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
    assert observed_kvs == expected_kvs

test_get_not_open async

test_get_not_open(store_not_open: S) -> None

Ensure that data can be read from the store that isn't yet open using the store.get method.

Source code in zarr/testing/store.py
async def test_get_not_open(self, store_not_open: S) -> None:
    """
    Ensure that data can be read from the store that isn't yet open using the store.get method.
    """
    assert not store_not_open._is_open
    data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    key = "c/0"
    await self.set(store_not_open, key, data_buf)
    observed = await store_not_open.get(key, prototype=default_buffer_prototype())
    assert_bytes_equal(observed, data_buf)

test_get_raises async

test_get_raises(store: S) -> None

Ensure that a ValueError is raised for invalid byte range syntax

Source code in zarr/testing/store.py
async def test_get_raises(self, store: S) -> None:
    """
    Ensure that a ValueError is raised for invalid byte range syntax
    """
    data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    await self.set(store, "c/0", data_buf)
    with pytest.raises((ValueError, TypeError), match=r"Unexpected byte_range, got.*"):
        await store.get("c/0", prototype=default_buffer_prototype(), byte_range=(0, 2))  # type: ignore[arg-type]

test_getsize async

test_getsize(store: S, key: str, data: bytes) -> None

Test the result of store.getsize().

Source code in zarr/testing/store.py
@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
async def test_getsize(self, store: S, key: str, data: bytes) -> None:
    """
    Test the result of store.getsize().
    """
    data_buf = self.buffer_cls.from_bytes(data)
    expected = len(data_buf)
    await self.set(store, key, data_buf)
    observed = await store.getsize(key)
    assert observed == expected

test_getsize_prefix async

test_getsize_prefix(store: S) -> None

Test the result of store.getsize_prefix().

Source code in zarr/testing/store.py
async def test_getsize_prefix(self, store: S) -> None:
    """
    Test the result of store.getsize_prefix().
    """
    data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    keys = ["c/0/0", "c/0/1", "c/1/0", "c/1/1"]
    keys_values = [(k, data_buf) for k in keys]
    await store._set_many(keys_values)
    expected = len(data_buf) * len(keys)
    observed = await store.getsize_prefix("c")
    assert observed == expected

test_getsize_raises async

test_getsize_raises(store: S) -> None

Test that getsize() raises a FileNotFoundError if the key doesn't exist.

Source code in zarr/testing/store.py
async def test_getsize_raises(self, store: S) -> None:
    """
    Test that getsize() raises a FileNotFoundError if the key doesn't exist.
    """
    with pytest.raises(FileNotFoundError):
        await store.getsize("c/1000")

test_list_empty_path async

test_list_empty_path(store: S) -> None

Verify that list and list_prefix work correctly when path is an empty string, i.e. no unwanted replacement occurs.

Source code in zarr/testing/store.py
async def test_list_empty_path(self, store: S) -> None:
    """
    Verify that list and list_prefix work correctly when path is an empty string,
    i.e. no unwanted replacement occurs.
    """
    data = self.buffer_cls.from_bytes(b"")
    store_dict = {
        "foo/bar/zarr.json": data,
        "foo/bar/c/1": data,
        "foo/baz/c/0": data,
    }
    await store._set_many(store_dict.items())

    # Test list()
    observed_list = await _collect_aiterator(store.list())
    observed_list_sorted = sorted(observed_list)
    expected_list_sorted = sorted(store_dict.keys())
    assert observed_list_sorted == expected_list_sorted

    # Test list_prefix() with an empty prefix
    observed_prefix_empty = await _collect_aiterator(store.list_prefix(""))
    observed_prefix_empty_sorted = sorted(observed_prefix_empty)
    expected_prefix_empty_sorted = sorted(store_dict.keys())
    assert observed_prefix_empty_sorted == expected_prefix_empty_sorted

    # Test list_prefix() with a non-empty prefix
    observed_prefix = await _collect_aiterator(store.list_prefix("foo/bar/"))
    observed_prefix_sorted = sorted(observed_prefix)
    expected_prefix_sorted = sorted(k for k in store_dict if k.startswith("foo/bar/"))
    assert observed_prefix_sorted == expected_prefix_sorted

test_list_prefix async

test_list_prefix(store: S) -> None

Test that the list_prefix method works as intended. Given a prefix, it should return all the keys in storage that start with this prefix.

Source code in zarr/testing/store.py
async def test_list_prefix(self, store: S) -> None:
    """
    Test that the `list_prefix` method works as intended. Given a prefix, it should return
    all the keys in storage that start with this prefix.
    """
    prefixes = ("", "a/", "a/b/", "a/b/c/")
    data = self.buffer_cls.from_bytes(b"")
    fname = "zarr.json"
    store_dict = {p + fname: data for p in prefixes}

    await store._set_many(store_dict.items())

    for prefix in prefixes:
        observed = tuple(sorted(await _collect_aiterator(store.list_prefix(prefix))))
        expected: tuple[str, ...] = ()
        for key in store_dict:
            if key.startswith(prefix):
                expected += (key,)
        expected = tuple(sorted(expected))
        assert observed == expected

test_set async

test_set(store: S, key: str, data: bytes) -> None

Ensure that data can be written to the store using the store.set method.

Source code in zarr/testing/store.py
@pytest.mark.parametrize("key", ["zarr.json", "c/0", "foo/c/0.0", "foo/0/0"])
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
async def test_set(self, store: S, key: str, data: bytes) -> None:
    """
    Ensure that data can be written to the store using the store.set method.
    """
    assert not store.read_only
    data_buf = self.buffer_cls.from_bytes(data)
    await store.set(key, data_buf)
    observed = await self.get(store, key)
    assert_bytes_equal(observed, data_buf)

test_set_many async

test_set_many(store: S) -> None

Test that a dict of key : value pairs can be inserted into the store via the _set_many method.

Source code in zarr/testing/store.py
async def test_set_many(self, store: S) -> None:
    """
    Test that a dict of key : value pairs can be inserted into the store via the
    `_set_many` method.
    """
    keys = ["zarr.json", "c/0", "foo/c/0.0", "foo/0/0"]
    data_buf = [self.buffer_cls.from_bytes(k.encode()) for k in keys]
    store_dict = dict(zip(keys, data_buf, strict=True))
    await store._set_many(store_dict.items())
    for k, v in store_dict.items():
        assert (await self.get(store, k)).to_bytes() == v.to_bytes()

test_set_not_open async

test_set_not_open(store_not_open: S) -> None

Ensure that data can be written to the store that's not yet open using the store.set method.

Source code in zarr/testing/store.py
async def test_set_not_open(self, store_not_open: S) -> None:
    """
    Ensure that data can be written to the store that's not yet open using the store.set method.
    """
    assert not store_not_open._is_open
    data_buf = self.buffer_cls.from_bytes(b"\x01\x02\x03\x04")
    key = "c/0"
    await store_not_open.set(key, data_buf)
    observed = await self.get(store_not_open, key)
    assert_bytes_equal(observed, data_buf)

Strategies

zarr.testing.strategies

basic_indices

basic_indices(
    draw: DrawFn,
    *,
    shape: tuple[int, ...],
    min_dims: int = 0,
    max_dims: int | None = None,
    allow_newaxis: bool = False,
    allow_ellipsis: bool = True,
) -> Any

Basic indices without unsupported negative slices.

Source code in zarr/testing/strategies.py
@st.composite
def basic_indices(
    draw: st.DrawFn,
    *,
    shape: tuple[int, ...],
    min_dims: int = 0,
    max_dims: int | None = None,
    allow_newaxis: bool = False,
    allow_ellipsis: bool = True,
) -> Any:
    """Basic indices without unsupported negative slices."""
    strategy = npst.basic_indices(
        shape=shape,
        min_dims=min_dims,
        max_dims=max_dims,
        allow_newaxis=allow_newaxis,
        allow_ellipsis=allow_ellipsis,
    ).filter(
        lambda idxr: (
            not (
                is_negative_slice(idxr)
                or (isinstance(idxr, tuple) and any(is_negative_slice(idx) for idx in idxr))  # type: ignore[redundant-expr]
            )
        )
    )
    if math.prod(shape) >= 3:
        strategy = end_slices(shape=shape) | strategy
    return draw(strategy)
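
A usage sketch in a Hypothesis test (hypothetical test body):

from typing import Any

import numpy as np
from hypothesis import given

from zarr.testing.strategies import basic_indices

@given(idx=basic_indices(shape=(4, 5)))
def test_basic_indexing(idx: Any) -> None:
    # Every index drawn is valid for an array of shape (4, 5)
    # and never uses a negative-step slice.
    np.empty((4, 5))[idx]  # does not raise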

end_slices

end_slices(draw: DrawFn, *, shape: tuple[int, ...]) -> Any

A strategy that draws slices covering ranges that include the last chunk. This is intended to stress-test handling of a possibly smaller last chunk.

Source code in zarr/testing/strategies.py
@st.composite
def end_slices(draw: st.DrawFn, *, shape: tuple[int, ...]) -> Any:
    """
    A strategy that slices ranges that include the last chunk.
    This is intended to stress-test handling of a possibly smaller last chunk.
    """
    slicers = []
    for size in shape:
        start = draw(st.integers(min_value=size // 2, max_value=size - 1))
        length = draw(st.integers(min_value=0, max_value=size - start))
        slicers.append(slice(start, start + length))
    event("drawing end slice")
    return tuple(slicers)

key_ranges

key_ranges(
    keys: SearchStrategy[str] = node_names,
    max_size: int = maxsize,
) -> SearchStrategy[list[tuple[str, RangeByteRequest]]]

Function to generate a key_ranges strategy for get_partial_values(); returns a list strategy with the form::

[(key, (range_start, range_end)),
 (key, (range_start, range_end)),...]
Source code in zarr/testing/strategies.py
def key_ranges(
    keys: SearchStrategy[str] = node_names, max_size: int = sys.maxsize
) -> SearchStrategy[list[tuple[str, RangeByteRequest]]]:
    """
    Function to generate a key_ranges strategy for get_partial_values();
    returns a list strategy with the form::

        [(key, (range_start, range_end)),
         (key, (range_start, range_end)),...]
    """

    def make_request(start: int, length: int) -> RangeByteRequest:
        return RangeByteRequest(start, end=min(start + length, max_size))

    byte_ranges = st.builds(
        make_request,
        start=st.integers(min_value=0, max_value=max_size),
        length=st.integers(min_value=0, max_value=max_size),
    )
    key_tuple = st.tuples(keys, byte_ranges)
    return st.lists(key_tuple, min_size=1, max_size=10)
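
A usage sketch (pairing the drawn ranges with a store's get_partial_values is the intended use):

from hypothesis import given

from zarr.testing.strategies import key_ranges

@given(ranges=key_ranges(max_size=100))
def test_ranges_are_bounded(ranges) -> None:
    for _key, byte_range in ranges:
        # The strategy clamps each RangeByteRequest end to max_size.
        assert byte_range.end <= 100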

np_array_and_chunks

np_array_and_chunks(
    draw: DrawFn,
    *,
    arrays: SearchStrategy[NDArray[Any]] = numpy_arrays(),
) -> tuple[ndarray, tuple[int, ...]]

A hypothesis strategy to generate small-sized random arrays.

Returns: a tuple of the array and a suitable random chunking for it.

Source code in zarr/testing/strategies.py
@st.composite
def np_array_and_chunks(
    draw: st.DrawFn,
    *,
    arrays: st.SearchStrategy[npt.NDArray[Any]] = numpy_arrays(),  # noqa: B008
) -> tuple[np.ndarray, tuple[int, ...]]:  # type: ignore[type-arg]
    """A hypothesis strategy to generate small sized random arrays.

    Returns: a tuple of the array and a suitable random chunking for it.
    """
    array = draw(arrays)
    return (array, draw(chunk_shapes(shape=array.shape)))
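
A usage sketch:

from hypothesis import given

from zarr.testing.strategies import np_array_and_chunks

@given(pair=np_array_and_chunks())
def test_chunks_match_array(pair) -> None:
    array, chunks = pair
    # The drawn chunking has one entry per array dimension.
    assert len(chunks) == array.ndim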

numpy_arrays

numpy_arrays(
    draw: DrawFn,
    *,
    shapes: SearchStrategy[tuple[int, ...]] = array_shapes,
    dtype: dtype[Any] | None = None,
) -> NDArray[Any]

Generate numpy arrays that can be saved in the provided Zarr format.

Source code in zarr/testing/strategies.py
@st.composite
def numpy_arrays(
    draw: st.DrawFn,
    *,
    shapes: st.SearchStrategy[tuple[int, ...]] = array_shapes,
    dtype: np.dtype[Any] | None = None,
) -> npt.NDArray[Any]:
    """
    Generate numpy arrays that can be saved in the provided Zarr format.
    """
    if dtype is None:
        dtype = draw(dtypes())
    if np.issubdtype(dtype, np.str_):
        safe_unicode_strings = safe_unicode_for_dtype(dtype)
        return draw(npst.arrays(dtype=dtype, shape=shapes, elements=safe_unicode_strings))

    return draw(npst.arrays(dtype=dtype, shape=shapes))

orthogonal_indices

orthogonal_indices(
    draw: DrawFn, *, shape: tuple[int, ...]
) -> tuple[
    tuple[ndarray[Any, Any], ...],
    tuple[ndarray[Any, Any], ...],
]

Strategy that returns (1) a tuple of integer arrays used for orthogonal indexing of Zarr arrays, and (2) a tuple of integer arrays that can be used for equivalent indexing of numpy arrays.

Source code in zarr/testing/strategies.py
@st.composite
def orthogonal_indices(
    draw: st.DrawFn, *, shape: tuple[int, ...]
) -> tuple[tuple[np.ndarray[Any, Any], ...], tuple[np.ndarray[Any, Any], ...]]:
    """
    Strategy that returns
    (1) a tuple of integer arrays used for orthogonal indexing of Zarr arrays.
    (2) a tuple of integer arrays that can be used for equivalent indexing of numpy arrays
    """
    zindexer = []
    npindexer = []
    ndim = len(shape)
    for axis, size in enumerate(shape):
        if size != 0:
            strategy = npst.integer_array_indices(
                shape=(size,), result_shape=npst.array_shapes(min_side=1, max_side=size, max_dims=1)
            ) | basic_indices(min_dims=1, shape=(size,), allow_ellipsis=False)
        else:
            strategy = basic_indices(min_dims=1, shape=(size,), allow_ellipsis=False)

        val = draw(
            strategy
            # bare ints, slices
            .map(lambda x: (x,) if not isinstance(x, tuple) else x)
            # skip empty tuple
            .filter(bool)
        )
        (idxr,) = val
        if isinstance(idxr, int):
            idxr = np.array([idxr])
        zindexer.append(idxr)
        if isinstance(idxr, slice):
            idxr = np.arange(*idxr.indices(size))
        elif isinstance(idxr, (tuple, int)):
            idxr = np.array(idxr)
        newshape = [1] * ndim
        newshape[axis] = idxr.size
        npindexer.append(idxr.reshape(newshape))

    # casting the output of broadcast_arrays is needed for numpy < 2
    return tuple(zindexer), tuple(np.broadcast_arrays(*npindexer))
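
A sketch of the intended equivalence, assuming the synchronous zarr.array creation helper accepts a chunks argument:

import numpy as np
import zarr
from hypothesis import given

from zarr.testing.strategies import orthogonal_indices

@given(indices=orthogonal_indices(shape=(4, 5)))
def test_oindex_matches_numpy(indices) -> None:
    zindexer, npindexer = indices
    np_arr = np.arange(20).reshape(4, 5)
    z_arr = zarr.array(np_arr, chunks=(2, 3))
    # Orthogonal indexing of the Zarr array should match broadcasted
    # integer-array indexing of the NumPy array.
    assert np.array_equal(z_arr.oindex[zindexer], np_arr[npindexer])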

safe_unicode_for_dtype

safe_unicode_for_dtype(
    dtype: dtype[str_],
) -> SearchStrategy[str]

Generate UTF-8-safe text constrained to max_len of dtype.

Source code in zarr/testing/strategies.py
def safe_unicode_for_dtype(dtype: np.dtype[np.str_]) -> st.SearchStrategy[str]:
    """Generate UTF-8-safe text constrained to max_len of dtype."""
    # account for utf-32 encoding (i.e. 4 bytes/character)
    max_len = max(1, dtype.itemsize // 4)

    return st.text(
        alphabet=st.characters(
            exclude_categories=["Cs"],  # Avoid *technically allowed* surrogates
            min_codepoint=32,
        ),
        min_size=1,
        max_size=max_len,
    )
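
For instance, for a "<U8" dtype (32-byte items at 4 bytes per character) the strategy yields strings of 1 to 8 characters:

import numpy as np
from hypothesis import given

from zarr.testing.strategies import safe_unicode_for_dtype

@given(s=safe_unicode_for_dtype(np.dtype("U8")))
def test_fits_in_dtype(s: str) -> None:
    assert 1 <= len(s) <= 8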

Utils

zarr.testing.utils

assert_bytes_equal

assert_bytes_equal(
    b1: Buffer | BytesLike | None,
    b2: Buffer | BytesLike | None,
) -> None

Helper function to assert that two bytes-like objects or Buffers are equal

Warnings

Always copies data, only use for testing and debugging

Source code in zarr/testing/utils.py
def assert_bytes_equal(b1: Buffer | BytesLike | None, b2: Buffer | BytesLike | None) -> None:
    """Help function to assert if two bytes-like or Buffers are equal

    Warnings
    --------
    Always copies data, only use for testing and debugging
    """
    if isinstance(b1, Buffer):
        b1 = b1.to_bytes()
    if isinstance(b2, Buffer):
        b2 = b2.to_bytes()
    assert b1 == b2