storage

Attributes

zarr.storage.StoreLike module-attribute

StoreLike: TypeAlias = (
    Store
    | StorePath
    | FSMap
    | Path
    | str
    | dict[str, Buffer]
)
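
Any value matching this alias can be passed wherever a store is expected. A minimal sketch (the local path is hypothetical, and the array keywords are only there to make the calls complete):

import zarr
from zarr.storage import MemoryStore

# Each `store` argument below satisfies StoreLike.
z1 = zarr.open(MemoryStore(), mode="w", shape=(10,), dtype="i4")        # a Store instance
z2 = zarr.open("data/example.zarr", mode="w", shape=(10,), dtype="i4")  # a str path
z3 = zarr.open({}, mode="w", shape=(10,), dtype="i4")                   # a dict[str, Buffer]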

Classes

zarr.storage

FsspecStore

Bases: Store

Store for remote data based on FSSpec.

Parameters:

  • fs (AsyncFileSystem) –

    The Async FSSpec filesystem to use with this store.

  • read_only (bool, default: False ) –

    Whether the store is read-only

  • path (str, default: '/' ) –

    The root path of the store. This should be a relative path and must not include the filesystem scheme.

  • allowed_exceptions (tuple[type[Exception], ...], default: ALLOWED_EXCEPTIONS ) –

    When fetching data, exceptions of these types will be treated as missing keys.

Attributes:

  • fs (AsyncFileSystem) –

    The underlying async fsspec filesystem.

  • allowed_exceptions (tuple[type[Exception], ...]) –

    Exception types treated as missing keys when fetching data.

  • supports_writes (bool) –

    Whether the store supports writes.

  • supports_deletes (bool) –

    Whether the store supports deletes.

  • supports_listing (bool) –

    Whether the store supports listing.

Raises:

  • TypeError

    If the Filesystem does not support async operations.

  • ValueError

    If the path argument includes a scheme.

Warns:

  • ZarrUserWarning

    If the file system (fs) was not created with asynchronous=True.

See Also

FsspecStore.from_upath
FsspecStore.from_url
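
As an illustration, a minimal sketch of constructing the store directly; this assumes the optional s3fs dependency is installed, and the bucket name is hypothetical:

import fsspec
from zarr.storage import FsspecStore

# The filesystem should be created with asynchronous=True to avoid the
# ZarrUserWarning mentioned above.
fs = fsspec.filesystem("s3", asynchronous=True, anon=True)
store = FsspecStore(fs, path="my-bucket/my-dataset.zarr", read_only=True)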

Source code in zarr/storage/_fsspec.py
class FsspecStore(Store):
    """
    Store for remote data based on FSSpec.

    Parameters
    ----------
    fs : AsyncFileSystem
        The Async FSSpec filesystem to use with this store.
    read_only : bool
        Whether the store is read-only
    path : str
        The root path of the store. This should be a relative path and must not include the
        filesystem scheme.
    allowed_exceptions : tuple[type[Exception], ...]
        When fetching data, these cases will be deemed to correspond to missing keys.

    Attributes
    ----------
    fs
    allowed_exceptions
    supports_writes
    supports_deletes
    supports_listing

    Raises
    ------
    TypeError
        If the Filesystem does not support async operations.
    ValueError
        If the path argument includes a scheme.

    Warns
    -----
    ZarrUserWarning
        If the file system (fs) was not created with `asynchronous=True`.

    See Also
    --------
    FsspecStore.from_upath
    FsspecStore.from_url
    """

    # based on FSSpec
    supports_writes: bool = True
    supports_deletes: bool = True
    supports_listing: bool = True

    fs: AsyncFileSystem
    allowed_exceptions: tuple[type[Exception], ...]
    path: str

    def __init__(
        self,
        fs: AsyncFileSystem,
        read_only: bool = False,
        path: str = "/",
        allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
    ) -> None:
        super().__init__(read_only=read_only)
        self.fs = fs
        self.path = path
        self.allowed_exceptions = allowed_exceptions

        if not self.fs.async_impl:
            raise TypeError("Filesystem needs to support async operations.")
        if not self.fs.asynchronous:
            warnings.warn(
                f"fs ({fs}) was not created with `asynchronous=True`, this may lead to surprising behavior",
                category=ZarrUserWarning,
                stacklevel=2,
            )
        if "://" in path and not path.startswith("http"):
            # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
            scheme, _ = path.split("://", maxsplit=1)
            raise ValueError(f"path argument to FsspecStore must not include scheme ({scheme}://)")

    @classmethod
    def from_upath(
        cls,
        upath: Any,
        read_only: bool = False,
        allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
    ) -> FsspecStore:
        """
        Create a FsspecStore from an upath object.

        Parameters
        ----------
        upath : UPath
            The upath to the root of the store.
        read_only : bool
            Whether the store is read-only, defaults to False.
        allowed_exceptions : tuple, optional
            The exceptions that are allowed to be raised when accessing the
            store. Defaults to ALLOWED_EXCEPTIONS.

        Returns
        -------
        FsspecStore
        """
        return cls(
            fs=upath.fs,
            path=upath.path.rstrip("/"),
            read_only=read_only,
            allowed_exceptions=allowed_exceptions,
        )

    @classmethod
    def from_mapper(
        cls,
        fs_map: FSMap,
        read_only: bool = False,
        allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
    ) -> FsspecStore:
        """
        Create a FsspecStore from a FSMap object.

        Parameters
        ----------
        fs_map : FSMap
            Fsspec mutable mapping object.
        read_only : bool
            Whether the store is read-only, defaults to False.
        allowed_exceptions : tuple, optional
            The exceptions that are allowed to be raised when accessing the
            store. Defaults to ALLOWED_EXCEPTIONS.

        Returns
        -------
        FsspecStore
        """
        fs = _make_async(fs_map.fs)
        return cls(
            fs=fs,
            path=fs_map.root,
            read_only=read_only,
            allowed_exceptions=allowed_exceptions,
        )

    @classmethod
    def from_url(
        cls,
        url: str,
        storage_options: dict[str, Any] | None = None,
        read_only: bool = False,
        allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
    ) -> FsspecStore:
        """
        Create a FsspecStore from a URL. The type of store is determined from the URL scheme.

        Parameters
        ----------
        url : str
            The URL to the root of the store.
        storage_options : dict, optional
            The options to pass to fsspec when creating the filesystem.
        read_only : bool
            Whether the store is read-only, defaults to False.
        allowed_exceptions : tuple, optional
            The exceptions that are allowed to be raised when accessing the
            store. Defaults to ALLOWED_EXCEPTIONS.

        Returns
        -------
        FsspecStore
        """
        try:
            from fsspec import url_to_fs
        except ImportError:
            # before fsspec==2024.3.1
            from fsspec.core import url_to_fs

        opts = storage_options or {}
        opts = {"asynchronous": True, **opts}

        fs, path = url_to_fs(url, **opts)
        if not fs.async_impl:
            fs = _make_async(fs)

        # fsspec is not consistent about removing the scheme from the path, so check and strip it here
        # https://github.com/fsspec/filesystem_spec/issues/1722
        if "://" in path and not path.startswith("http"):
            # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
            path = fs._strip_protocol(path)

        return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions)

    def with_read_only(self, read_only: bool = False) -> FsspecStore:
        # docstring inherited
        return type(self)(
            fs=self.fs,
            path=self.path,
            allowed_exceptions=self.allowed_exceptions,
            read_only=read_only,
        )

    async def clear(self) -> None:
        # docstring inherited
        try:
            for subpath in await self.fs._find(self.path, withdirs=True):
                if subpath != self.path:
                    await self.fs._rm(subpath, recursive=True)
        except FileNotFoundError:
            pass

    def __repr__(self) -> str:
        return f"<FsspecStore({type(self.fs).__name__}, {self.path})>"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, type(self))
            and self.path == other.path
            and self.read_only == other.read_only
            and self.fs == other.fs
        )

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        # docstring inherited
        if not self._is_open:
            await self._open()
        path = _dereference_path(self.path, key)

        try:
            if byte_range is None:
                value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
            elif isinstance(byte_range, RangeByteRequest):
                value = prototype.buffer.from_bytes(
                    await self.fs._cat_file(
                        path,
                        start=byte_range.start,
                        end=byte_range.end,
                    )
                )
            elif isinstance(byte_range, OffsetByteRequest):
                value = prototype.buffer.from_bytes(
                    await self.fs._cat_file(path, start=byte_range.offset, end=None)
                )
            elif isinstance(byte_range, SuffixByteRequest):
                value = prototype.buffer.from_bytes(
                    await self.fs._cat_file(path, start=-byte_range.suffix, end=None)
                )
            else:
                raise ValueError(f"Unexpected byte_range, got {byte_range}.")
        except self.allowed_exceptions:
            return None
        except OSError as e:
            if "not satisfiable" in str(e):
                # this is an s3-specific condition we probably don't want to leak
                return prototype.buffer.from_bytes(b"")
            raise
        else:
            return value

    async def set(
        self,
        key: str,
        value: Buffer,
        byte_range: tuple[int, int] | None = None,
    ) -> None:
        # docstring inherited
        if not self._is_open:
            await self._open()
        self._check_writable()
        if not isinstance(value, Buffer):
            raise TypeError(
                f"FsspecStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )
        path = _dereference_path(self.path, key)
        # write data
        if byte_range:
            raise NotImplementedError
        await self.fs._pipe_file(path, value.to_bytes())

    async def delete(self, key: str) -> None:
        # docstring inherited
        self._check_writable()
        path = _dereference_path(self.path, key)
        try:
            await self.fs._rm(path)
        except FileNotFoundError:
            pass
        except self.allowed_exceptions:
            pass

    async def delete_dir(self, prefix: str) -> None:
        # docstring inherited
        if not self.supports_deletes:
            raise NotImplementedError(
                "This method is only available for stores that support deletes."
            )
        self._check_writable()

        path_to_delete = _dereference_path(self.path, prefix)

        with suppress(*self.allowed_exceptions):
            await self.fs._rm(path_to_delete, recursive=True)

    async def exists(self, key: str) -> bool:
        # docstring inherited
        path = _dereference_path(self.path, key)
        exists: bool = await self.fs._exists(path)
        return exists

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited
        if key_ranges:
            # _cat_ranges expects a list of paths, start, and end ranges, so we need to reformat each ByteRequest.
            key_ranges = list(key_ranges)
            paths: list[str] = []
            starts: list[int | None] = []
            stops: list[int | None] = []
            for key, byte_range in key_ranges:
                paths.append(_dereference_path(self.path, key))
                if byte_range is None:
                    starts.append(None)
                    stops.append(None)
                elif isinstance(byte_range, RangeByteRequest):
                    starts.append(byte_range.start)
                    stops.append(byte_range.end)
                elif isinstance(byte_range, OffsetByteRequest):
                    starts.append(byte_range.offset)
                    stops.append(None)
                elif isinstance(byte_range, SuffixByteRequest):
                    starts.append(-byte_range.suffix)
                    stops.append(None)
                else:
                    raise ValueError(f"Unexpected byte_range, got {byte_range}.")
        else:
            return []
        # TODO: expectations for exceptions or missing keys?
        res = await self.fs._cat_ranges(paths, starts, stops, on_error="return")
        # the following is an s3-specific condition we probably don't want to leak
        res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res]
        for r in res:
            if isinstance(r, Exception) and not isinstance(r, self.allowed_exceptions):
                raise r

        return [None if isinstance(r, Exception) else prototype.buffer.from_bytes(r) for r in res]

    async def list(self) -> AsyncIterator[str]:
        # docstring inherited
        allfiles = await self.fs._find(self.path, detail=False, withdirs=False)
        for onefile in (a.removeprefix(self.path + "/") for a in allfiles):
            yield onefile

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        prefix = f"{self.path}/{prefix.rstrip('/')}"
        try:
            allfiles = await self.fs._ls(prefix, detail=False)
        except FileNotFoundError:
            return
        for onefile in (a.replace(prefix + "/", "") for a in allfiles):
            yield onefile.removeprefix(self.path).removeprefix("/")

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        for onefile in await self.fs._find(
            f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
        ):
            yield onefile.removeprefix(f"{self.path}/")

    async def getsize(self, key: str) -> int:
        path = _dereference_path(self.path, key)
        info = await self.fs._info(path)

        size = info.get("size")

        if size is None:
            # Not all filesystems support size. Fall back to reading the entire object
            return await super().getsize(key)
        else:
            # fsspec doesn't have typing. We'll need to assume or verify this is true
            return int(size)

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = True

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always false.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_fsspec.py
def __eq__(self, other: object) -> bool:
    return (
        isinstance(other, type(self))
        and self.path == other.path
        and self.read_only == other.read_only
        and self.fs == other.fs
    )

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_fsspec.py
async def clear(self) -> None:
    # docstring inherited
    try:
        for subpath in await self.fs._find(self.path, withdirs=True):
            if subpath != self.path:
                await self.fs._rm(subpath, recursive=True)
    except FileNotFoundError:
        pass

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store

Parameters:

  • key (str) –
Source code in zarr/storage/_fsspec.py
async def delete(self, key: str) -> None:
    # docstring inherited
    self._check_writable()
    path = _dereference_path(self.path, key)
    try:
        await self.fs._rm(path)
    except FileNotFoundError:
        pass
    except self.allowed_exceptions:
        pass

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_fsspec.py
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    if not self.supports_deletes:
        raise NotImplementedError(
            "This method is only available for stores that support deletes."
        )
    self._check_writable()

    path_to_delete = _dereference_path(self.path, prefix)

    with suppress(*self.allowed_exceptions):
        await self.fs._rm(path_to_delete, recursive=True)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

Source code in zarr/storage/_fsspec.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    path = _dereference_path(self.path, key)
    exists: bool = await self.fs._exists(path)
    return exists

from_mapper classmethod

from_mapper(
    fs_map: FSMap,
    read_only: bool = False,
    allowed_exceptions: tuple[
        type[Exception], ...
    ] = ALLOWED_EXCEPTIONS,
) -> FsspecStore

Create a FsspecStore from a FSMap object.

Parameters:

  • fs_map (FSMap) –

    Fsspec mutable mapping object.

  • read_only (bool, default: False ) –

    Whether the store is read-only, defaults to False.

  • allowed_exceptions (tuple, default: ALLOWED_EXCEPTIONS ) –

    The exceptions that are allowed to be raised when accessing the store. Defaults to ALLOWED_EXCEPTIONS.

Returns:

  • FsspecStore

Source code in zarr/storage/_fsspec.py
@classmethod
def from_mapper(
    cls,
    fs_map: FSMap,
    read_only: bool = False,
    allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
    """
    Create a FsspecStore from a FSMap object.

    Parameters
    ----------
    fs_map : FSMap
        Fsspec mutable mapping object.
    read_only : bool
        Whether the store is read-only, defaults to False.
    allowed_exceptions : tuple, optional
        The exceptions that are allowed to be raised when accessing the
        store. Defaults to ALLOWED_EXCEPTIONS.

    Returns
    -------
    FsspecStore
    """
    fs = _make_async(fs_map.fs)
    return cls(
        fs=fs,
        path=fs_map.root,
        read_only=read_only,
        allowed_exceptions=allowed_exceptions,
    )
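
A minimal sketch of usage, assuming a recent fsspec that can wrap synchronous filesystems in an async interface (the in-memory URL is just for illustration):

import fsspec
from zarr.storage import FsspecStore

# get_mapper returns an FSMap; from_mapper wraps its filesystem so it can
# be used asynchronously and reuses the mapper's root as the store path.
fs_map = fsspec.get_mapper("memory://example.zarr")
store = FsspecStore.from_mapper(fs_map)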

from_upath classmethod

from_upath(
    upath: Any,
    read_only: bool = False,
    allowed_exceptions: tuple[
        type[Exception], ...
    ] = ALLOWED_EXCEPTIONS,
) -> FsspecStore

Create a FsspecStore from an upath object.

Parameters:

  • upath (UPath) –

    The upath to the root of the store.

  • read_only (bool, default: False ) –

    Whether the store is read-only, defaults to False.

  • allowed_exceptions (tuple, default: ALLOWED_EXCEPTIONS ) –

    The exceptions that are allowed to be raised when accessing the store. Defaults to ALLOWED_EXCEPTIONS.

Returns:

  • FsspecStore

Source code in zarr/storage/_fsspec.py
@classmethod
def from_upath(
    cls,
    upath: Any,
    read_only: bool = False,
    allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
    """
    Create a FsspecStore from an upath object.

    Parameters
    ----------
    upath : UPath
        The upath to the root of the store.
    read_only : bool
        Whether the store is read-only, defaults to False.
    allowed_exceptions : tuple, optional
        The exceptions that are allowed to be raised when accessing the
        store. Defaults to ALLOWED_EXCEPTIONS.

    Returns
    -------
    FsspecStore
    """
    return cls(
        fs=upath.fs,
        path=upath.path.rstrip("/"),
        read_only=read_only,
        allowed_exceptions=allowed_exceptions,
    )
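
A minimal sketch, assuming the optional universal_pathlib package and a hypothetical bucket; storage options such as anon and asynchronous are forwarded to the underlying filesystem:

from upath import UPath
from zarr.storage import FsspecStore

root = UPath("s3://my-bucket/my-dataset.zarr", anon=True, asynchronous=True)
store = FsspecStore.from_upath(root, read_only=True)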

from_url classmethod

from_url(
    url: str,
    storage_options: dict[str, Any] | None = None,
    read_only: bool = False,
    allowed_exceptions: tuple[
        type[Exception], ...
    ] = ALLOWED_EXCEPTIONS,
) -> FsspecStore

Create a FsspecStore from a URL. The type of store is determined from the URL scheme.

Parameters:

  • url (str) –

    The URL to the root of the store.

  • storage_options (dict, default: None ) –

    The options to pass to fsspec when creating the filesystem.

  • read_only (bool, default: False ) –

    Whether the store is read-only, defaults to False.

  • allowed_exceptions (tuple, default: ALLOWED_EXCEPTIONS ) –

    The exceptions that are allowed to be raised when accessing the store. Defaults to ALLOWED_EXCEPTIONS.

Returns:

  • FsspecStore

Source code in zarr/storage/_fsspec.py
@classmethod
def from_url(
    cls,
    url: str,
    storage_options: dict[str, Any] | None = None,
    read_only: bool = False,
    allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
    """
    Create a FsspecStore from a URL. The type of store is determined from the URL scheme.

    Parameters
    ----------
    url : str
        The URL to the root of the store.
    storage_options : dict, optional
        The options to pass to fsspec when creating the filesystem.
    read_only : bool
        Whether the store is read-only, defaults to False.
    allowed_exceptions : tuple, optional
        The exceptions that are allowed to be raised when accessing the
        store. Defaults to ALLOWED_EXCEPTIONS.

    Returns
    -------
    FsspecStore
    """
    try:
        from fsspec import url_to_fs
    except ImportError:
        # before fsspec==2024.3.1
        from fsspec.core import url_to_fs

    opts = storage_options or {}
    opts = {"asynchronous": True, **opts}

    fs, path = url_to_fs(url, **opts)
    if not fs.async_impl:
        fs = _make_async(fs)

    # fsspec is not consistent about removing the scheme from the path, so check and strip it here
    # https://github.com/fsspec/filesystem_spec/issues/1722
    if "://" in path and not path.startswith("http"):
        # `not path.startswith("http")` is a special case for the http filesystem (¯\_(ツ)_/¯)
        path = fs._strip_protocol(path)

    return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions)
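
A minimal sketch with a hypothetical bucket; note that asynchronous=True is injected into the storage options automatically:

from zarr.storage import FsspecStore

store = FsspecStore.from_url(
    "s3://my-bucket/my-dataset.zarr",
    storage_options={"anon": True},
    read_only=True,
)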

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

Source code in zarr/storage/_fsspec.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    if not self._is_open:
        await self._open()
    path = _dereference_path(self.path, key)

    try:
        if byte_range is None:
            value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
        elif isinstance(byte_range, RangeByteRequest):
            value = prototype.buffer.from_bytes(
                await self.fs._cat_file(
                    path,
                    start=byte_range.start,
                    end=byte_range.end,
                )
            )
        elif isinstance(byte_range, OffsetByteRequest):
            value = prototype.buffer.from_bytes(
                await self.fs._cat_file(path, start=byte_range.offset, end=None)
            )
        elif isinstance(byte_range, SuffixByteRequest):
            value = prototype.buffer.from_bytes(
                await self.fs._cat_file(path, start=-byte_range.suffix, end=None)
            )
        else:
            raise ValueError(f"Unexpected byte_range, got {byte_range}.")
    except self.allowed_exceptions:
        return None
    except OSError as e:
        if "not satisfiable" in str(e):
            # this is an s3-specific condition we probably don't want to leak
            return prototype.buffer.from_bytes(b"")
        raise
    else:
        return value
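
A minimal sketch of the three byte-range request types; the key name here is hypothetical:

from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import default_buffer_prototype

async def peek(store):
    proto = default_buffer_prototype()
    head = await store.get("zarr.json", proto, byte_range=RangeByteRequest(0, 100))  # bytes [0, 100)
    rest = await store.get("zarr.json", proto, byte_range=OffsetByteRequest(100))    # byte 100 onward
    tail = await store.get("zarr.json", proto, byte_range=SuffixByteRequest(100))    # last 100 bytes
    return head, rest, tail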

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • A list of values, in the order of the key_ranges; may contain None for missing keys.
Source code in zarr/storage/_fsspec.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited
    if key_ranges:
        # _cat_ranges expects a list of paths, start, and end ranges, so we need to reformat each ByteRequest.
        key_ranges = list(key_ranges)
        paths: list[str] = []
        starts: list[int | None] = []
        stops: list[int | None] = []
        for key, byte_range in key_ranges:
            paths.append(_dereference_path(self.path, key))
            if byte_range is None:
                starts.append(None)
                stops.append(None)
            elif isinstance(byte_range, RangeByteRequest):
                starts.append(byte_range.start)
                stops.append(byte_range.end)
            elif isinstance(byte_range, OffsetByteRequest):
                starts.append(byte_range.offset)
                stops.append(None)
            elif isinstance(byte_range, SuffixByteRequest):
                starts.append(-byte_range.suffix)
                stops.append(None)
            else:
                raise ValueError(f"Unexpected byte_range, got {byte_range}.")
    else:
        return []
    # TODO: expectations for exceptions or missing keys?
    res = await self.fs._cat_ranges(paths, starts, stops, on_error="return")
    # the following is an s3-specific condition we probably don't want to leak
    res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res]
    for r in res:
        if isinstance(r, Exception) and not isinstance(r, self.allowed_exceptions):
            raise r

    return [None if isinstance(r, Exception) else prototype.buffer.from_bytes(r) for r in res]
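
For illustration, a minimal sketch that fetches the first 16 bytes of several keys in one batched call; missing keys come back as None:

from zarr.abc.store import RangeByteRequest
from zarr.core.buffer import default_buffer_prototype

async def read_headers(store, keys):
    # Results are returned in the same order as the requested ranges.
    ranges = [(key, RangeByteRequest(0, 16)) for key in keys]
    return await store.get_partial_values(default_buffer_prototype(), ranges)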

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/storage/_fsspec.py
async def getsize(self, key: str) -> int:
    path = _dereference_path(self.path, key)
    info = await self.fs._info(path)

    size = info.get("size")

    if size is None:
        # Not all filesystems support size. Fall back to reading the entire object
        return await super().getsize(key)
    else:
        # fsspec doesn't have typing. We'll need to assume or verify this is true
        return int(size)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored
Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix and calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

Source code in zarr/storage/_fsspec.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    allfiles = await self.fs._find(self.path, detail=False, withdirs=False)
    for onefile in (a.removeprefix(self.path + "/") for a in allfiles):
        yield onefile

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

Source code in zarr/storage/_fsspec.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    prefix = f"{self.path}/{prefix.rstrip('/')}"
    try:
        allfiles = await self.fs._ls(prefix, detail=False)
    except FileNotFoundError:
        return
    for onefile in (a.replace(prefix + "/", "") for a in allfiles):
        yield onefile.removeprefix(self.path).removeprefix("/")

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

Source code in zarr/storage/_fsspec.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    for onefile in await self.fs._find(
        f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
    ):
        yield onefile.removeprefix(f"{self.path}/")
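
Since list_prefix (like list and list_dir) returns an async iterator, results are collected with async for. A minimal sketch:

async def keys_under(store, prefix):
    # Keys are yielded relative to the root of the store.
    return [key async for key in store.list_prefix(prefix)]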

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store

set async

set(
    key: str,
    value: Buffer,
    byte_range: tuple[int, int] | None = None,
) -> None

Store a (key, value) pair.

Parameters:

Source code in zarr/storage/_fsspec.py
async def set(
    self,
    key: str,
    value: Buffer,
    byte_range: tuple[int, int] | None = None,
) -> None:
    # docstring inherited
    if not self._is_open:
        await self._open()
    self._check_writable()
    if not isinstance(value, Buffer):
        raise TypeError(
            f"FsspecStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )
    path = _dereference_path(self.path, key)
    # write data
    if byte_range:
        raise NotImplementedError
    await self.fs._pipe_file(path, value.to_bytes())
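
A minimal sketch of writing raw bytes to a key; values must be wrapped in a Buffer first:

from zarr.core.buffer import default_buffer_prototype

async def write_bytes(store, key, data):
    # Passing raw bytes would raise TypeError; wrap them in a Buffer.
    buf = default_buffer_prototype().buffer.from_bytes(data)
    await store.set(key, buf)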

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

Source code in zarr/abc/store.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    """
    Store a key to ``value`` if the key is not already present.

    Parameters
    ----------
    key : str
    value : Buffer
    """
    # Note for implementers: the default implementation provided here
    # is not safe for concurrent writers. There's a race condition between
    # the `exists` check and the `set` where another writer could set some
    # value at `key` or delete `key`.
    if not await self.exists(key):
        await self.set(key, value)

with_read_only

with_read_only(read_only: bool = False) -> FsspecStore

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read only attribute.
Source code in zarr/storage/_fsspec.py
def with_read_only(self, read_only: bool = False) -> FsspecStore:
    # docstring inherited
    return type(self)(
        fs=self.fs,
        path=self.path,
        allowed_exceptions=self.allowed_exceptions,
        read_only=read_only,
    )
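
A minimal sketch, illustrated with a MemoryStore for self-containment; the same pattern applies to FsspecStore:

from zarr.storage import MemoryStore

store = MemoryStore()
readonly_view = store.with_read_only(read_only=True)
assert readonly_view.read_only and not store.read_only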

GpuMemoryStore

Bases: MemoryStore

Store for GPU memory.

Stores every chunk in GPU memory irrespective of the original location.

The values in the dictionary used to initialize this memory store must be GPU Buffers.

Writing data to this store through .set will move the buffer to the GPU if necessary.

Parameters:

  • store_dict (MutableMapping, default: None ) –

    A mutable mapping with string keys and zarr.core.buffer.gpu.Buffer values.

  • read_only (bool, default: False ) –

    Whether to open the store in read-only mode.
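
A minimal sketch; this requires a CUDA-capable environment with cupy installed, and the chunk key is hypothetical:

from zarr.core.buffer import default_buffer_prototype
from zarr.storage import GpuMemoryStore

cpu_buf = default_buffer_prototype().buffer.from_bytes(b"chunk-bytes")
# from_dict copies each value into a gpu.Buffer on the device.
store = GpuMemoryStore.from_dict({"c/0": cpu_buf})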

Source code in zarr/storage/_memory.py
class GpuMemoryStore(MemoryStore):
    """
    Store for GPU memory.

    Stores every chunk in GPU memory irrespective of the original location.

    The dictionary of buffers to initialize this memory store with *must* be
    GPU Buffers.

    Writing data to this store through ``.set`` will move the buffer to the GPU
    if necessary.

    Parameters
    ----------
    store_dict : MutableMapping, optional
        A mutable mapping with string keys and [zarr.core.buffer.gpu.Buffer][]
        values.
    read_only : bool
        Whether to open the store in read-only mode.
    """

    _store_dict: MutableMapping[str, gpu.Buffer]  # type: ignore[assignment]

    def __init__(
        self,
        store_dict: MutableMapping[str, gpu.Buffer] | None = None,
        *,
        read_only: bool = False,
    ) -> None:
        super().__init__(store_dict=store_dict, read_only=read_only)  # type: ignore[arg-type]

    def __str__(self) -> str:
        return f"gpumemory://{id(self._store_dict)}"

    def __repr__(self) -> str:
        return f"GpuMemoryStore('{self}')"

    @classmethod
    def from_dict(cls, store_dict: MutableMapping[str, Buffer]) -> Self:
        """
        Create a GpuMemoryStore from a dictionary of buffers at any location.

        The dictionary backing the newly created ``GpuMemoryStore`` will not be
        the same as ``store_dict``.

        Parameters
        ----------
        store_dict : mapping
            A mapping of string keys to arbitrary Buffers. The buffer data
            will be moved into a [`gpu.Buffer`][zarr.core.buffer.gpu.Buffer].

        Returns
        -------
        GpuMemoryStore
        """
        gpu_store_dict = {k: gpu.Buffer.from_buffer(v) for k, v in store_dict.items()}
        return cls(gpu_store_dict)

    async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
        # docstring inherited
        self._check_writable()
        assert isinstance(key, str)
        if not isinstance(value, Buffer):
            raise TypeError(
                f"GpuMemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )
        # Convert to gpu.Buffer
        gpu_value = value if isinstance(value, gpu.Buffer) else gpu.Buffer.from_buffer(value)
        await super().set(key, gpu_value, byte_range=byte_range)

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = True

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always false.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_memory.py
def __eq__(self, other: object) -> bool:
    return (
        isinstance(other, type(self))
        and self._store_dict == other._store_dict
        and self.read_only == other.read_only
    )

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_memory.py
async def clear(self) -> None:
    # docstring inherited
    self._store_dict.clear()

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store

Parameters:

  • key (str) –
Source code in zarr/storage/_memory.py
async def delete(self, key: str) -> None:
    # docstring inherited
    self._check_writable()
    try:
        del self._store_dict[key]
    except KeyError:
        logger.debug("Key %s does not exist.", key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/abc/store.py
async def delete_dir(self, prefix: str) -> None:
    """
    Remove all keys and prefixes in the store that begin with a given prefix.
    """
    if not self.supports_deletes:
        raise NotImplementedError
    if not self.supports_listing:
        raise NotImplementedError
    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for key in self.list_prefix(prefix):
        await self.delete(key)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

Source code in zarr/storage/_memory.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    return key in self._store_dict

from_dict classmethod

from_dict(store_dict: MutableMapping[str, Buffer]) -> Self

Create a GpuMemoryStore from a dictionary of buffers at any location.

The dictionary backing the newly created GpuMemoryStore will not be the same as store_dict.

Parameters:

  • store_dict (mapping) –

    A mapping of string keys to arbitrary Buffers. The buffer data will be moved into a gpu.Buffer.

Returns:

  • GpuMemoryStore

Source code in zarr/storage/_memory.py
@classmethod
def from_dict(cls, store_dict: MutableMapping[str, Buffer]) -> Self:
    """
    Create a GpuMemoryStore from a dictionary of buffers at any location.

    The dictionary backing the newly created ``GpuMemoryStore`` will not be
    the same as ``store_dict``.

    Parameters
    ----------
    store_dict : mapping
        A mapping of string keys to arbitrary Buffers. The buffer data
        will be moved into a [`gpu.Buffer`][zarr.core.buffer.gpu.Buffer].

    Returns
    -------
    GpuMemoryStore
    """
    gpu_store_dict = {k: gpu.Buffer.from_buffer(v) for k, v in store_dict.items()}
    return cls(gpu_store_dict)

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

Source code in zarr/storage/_memory.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    if not self._is_open:
        await self._open()
    assert isinstance(key, str)
    try:
        value = self._store_dict[key]
        start, stop = _normalize_byte_range_index(value, byte_range)
        return prototype.buffer.from_buffer(value[start:stop])
    except KeyError:
        return None

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • A list of values, in the order of the key_ranges; may contain None for missing keys.
Source code in zarr/storage/_memory.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited

    # All the key-ranges arguments go with the same prototype
    async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None:
        return await self.get(key, prototype=prototype, byte_range=byte_range)

    return await concurrent_map(key_ranges, _get, limit=None)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored
Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix and calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

Source code in zarr/storage/_memory.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    for key in self._store_dict:
        yield key

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes with a given prefix and which do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

Source code in zarr/storage/_memory.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    prefix = prefix.rstrip("/")

    if prefix == "":
        keys_unique = {k.split("/")[0] for k in self._store_dict}
    else:
        # Our dictionary doesn't contain directory markers, but we want to include
        # a pseudo directory when there's a nested item and we're listing an
        # intermediate level.
        keys_unique = {
            key.removeprefix(prefix + "/").split("/")[0]
            for key in self._store_dict
            if key.startswith(prefix + "/") and key != prefix
        }

    for key in keys_unique:
        yield key

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

Source code in zarr/storage/_memory.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    # note: we materialize all dict keys into a list here so we can mutate the dict in-place (e.g. in delete_prefix)
    for key in list(self._store_dict):
        if key.startswith(prefix):
            yield key

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store

set async

set(
    key: str,
    value: Buffer,
    byte_range: tuple[int, int] | None = None,
) -> None

Store a (key, value) pair.

Parameters:

Source code in zarr/storage/_memory.py
async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
    # docstring inherited
    self._check_writable()
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        raise TypeError(
            f"GpuMemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )
    # Convert to gpu.Buffer
    gpu_value = value if isinstance(value, gpu.Buffer) else gpu.Buffer.from_buffer(value)
    await super().set(key, gpu_value, byte_range=byte_range)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

Source code in zarr/storage/_memory.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    self._check_writable()
    await self._ensure_open()
    self._store_dict.setdefault(key, value)

with_read_only

with_read_only(read_only: bool = False) -> MemoryStore

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read only attribute.
Source code in zarr/storage/_memory.py
def with_read_only(self, read_only: bool = False) -> MemoryStore:
    # docstring inherited
    return type(self)(
        store_dict=self._store_dict,
        read_only=read_only,
    )

LocalStore

Bases: Store

Store for the local file system.

Parameters:

  • root (str or Path) –

    Directory to use as root of store.

  • read_only (bool, default: False ) –

    Whether the store is read-only

Attributes:

  • supports_writes (bool) –

    Whether the store supports writes.

  • supports_deletes (bool) –

    Whether the store supports deletes.

  • supports_listing (bool) –

    Whether the store supports listing.

  • root (Path) –

    Directory used as the root of the store.
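
A minimal sketch of creating an array backed by the local file system; the directory is hypothetical and is created when the store is opened for writing:

import zarr
from zarr.storage import LocalStore

store = LocalStore("data/example.zarr")
arr = zarr.create_array(store=store, shape=(100,), chunks=(10,), dtype="int32")
arr[:] = 42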

Source code in zarr/storage/_local.py
class LocalStore(Store):
    """
    Store for the local file system.

    Parameters
    ----------
    root : str or Path
        Directory to use as root of store.
    read_only : bool
        Whether the store is read-only

    Attributes
    ----------
    supports_writes
    supports_deletes
    supports_listing
    root
    """

    supports_writes: bool = True
    supports_deletes: bool = True
    supports_listing: bool = True

    root: Path

    def __init__(self, root: Path | str, *, read_only: bool = False) -> None:
        super().__init__(read_only=read_only)
        if isinstance(root, str):
            root = Path(root)
        if not isinstance(root, Path):
            raise TypeError(
                f"'root' must be a string or Path instance. Got an instance of {type(root)} instead."
            )
        self.root = root

    def with_read_only(self, read_only: bool = False) -> Self:
        # docstring inherited
        return type(self)(
            root=self.root,
            read_only=read_only,
        )

    @classmethod
    async def open(
        cls, root: Path | str, *, read_only: bool = False, mode: AccessModeLiteral | None = None
    ) -> Self:
        """
        Create and open the store.

        Parameters
        ----------
        root : str or Path
            Directory to use as root of store.
        read_only : bool
            Whether the store is read-only
        mode :
            Mode in which to create the store. This only affects opening the store,
            and the final read-only state of the store is controlled through the
            read_only parameter.

        Returns
        -------
        Store
            The opened store instance.
        """
        # If mode = 'r+', we want to open in read-only mode (fail if the store
        # does not already exist), but return a writeable store
        if mode is not None:
            read_only_creation = mode in ["r", "r+"]
        else:
            read_only_creation = read_only
        store = cls(root, read_only=read_only_creation)
        await store._open()

        # Set read_only state
        store = store.with_read_only(read_only)
        await store._open()
        return store

    async def _open(self, *, mode: AccessModeLiteral | None = None) -> None:
        if not self.read_only:
            self.root.mkdir(parents=True, exist_ok=True)

        if not self.root.exists():
            raise FileNotFoundError(f"{self.root} does not exist")
        return await super()._open()

    async def clear(self) -> None:
        # docstring inherited
        self._check_writable()
        shutil.rmtree(self.root)
        self.root.mkdir()

    def __str__(self) -> str:
        return f"file://{self.root.as_posix()}"

    def __repr__(self) -> str:
        return f"LocalStore('{self}')"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, type(self)) and self.root == other.root

    async def get(
        self,
        key: str,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        # docstring inherited
        if prototype is None:
            prototype = default_buffer_prototype()
        if not self._is_open:
            await self._open()
        assert isinstance(key, str)
        path = self.root / key

        try:
            return await asyncio.to_thread(_get, path, prototype, byte_range)
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            return None

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited
        args = []
        for key, byte_range in key_ranges:
            assert isinstance(key, str)
            path = self.root / key
            args.append((_get, path, prototype, byte_range))
        return await concurrent_map(args, asyncio.to_thread, limit=None)  # TODO: fix limit

    async def set(self, key: str, value: Buffer) -> None:
        # docstring inherited
        return await self._set(key, value)

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        # docstring inherited
        try:
            return await self._set(key, value, exclusive=True)
        except FileExistsError:
            pass

    async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
        if not self._is_open:
            await self._open()
        self._check_writable()
        assert isinstance(key, str)
        if not isinstance(value, Buffer):
            raise TypeError(
                f"LocalStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )
        path = self.root / key
        await asyncio.to_thread(_put, path, value, exclusive=exclusive)

    async def delete(self, key: str) -> None:
        """
        Remove a key from the store.

        Parameters
        ----------
        key : str

        Notes
        -----
        If ``key`` is a directory within this store, the entire directory
        at ``store.root / key`` is deleted.
        """
        # docstring inherited
        self._check_writable()
        path = self.root / key
        if path.is_dir():  # TODO: support deleting directories? shutil.rmtree?
            shutil.rmtree(path)
        else:
            await asyncio.to_thread(path.unlink, True)  # Q: we may want to raise if path is missing

    async def delete_dir(self, prefix: str) -> None:
        # docstring inherited
        self._check_writable()
        path = self.root / prefix
        if path.is_dir():
            shutil.rmtree(path)
        elif path.is_file():
            raise ValueError(f"delete_dir was passed a {prefix=!r} that is a file")
        else:
            # Non-existent directory
            # This path is tested by test_group:test_create_creates_parents for one
            pass

    async def exists(self, key: str) -> bool:
        # docstring inherited
        path = self.root / key
        return await asyncio.to_thread(path.is_file)

    async def list(self) -> AsyncIterator[str]:
        # docstring inherited
        to_strip = self.root.as_posix() + "/"
        for p in list(self.root.rglob("*")):
            if p.is_file():
                yield p.as_posix().replace(to_strip, "")

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        to_strip = self.root.as_posix() + "/"
        prefix = prefix.rstrip("/")
        for p in (self.root / prefix).rglob("*"):
            if p.is_file():
                yield p.as_posix().replace(to_strip, "")

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        base = self.root / prefix
        try:
            key_iter = base.iterdir()
            for key in key_iter:
                yield key.relative_to(base).as_posix()
        except (FileNotFoundError, NotADirectoryError):
            pass

    async def move(self, dest_root: Path | str) -> None:
        """
        Move the store to another path. The old root directory is deleted.
        """
        if isinstance(dest_root, str):
            dest_root = Path(dest_root)
        os.makedirs(dest_root.parent, exist_ok=True)
        if os.path.exists(dest_root):
            raise FileExistsError(f"Destination root {dest_root} already exists.")
        shutil.move(self.root, dest_root)
        self.root = dest_root

    async def getsize(self, key: str) -> int:
        return os.path.getsize(self.root / key)
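
End-to-end example (a minimal sketch, not part of the source; the path below is arbitrary):

import asyncio

from zarr.core.buffer.core import default_buffer_prototype
from zarr.storage import LocalStore

async def main() -> None:
    store = await LocalStore.open("/tmp/example_store")  # directory is created if missing
    proto = default_buffer_prototype()
    await store.set("zarr.json", proto.buffer.from_bytes(b"{}"))
    buf = await store.get("zarr.json", prototype=proto)
    assert buf is not None
    print(buf.to_bytes())  # b'{}'

asyncio.run(main())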

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = True

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always False.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_local.py
def __eq__(self, other: object) -> bool:
    return isinstance(other, type(self)) and self.root == other.root

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_local.py
async def clear(self) -> None:
    # docstring inherited
    self._check_writable()
    shutil.rmtree(self.root)
    self.root.mkdir()

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

Notes

If key is a directory within this store, the entire directory at store.root / key is deleted.

Source code in zarr/storage/_local.py
async def delete(self, key: str) -> None:
    """
    Remove a key from the store.

    Parameters
    ----------
    key : str

    Notes
    -----
    If ``key`` is a directory within this store, the entire directory
    at ``store.root / key`` is deleted.
    """
    # docstring inherited
    self._check_writable()
    path = self.root / key
    if path.is_dir():  # TODO: support deleting directories? shutil.rmtree?
        shutil.rmtree(path)
    else:
        await asyncio.to_thread(path.unlink, True)  # Q: we may want to raise if path is missing

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_local.py
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    self._check_writable()
    path = self.root / prefix
    if path.is_dir():
        shutil.rmtree(path)
    elif path.is_file():
        raise ValueError(f"delete_dir was passed a {prefix=!r} that is a file")
    else:
        # Non-existent directory
        # This path is tested by test_group:test_create_creates_parents for one
        pass

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool –

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_local.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    path = self.root / key
    return await asyncio.to_thread(path.is_file)

get async

get(
    key: str,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None –

    The retrieved value, or None if the key does not exist.

Source code in zarr/storage/_local.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    if prototype is None:
        prototype = default_buffer_prototype()
    if not self._is_open:
        await self._open()
    assert isinstance(key, str)
    path = self.root / key

    try:
        return await asyncio.to_thread(_get, path, prototype, byte_range)
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return None

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list[Buffer | None] –

    A list of values in the order of the key_ranges; missing keys are represented by None.

Source code in zarr/storage/_local.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited
    args = []
    for key, byte_range in key_ranges:
        assert isinstance(key, str)
        path = self.root / key
        args.append((_get, path, prototype, byte_range))
    return await concurrent_map(args, asyncio.to_thread, limit=None)  # TODO: fix limit

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/storage/_local.py
async def getsize(self, key: str) -> int:
    return os.path.getsize(self.root / key)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)
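
The concurrency of the getsize calls is bounded by the async.concurrency configuration value. A minimal sketch (not part of the source; assumes the zarr.config accessor for the donfig-based configuration):

import asyncio

import zarr
from zarr.storage import MemoryStore

async def main() -> None:
    zarr.config.set({"async.concurrency": 4})  # cap concurrent getsize calls
    store = MemoryStore()
    print(await store.getsize_prefix(""))  # 0 for an empty store

asyncio.run(main())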

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str] –

    All keys in the store.

Source code in zarr/storage/_local.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    to_strip = self.root.as_posix() + "/"
    for p in list(self.root.rglob("*")):
        if p.is_file():
            yield p.as_posix().replace(to_strip, "")

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes that begin with a given prefix and do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    Keys and prefixes one level below the given prefix.

Source code in zarr/storage/_local.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    base = self.root / prefix
    try:
        key_iter = base.iterdir()
        for key in key_iter:
            yield key.relative_to(base).as_posix()
    except (FileNotFoundError, NotADirectoryError):
        pass

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    Keys in the store that begin with the given prefix, relative to the root of the store.

Source code in zarr/storage/_local.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    to_strip = self.root.as_posix() + "/"
    prefix = prefix.rstrip("/")
    for p in (self.root / prefix).rglob("*"):
        if p.is_file():
            yield p.as_posix().replace(to_strip, "")

move async

move(dest_root: Path | str) -> None

Move the store to another path. The old root directory is deleted.

Source code in zarr/storage/_local.py
async def move(self, dest_root: Path | str) -> None:
    """
    Move the store to another path. The old root directory is deleted.
    """
    if isinstance(dest_root, str):
        dest_root = Path(dest_root)
    os.makedirs(dest_root.parent, exist_ok=True)
    if os.path.exists(dest_root):
        raise FileExistsError(f"Destination root {dest_root} already exists.")
    shutil.move(self.root, dest_root)
    self.root = dest_root

open async classmethod

open(
    root: Path | str,
    *,
    read_only: bool = False,
    mode: AccessModeLiteral | None = None,
) -> Self

Create and open the store.

Parameters:

  • root (str or Path) –

    Directory to use as root of store.

  • read_only (bool, default: False ) –

    Whether the store is read-only

  • mode (AccessModeLiteral | None, default: None ) –

    Mode in which to create the store. This only affects opening the store, and the final read-only state of the store is controlled through the read_only parameter.

Returns:

  • Store

    The opened store instance.

Source code in zarr/storage/_local.py
@classmethod
async def open(
    cls, root: Path | str, *, read_only: bool = False, mode: AccessModeLiteral | None = None
) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    root : str or Path
        Directory to use as root of store.
    read_only : bool
        Whether the store is read-only
    mode :
        Mode in which to create the store. This only affects opening the store,
        and the final read-only state of the store is controlled through the
        read_only parameter.

    Returns
    -------
    Store
        The opened store instance.
    """
    # If mode = 'r+', we want to open in read-only mode (fail if the store
    # does not already exist), but return a writeable store
    if mode is not None:
        read_only_creation = mode in ["r", "r+"]
    else:
        read_only_creation = read_only
    store = cls(root, read_only=read_only_creation)
    await store._open()

    # Set read_only state
    store = store.with_read_only(read_only)
    await store._open()
    return store
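
Example (a minimal sketch, not part of the source): with mode="r+" the root directory must already exist, but the returned store is writeable because the final state follows read_only.

import asyncio

from zarr.storage import LocalStore

async def main() -> None:
    # raises FileNotFoundError if "existing_store" does not exist;
    # otherwise returns a writeable store despite the read-only creation mode
    store = await LocalStore.open("existing_store", read_only=False, mode="r+")
    print(store.read_only)  # False

asyncio.run(main())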

set async

set(key: str, value: Buffer) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –
  • value (Buffer) –

Source code in zarr/storage/_local.py
async def set(self, key: str, value: Buffer) -> None:
    # docstring inherited
    return await self._set(key, value)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –
  • value (Buffer) –

Source code in zarr/storage/_local.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    try:
        return await self._set(key, value, exclusive=True)
    except FileExistsError:
        pass

with_read_only

with_read_only(read_only: bool = False) -> Self

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read-only attribute.

Source code in zarr/storage/_local.py
def with_read_only(self, read_only: bool = False) -> Self:
    # docstring inherited
    return type(self)(
        root=self.root,
        read_only=read_only,
    )

LoggingStore

Bases: WrapperStore[T_Store]

Store that logs all calls to another wrapped store.

Parameters:

  • store (Store) –

    Store to wrap

  • log_level (str, default: 'DEBUG' ) –

    Log level

  • log_handler (Handler, default: None ) –

    Log handler

Attributes:

  • counter (dict) –

    Counter of number of times each method has been called

Source code in zarr/storage/_logging.py
class LoggingStore(WrapperStore[T_Store]):
    """
    Store that logs all calls to another wrapped store.

    Parameters
    ----------
    store : Store
        Store to wrap
    log_level : str
        Log level
    log_handler : logging.Handler
        Log handler

    Attributes
    ----------
    counter : dict
        Counter of number of times each method has been called
    """

    counter: defaultdict[str, int]

    def __init__(
        self,
        store: T_Store,
        log_level: str = "DEBUG",
        log_handler: logging.Handler | None = None,
    ) -> None:
        super().__init__(store)
        self.counter = defaultdict(int)
        self.log_level = log_level
        self.log_handler = log_handler
        self._configure_logger(log_level, log_handler)

    def _configure_logger(
        self, log_level: str = "DEBUG", log_handler: logging.Handler | None = None
    ) -> None:
        self.log_level = log_level
        self.logger = logging.getLogger(f"LoggingStore({self._store})")
        self.logger.setLevel(log_level)

        if not self.logger.hasHandlers():
            if not log_handler:
                log_handler = self._default_handler()
            # Add handler to logger
            self.logger.addHandler(log_handler)

    def _default_handler(self) -> logging.Handler:
        """Define a default log handler"""
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setLevel(self.log_level)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        return handler

    @contextmanager
    def log(self, hint: Any = "") -> Generator[None, None, None]:
        """Context manager to log method calls

        Each call to the wrapped store is logged to the configured logger and added to
        the counter dict.
        """
        method = inspect.stack()[2].function
        op = f"{type(self._store).__name__}.{method}"
        if hint:
            op = f"{op}({hint})"
        self.logger.info(" Calling %s", op)
        start_time = time.time()
        try:
            self.counter[method] += 1
            yield
        finally:
            end_time = time.time()
            self.logger.info("Finished %s [%.2f s]", op, end_time - start_time)

    @classmethod
    async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
        log_level = kwargs.pop("log_level", "DEBUG")
        log_handler = kwargs.pop("log_handler", None)
        store = store_cls(*args, **kwargs)
        await store._open()
        return cls(store=store, log_level=log_level, log_handler=log_handler)

    @property
    def supports_writes(self) -> bool:
        with self.log():
            return self._store.supports_writes

    @property
    def supports_deletes(self) -> bool:
        with self.log():
            return self._store.supports_deletes

    @property
    def supports_listing(self) -> bool:
        with self.log():
            return self._store.supports_listing

    @property
    def read_only(self) -> bool:
        with self.log():
            return self._store.read_only

    @property
    def _is_open(self) -> bool:
        with self.log():
            return self._store._is_open

    @_is_open.setter
    def _is_open(self, value: bool) -> None:
        raise NotImplementedError("LoggingStore must be opened via the `_open` method")

    async def _open(self) -> None:
        with self.log():
            return await self._store._open()

    async def _ensure_open(self) -> None:
        with self.log():
            return await self._store._ensure_open()

    async def is_empty(self, prefix: str = "") -> bool:
        # docstring inherited
        with self.log():
            return await self._store.is_empty(prefix=prefix)

    async def clear(self) -> None:
        # docstring inherited
        with self.log():
            return await self._store.clear()

    def __str__(self) -> str:
        return f"logging-{self._store}"

    def __repr__(self) -> str:
        return f"LoggingStore({self._store.__class__.__name__}, '{self._store}')"

    def __eq__(self, other: object) -> bool:
        with self.log(other):
            return type(self) is type(other) and self._store.__eq__(other._store)  # type: ignore[attr-defined]

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        # docstring inherited
        with self.log(key):
            return await self._store.get(key=key, prototype=prototype, byte_range=byte_range)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited
        keys = ",".join([k[0] for k in key_ranges])
        with self.log(keys):
            return await self._store.get_partial_values(prototype=prototype, key_ranges=key_ranges)

    async def exists(self, key: str) -> bool:
        # docstring inherited
        with self.log(key):
            return await self._store.exists(key)

    async def set(self, key: str, value: Buffer) -> None:
        # docstring inherited
        with self.log(key):
            return await self._store.set(key=key, value=value)

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        # docstring inherited
        with self.log(key):
            return await self._store.set_if_not_exists(key=key, value=value)

    async def delete(self, key: str) -> None:
        # docstring inherited
        with self.log(key):
            return await self._store.delete(key=key)

    async def list(self) -> AsyncGenerator[str, None]:
        # docstring inherited
        with self.log():
            async for key in self._store.list():
                yield key

    async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
        # docstring inherited
        with self.log(prefix):
            async for key in self._store.list_prefix(prefix=prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
        # docstring inherited
        with self.log(prefix):
            async for key in self._store.list_dir(prefix=prefix):
                yield key

    async def delete_dir(self, prefix: str) -> None:
        # docstring inherited
        with self.log(prefix):
            await self._store.delete_dir(prefix=prefix)

    async def getsize(self, key: str) -> int:
        with self.log(key):
            return await self._store.getsize(key)

    async def getsize_prefix(self, prefix: str) -> int:
        with self.log(prefix):
            return await self._store.getsize_prefix(prefix)
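
Example (a minimal sketch, not part of the source), wrapping a MemoryStore and inspecting the call counter:

import asyncio

from zarr.core.buffer.core import default_buffer_prototype
from zarr.storage import LoggingStore, MemoryStore

async def main() -> None:
    store = LoggingStore(MemoryStore(), log_level="INFO")
    buf = default_buffer_prototype().buffer.from_bytes(b"data")
    await store.set("key", buf)   # logs "Calling MemoryStore.set(key)" plus timing
    await store.exists("key")
    print(dict(store.counter))    # per-method call counts, e.g. {'set': 1, 'exists': 1}

asyncio.run(main())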

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes property

supports_deletes: bool

Does the store support deletes?

supports_listing property

supports_listing: bool

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always False.

supports_writes property

supports_writes: bool

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/storage/_wrapper.py
def __enter__(self) -> Self:
    return type(self)(self._store.__enter__())

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_logging.py
def __eq__(self, other: object) -> bool:
    with self.log(other):
        return type(self) is type(other) and self._store.__eq__(other._store)  # type: ignore[attr-defined]

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    return self._store.__exit__(exc_type, exc_value, traceback)

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_logging.py
async def clear(self) -> None:
    # docstring inherited
    with self.log():
        return await self._store.clear()

close

close() -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def close(self) -> None:
    self._store.close()

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

Source code in zarr/storage/_logging.py
async def delete(self, key: str) -> None:
    # docstring inherited
    with self.log(key):
        return await self._store.delete(key=key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_logging.py
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    with self.log(prefix):
        await self._store.delete_dir(prefix=prefix)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool –

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_logging.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    with self.log(key):
        return await self._store.exists(key)

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None –

    The retrieved value, or None if the key does not exist.

Source code in zarr/storage/_logging.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    with self.log(key):
        return await self._store.get(key=key, prototype=prototype, byte_range=byte_range)

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list[Buffer | None] –

    A list of values in the order of the key_ranges; missing keys are represented by None.

Source code in zarr/storage/_logging.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited
    keys = ",".join([k[0] for k in key_ranges])
    with self.log(keys):
        return await self._store.get_partial_values(prototype=prototype, key_ranges=key_ranges)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/storage/_logging.py
async def getsize(self, key: str) -> int:
    with self.log(key):
        return await self._store.getsize(key)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/storage/_logging.py
async def getsize_prefix(self, prefix: str) -> int:
    with self.log(prefix):
        return await self._store.getsize_prefix(prefix)

is_empty async

is_empty(prefix: str = '') -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/storage/_logging.py
async def is_empty(self, prefix: str = "") -> bool:
    # docstring inherited
    with self.log():
        return await self._store.is_empty(prefix=prefix)

list async

list() -> AsyncGenerator[str, None]

Retrieve all keys in the store.

Returns:

  • AsyncGenerator[str, None] –

    All keys in the store.

Source code in zarr/storage/_logging.py
async def list(self) -> AsyncGenerator[str, None]:
    # docstring inherited
    with self.log():
        async for key in self._store.list():
            yield key

list_dir async

list_dir(prefix: str) -> AsyncGenerator[str, None]

Retrieve all keys and prefixes that begin with a given prefix and do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncGenerator[str, None] –

    Keys and prefixes one level below the given prefix.

Source code in zarr/storage/_logging.py
async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
    # docstring inherited
    with self.log(prefix):
        async for key in self._store.list_dir(prefix=prefix):
            yield key

list_prefix async

list_prefix(prefix: str) -> AsyncGenerator[str, None]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncGenerator[str, None] –

    Keys in the store that begin with the given prefix, relative to the root of the store.

Source code in zarr/storage/_logging.py
async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
    # docstring inherited
    with self.log(prefix):
        async for key in self._store.list_prefix(prefix=prefix):
            yield key

log

log(hint: Any = '') -> Generator[None, None, None]

Context manager to log method calls

Each call to the wrapped store is logged to the configured logger and added to the counter dict.

Source code in zarr/storage/_logging.py
@contextmanager
def log(self, hint: Any = "") -> Generator[None, None, None]:
    """Context manager to log method calls

    Each call to the wrapped store is logged to the configured logger and added to
    the counter dict.
    """
    method = inspect.stack()[2].function
    op = f"{type(self._store).__name__}.{method}"
    if hint:
        op = f"{op}({hint})"
    self.logger.info(" Calling %s", op)
    start_time = time.time()
    try:
        self.counter[method] += 1
        yield
    finally:
        end_time = time.time()
        self.logger.info("Finished %s [%.2f s]", op, end_time - start_time)

open async classmethod

open(
    store_cls: type[T_Store], *args: Any, **kwargs: Any
) -> Self

Create and open the store.

Parameters:

  • store_cls (type[T_Store]) –

    The class of the store to construct, open, and wrap.

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/storage/_logging.py
@classmethod
async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
    log_level = kwargs.pop("log_level", "DEBUG")
    log_handler = kwargs.pop("log_handler", None)
    store = store_cls(*args, **kwargs)
    await store._open()
    return cls(store=store, log_level=log_level, log_handler=log_handler)

set async

set(key: str, value: Buffer) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –
  • value (Buffer) –

Source code in zarr/storage/_logging.py
async def set(self, key: str, value: Buffer) -> None:
    # docstring inherited
    with self.log(key):
        return await self._store.set(key=key, value=value)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –
  • value (Buffer) –

Source code in zarr/storage/_logging.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    with self.log(key):
        return await self._store.set_if_not_exists(key=key, value=value)

with_read_only

with_read_only(read_only: bool = False) -> Store

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read-only attribute.

Source code in zarr/abc/store.py
def with_read_only(self, read_only: bool = False) -> Store:
    """
    Return a new store with a new read_only setting.

    The new store points to the same location with the specified new read_only state.
    The returned Store is not automatically opened, and this store is
    not automatically closed.

    Parameters
    ----------
    read_only
        If True, the store will be created in read-only mode. Defaults to False.

    Returns
    -------
        A new store of the same type with the new read only attribute.
    """
    raise NotImplementedError(
        f"with_read_only is not implemented for the {type(self)} store type."
    )

MemoryStore

Bases: Store

Store for local memory.

Parameters:

  • store_dict (dict, default: None ) –

    Initial data

  • read_only (bool, default: False ) –

    Whether the store is read-only

Attributes:

Source code in zarr/storage/_memory.py
class MemoryStore(Store):
    """
    Store for local memory.

    Parameters
    ----------
    store_dict : dict
        Initial data
    read_only : bool
        Whether the store is read-only

    Attributes
    ----------
    supports_writes
    supports_deletes
    supports_listing
    """

    supports_writes: bool = True
    supports_deletes: bool = True
    supports_listing: bool = True

    _store_dict: MutableMapping[str, Buffer]

    def __init__(
        self,
        store_dict: MutableMapping[str, Buffer] | None = None,
        *,
        read_only: bool = False,
    ) -> None:
        super().__init__(read_only=read_only)
        if store_dict is None:
            store_dict = {}
        self._store_dict = store_dict

    def with_read_only(self, read_only: bool = False) -> MemoryStore:
        # docstring inherited
        return type(self)(
            store_dict=self._store_dict,
            read_only=read_only,
        )

    async def clear(self) -> None:
        # docstring inherited
        self._store_dict.clear()

    def __str__(self) -> str:
        return f"memory://{id(self._store_dict)}"

    def __repr__(self) -> str:
        return f"MemoryStore('{self}')"

    def __eq__(self, other: object) -> bool:
        return (
            isinstance(other, type(self))
            and self._store_dict == other._store_dict
            and self.read_only == other.read_only
        )

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        # docstring inherited
        if not self._is_open:
            await self._open()
        assert isinstance(key, str)
        try:
            value = self._store_dict[key]
            start, stop = _normalize_byte_range_index(value, byte_range)
            return prototype.buffer.from_buffer(value[start:stop])
        except KeyError:
            return None

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited

        # All the key-range arguments go with the same prototype
        async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None:
            return await self.get(key, prototype=prototype, byte_range=byte_range)

        return await concurrent_map(key_ranges, _get, limit=None)

    async def exists(self, key: str) -> bool:
        # docstring inherited
        return key in self._store_dict

    async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
        # docstring inherited
        self._check_writable()
        await self._ensure_open()
        assert isinstance(key, str)
        if not isinstance(value, Buffer):
            raise TypeError(
                f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )

        if byte_range is not None:
            buf = self._store_dict[key]
            buf[byte_range[0] : byte_range[1]] = value
            self._store_dict[key] = buf
        else:
            self._store_dict[key] = value

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        # docstring inherited
        self._check_writable()
        await self._ensure_open()
        self._store_dict.setdefault(key, value)

    async def delete(self, key: str) -> None:
        # docstring inherited
        self._check_writable()
        try:
            del self._store_dict[key]
        except KeyError:
            logger.debug("Key %s does not exist.", key)

    async def list(self) -> AsyncIterator[str]:
        # docstring inherited
        for key in self._store_dict:
            yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        # note: we materialize all dict keys into a list here so we can mutate the dict in-place (e.g. in delete_prefix)
        for key in list(self._store_dict):
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        prefix = prefix.rstrip("/")

        if prefix == "":
            keys_unique = {k.split("/")[0] for k in self._store_dict}
        else:
            # Our dictionary doesn't contain directory markers, but we want to include
            # a pseudo directory when there's a nested item and we're listing an
            # intermediate level.
            keys_unique = {
                key.removeprefix(prefix + "/").split("/")[0]
                for key in self._store_dict
                if key.startswith(prefix + "/") and key != prefix
            }

        for key in keys_unique:
            yield key
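
Because the store wraps the mapping you pass in (no copy is made), external references observe writes. Example (a minimal sketch, not part of the source):

import asyncio

from zarr.core.buffer.core import default_buffer_prototype
from zarr.storage import MemoryStore

async def main() -> None:
    backing: dict = {}
    store = MemoryStore(store_dict=backing)
    await store.set("a/b", default_buffer_prototype().buffer.from_bytes(b"1"))
    print(list(backing))  # ['a/b'] -- the same mapping, not a copy

asyncio.run(main())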

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores which implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = True

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always False.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_memory.py
def __eq__(self, other: object) -> bool:
    return (
        isinstance(other, type(self))
        and self._store_dict == other._store_dict
        and self.read_only == other.read_only
    )

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_memory.py
async def clear(self) -> None:
    # docstring inherited
    self._store_dict.clear()

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

Source code in zarr/storage/_memory.py
async def delete(self, key: str) -> None:
    # docstring inherited
    self._check_writable()
    try:
        del self._store_dict[key]
    except KeyError:
        logger.debug("Key %s does not exist.", key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/abc/store.py
async def delete_dir(self, prefix: str) -> None:
    """
    Remove all keys and prefixes in the store that begin with a given prefix.
    """
    if not self.supports_deletes:
        raise NotImplementedError
    if not self.supports_listing:
        raise NotImplementedError
    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for key in self.list_prefix(prefix):
        await self.delete(key)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool –

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_memory.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    return key in self._store_dict

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

      • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
      • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
      • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None –

    The retrieved value, or None if the key does not exist.

Source code in zarr/storage/_memory.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    if not self._is_open:
        await self._open()
    assert isinstance(key, str)
    try:
        value = self._store_dict[key]
        start, stop = _normalize_byte_range_index(value, byte_range)
        return prototype.buffer.from_buffer(value[start:stop])
    except KeyError:
        return None

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list[Buffer | None] –

    A list of values in the order of the key_ranges; missing keys are represented by None.

Source code in zarr/storage/_memory.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited

    # All the key-range arguments go with the same prototype
    async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None:
        return await self.get(key, prototype=prototype, byte_range=byte_range)

    return await concurrent_map(key_ranges, _get, limit=None)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored Store.getsize

Notes

getsize_prefix is just provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys in the store.

Source code in zarr/storage/_memory.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    for key in self._store_dict:
        yield key

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes that start with a given prefix and do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over the keys and pseudo-directory names directly under the given prefix.

Source code in zarr/storage/_memory.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    prefix = prefix.rstrip("/")

    if prefix == "":
        keys_unique = {k.split("/")[0] for k in self._store_dict}
    else:
        # Our dictionary doesn't contain directory markers, but we want to include
        # a pseudo directory when there's a nested item and we're listing an
        # intermediate level.
        keys_unique = {
            key.removeprefix(prefix + "/").split("/")[0]
            for key in self._store_dict
            if key.startswith(prefix + "/") and key != prefix
        }

    for key in keys_unique:
        yield key
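
The pseudo-directory behavior can be seen in a small sketch (key names hypothetical):

import asyncio
import zarr.storage
from zarr.core.buffer import cpu

async def demo() -> None:
    store = zarr.storage.MemoryStore()
    for key in ("a/zarr.json", "a/b/zarr.json", "a/b/c/0"):
        await store.set(key, cpu.Buffer.from_bytes(b"{}"))
    # Yields the direct children of "a": "zarr.json" and the pseudo directory "b".
    print({k async for k in store.list_dir("a")})

asyncio.run(demo())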

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys that begin with the given prefix.

Source code in zarr/storage/_memory.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    # note: we materialize all dict keys into a list here so we can mutate the dict in-place (e.g. in delete_prefix)
    for key in list(self._store_dict):
        if key.startswith(prefix):
            yield key

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store
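
A minimal sketch of the convenience this provides, using the in-memory store because it needs no external resources:

import asyncio
import zarr.storage

async def demo() -> None:
    # Equivalent to constructing MemoryStore(...) and awaiting its _open() hook.
    store = await zarr.storage.MemoryStore.open(store_dict={})
    print(store)

asyncio.run(demo())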

set async

set(
    key: str,
    value: Buffer,
    byte_range: tuple[int, int] | None = None,
) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –

    The key to store the value under.

  • value (Buffer) –

    The value to store.

  • byte_range (tuple[int, int], default: None ) –

    If provided, write the value into this byte range of the value already stored at the key.

Source code in zarr/storage/_memory.py
async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
    # docstring inherited
    self._check_writable()
    await self._ensure_open()
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        raise TypeError(
            f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )

    if byte_range is not None:
        buf = self._store_dict[key]
        buf[byte_range[0] : byte_range[1]] = value
        self._store_dict[key] = buf
    else:
        self._store_dict[key] = value

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a (key, value) pair if the key is not already present.

Parameters:

  • key (str) –

    The key to store the value under.

  • value (Buffer) –

    The value to store if the key is not already present.

Source code in zarr/storage/_memory.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    self._check_writable()
    await self._ensure_open()
    self._store_dict.setdefault(key, value)

with_read_only

with_read_only(read_only: bool = False) -> MemoryStore

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read_only attribute.

Source code in zarr/storage/_memory.py
def with_read_only(self, read_only: bool = False) -> MemoryStore:
    # docstring inherited
    return type(self)(
        store_dict=self._store_dict,
        read_only=read_only,
    )
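
A short usage sketch: deriving a read-only view that shares the same underlying dict.

import zarr.storage

store = zarr.storage.MemoryStore()
ro = store.with_read_only(read_only=True)
# Both stores point at the same store_dict; only the flag differs.
assert ro.read_only and not store.read_only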

ObjectStore

Bases: Store

Store that uses obstore for fast read/write from AWS, GCP, Azure.

Parameters:

  • store (ObjectStore) –

    An obstore store instance that is set up with the proper credentials.

  • read_only (bool, default: False ) –

    Whether to open the store in read-only mode.

Warnings

ObjectStore is experimental and subject to API changes without notice. Please raise an issue with any comments/concerns about the store.

Source code in zarr/storage/_obstore.py
class ObjectStore(Store):
    """
    Store that uses obstore for fast read/write from AWS, GCP, Azure.

    Parameters
    ----------
    store : obstore.store.ObjectStore
        An obstore store instance that is set up with the proper credentials.
    read_only : bool
        Whether to open the store in read-only mode.

    Warnings
    --------
    ObjectStore is experimental and subject to API changes without notice. Please
    raise an issue with any comments/concerns about the store.
    """

    store: _UpstreamObjectStore
    """The underlying obstore instance."""

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ObjectStore):
            return False

        if not self.read_only == value.read_only:
            return False

        return self.store == value.store

    def __init__(self, store: _UpstreamObjectStore, *, read_only: bool = False) -> None:
        if not store.__class__.__module__.startswith("obstore"):
            raise TypeError(f"expected ObjectStore class, got {store!r}")
        super().__init__(read_only=read_only)
        self.store = store

    def with_read_only(self, read_only: bool = False) -> ObjectStore:
        # docstring inherited
        return type(self)(
            store=self.store,
            read_only=read_only,
        )

    def __str__(self) -> str:
        return f"object_store://{self.store}"

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self})"

    def __getstate__(self) -> dict[Any, Any]:
        state = self.__dict__.copy()
        state["store"] = pickle.dumps(self.store)
        return state

    def __setstate__(self, state: dict[Any, Any]) -> None:
        state["store"] = pickle.loads(state["store"])
        self.__dict__.update(state)

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        # docstring inherited
        import obstore as obs

        try:
            if byte_range is None:
                resp = await obs.get_async(self.store, key)
                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
            elif isinstance(byte_range, RangeByteRequest):
                bytes = await obs.get_range_async(
                    self.store, key, start=byte_range.start, end=byte_range.end
                )
                return prototype.buffer.from_bytes(bytes)  # type: ignore[arg-type]
            elif isinstance(byte_range, OffsetByteRequest):
                resp = await obs.get_async(
                    self.store, key, options={"range": {"offset": byte_range.offset}}
                )
                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
            elif isinstance(byte_range, SuffixByteRequest):
                # some object stores (Azure) don't support suffix requests. In this
                # case, our workaround is to first get the length of the object and then
                # manually request the byte range at the end.
                try:
                    resp = await obs.get_async(
                        self.store, key, options={"range": {"suffix": byte_range.suffix}}
                    )
                    return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
                except obs.exceptions.NotSupportedError:
                    head_resp = await obs.head_async(self.store, key)
                    file_size = head_resp["size"]
                    suffix_len = byte_range.suffix
                    buffer = await obs.get_range_async(
                        self.store,
                        key,
                        start=file_size - suffix_len,
                        length=suffix_len,
                    )
                    return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
            else:
                raise ValueError(f"Unexpected byte_range, got {byte_range}")
        except _ALLOWED_EXCEPTIONS:
            return None

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited
        return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges)

    async def exists(self, key: str) -> bool:
        # docstring inherited
        import obstore as obs

        try:
            await obs.head_async(self.store, key)
        except FileNotFoundError:
            return False
        else:
            return True

    @property
    def supports_writes(self) -> bool:
        # docstring inherited
        return True

    async def set(self, key: str, value: Buffer) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()

        buf = value.as_buffer_like()
        await obs.put_async(self.store, key, buf)

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()
        buf = value.as_buffer_like()
        with contextlib.suppress(obs.exceptions.AlreadyExistsError):
            await obs.put_async(self.store, key, buf, mode="create")

    @property
    def supports_deletes(self) -> bool:
        # docstring inherited
        return True

    async def delete(self, key: str) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()

        # Some obstore stores such as local filesystems, GCP and Azure raise an error
        # when deleting a non-existent key, while others such as S3 and in-memory do
        # not. We suppress the error to make the behavior consistent across all obstore
        # stores. This is also in line with the behavior of the other Zarr store adapters.
        with contextlib.suppress(FileNotFoundError):
            await obs.delete_async(self.store, key)

    async def delete_dir(self, prefix: str) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()
        if prefix != "" and not prefix.endswith("/"):
            prefix += "/"

        metas = await obs.list(self.store, prefix).collect_async()
        keys = [(m["path"],) for m in metas]
        await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))

    @property
    def supports_listing(self) -> bool:
        # docstring inherited
        return True

    async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]:
        import obstore as obs

        objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix)
        async for batch in objects:
            for item in batch:
                yield item

    def list(self) -> AsyncGenerator[str, None]:
        # docstring inherited
        return (obj["path"] async for obj in self._list())

    def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
        # docstring inherited
        return (obj["path"] async for obj in self._list(prefix))

    def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
        # docstring inherited
        import obstore as obs

        coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
        return _transform_list_dir(coroutine, prefix)

    async def getsize(self, key: str) -> int:
        # docstring inherited
        import obstore as obs

        resp = await obs.head_async(self.store, key)
        return resp["size"]

    async def getsize_prefix(self, prefix: str) -> int:
        # docstring inherited
        sizes = [obj["size"] async for obj in self._list(prefix=prefix)]
        return sum(sizes)
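
A minimal construction sketch, assuming the optional obstore package is installed; obstore's in-memory backend is used here so that no credentials are needed.

from obstore.store import MemoryStore as ObstoreMemoryStore
from zarr.storage import ObjectStore

# Any obstore backend (e.g. S3Store, GCSStore, AzureStore, LocalStore)
# can be wrapped in the same way.
backend = ObstoreMemoryStore()
store = ObjectStore(backend, read_only=False)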

read_only property

read_only: bool

Is the store read-only?

store instance-attribute

store: ObjectStore = store

The underlying obstore instance.

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores that implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes property

supports_deletes: bool

Does the store support deletes?

supports_listing property

supports_listing: bool

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always false.

supports_writes property

supports_writes: bool

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(value: object) -> bool

Equality comparison.

Source code in zarr/storage/_obstore.py
def __eq__(self, value: object) -> bool:
    if not isinstance(value, ObjectStore):
        return False

    if not self.read_only == value.read_only:
        return False

    return self.store == value.store

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/abc/store.py
async def clear(self) -> None:
    """
    Clear the store.

    Remove all keys and values from the store.
    """
    if not self.supports_deletes:
        raise NotImplementedError
    if not self.supports_listing:
        raise NotImplementedError
    self._check_writable()
    await self.delete_dir("")

close

close() -> None

Close the store.

Source code in zarr/abc/store.py
def close(self) -> None:
    """Close the store."""
    self._is_open = False

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

    The key to remove.

Source code in zarr/storage/_obstore.py
async def delete(self, key: str) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()

    # Some obstore stores such as local filesystems, GCP and Azure raise an error
    # when deleting a non-existent key, while others such as S3 and in-memory do
    # not. We suppress the error to make the behavior consistent across all obstore
    # stores. This is also in line with the behavior of the other Zarr store adapters.
    with contextlib.suppress(FileNotFoundError):
        await obs.delete_async(self.store, key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_obstore.py
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"

    metas = await obs.list(self.store, prefix).collect_async()
    keys = [(m["path"],) for m in metas]
    await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_obstore.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    import obstore as obs

    try:
        await obs.head_async(self.store, key)
    except FileNotFoundError:
        return False
    else:
        return True

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

    • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
    • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
    • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None

    The retrieved value, or None if the key does not exist.

Source code in zarr/storage/_obstore.py
async def get(
    self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Buffer | None:
    # docstring inherited
    import obstore as obs

    try:
        if byte_range is None:
            resp = await obs.get_async(self.store, key)
            return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
        elif isinstance(byte_range, RangeByteRequest):
            bytes = await obs.get_range_async(
                self.store, key, start=byte_range.start, end=byte_range.end
            )
            return prototype.buffer.from_bytes(bytes)  # type: ignore[arg-type]
        elif isinstance(byte_range, OffsetByteRequest):
            resp = await obs.get_async(
                self.store, key, options={"range": {"offset": byte_range.offset}}
            )
            return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
        elif isinstance(byte_range, SuffixByteRequest):
            # some object stores (Azure) don't support suffix requests. In this
            # case, our workaround is to first get the length of the object and then
            # manually request the byte range at the end.
            try:
                resp = await obs.get_async(
                    self.store, key, options={"range": {"suffix": byte_range.suffix}}
                )
                return prototype.buffer.from_bytes(await resp.bytes_async())  # type: ignore[arg-type]
            except obs.exceptions.NotSupportedError:
                head_resp = await obs.head_async(self.store, key)
                file_size = head_resp["size"]
                suffix_len = byte_range.suffix
                buffer = await obs.get_range_async(
                    self.store,
                    key,
                    start=file_size - suffix_len,
                    length=suffix_len,
                )
                return prototype.buffer.from_bytes(buffer)  # type: ignore[arg-type]
        else:
            raise ValueError(f"Unexpected byte_range, got {byte_range}")
    except _ALLOWED_EXCEPTIONS:
        return None
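
The three request flavors accepted here can be sketched as follows; the key name is hypothetical and the store is assumed to be open and to contain the key.

from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import default_buffer_prototype

async def read_ranges(store, key: str) -> None:
    proto = default_buffer_prototype()
    head = await store.get(key, proto, byte_range=RangeByteRequest(0, 16))  # bytes [0, 16)
    rest = await store.get(key, proto, byte_range=OffsetByteRequest(16))    # byte 16 to the end
    tail = await store.get(key, proto, byte_range=SuffixByteRequest(4))     # the last 4 bytes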

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list of values, in the order of the key_ranges; may contain None for missing keys

Source code in zarr/storage/_obstore.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited
    return await _get_partial_values(self.store, prototype=prototype, key_ranges=key_ranges)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError

    When the given key does not exist in the store.

Source code in zarr/storage/_obstore.py
async def getsize(self, key: str) -> int:
    # docstring inherited
    import obstore as obs

    resp = await obs.head_async(self.store, key)
    return resp["size"]

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored, Store.getsize

Notes

getsize_prefix is provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/storage/_obstore.py
async def getsize_prefix(self, prefix: str) -> int:
    # docstring inherited
    sizes = [obj["size"] async for obj in self._list(prefix=prefix)]
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True

list

list() -> AsyncGenerator[str, None]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys in the store.

Source code in zarr/storage/_obstore.py
def list(self) -> AsyncGenerator[str, None]:
    # docstring inherited
    return (obj["path"] async for obj in self._list())

list_dir

list_dir(prefix: str) -> AsyncGenerator[str, None]

Retrieve all keys and prefixes that start with a given prefix and do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over the keys and pseudo-directory names directly under the given prefix.

Source code in zarr/storage/_obstore.py
def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
    # docstring inherited
    import obstore as obs

    coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
    return _transform_list_dir(coroutine, prefix)

list_prefix

list_prefix(prefix: str) -> AsyncGenerator[str, None]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys that begin with the given prefix.

Source code in zarr/storage/_obstore.py
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
    # docstring inherited
    return (obj["path"] async for obj in self._list(prefix))

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store

set async

set(key: str, value: Buffer) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –

    The key to store the value under.

  • value (Buffer) –

    The value to store.

Source code in zarr/storage/_obstore.py
async def set(self, key: str, value: Buffer) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()

    buf = value.as_buffer_like()
    await obs.put_async(self.store, key, buf)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a (key, value) pair if the key is not already present.

Parameters:

  • key (str) –

    The key to store the value under.

  • value (Buffer) –

    The value to store if the key is not already present.

Source code in zarr/storage/_obstore.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()
    buf = value.as_buffer_like()
    with contextlib.suppress(obs.exceptions.AlreadyExistsError):
        await obs.put_async(self.store, key, buf, mode="create")

with_read_only

with_read_only(read_only: bool = False) -> ObjectStore

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • A new store of the same type with the new read_only attribute.

Source code in zarr/storage/_obstore.py
def with_read_only(self, read_only: bool = False) -> ObjectStore:
    # docstring inherited
    return type(self)(
        store=self.store,
        read_only=read_only,
    )

StorePath

Path-like interface for a Store.

Parameters:

  • store (Store) –

    The store to use.

  • path (str, default: '' ) –

    The path within the store.

Source code in zarr/storage/_common.py
class StorePath:
    """
    Path-like interface for a Store.

    Parameters
    ----------
    store : Store
        The store to use.
    path : str
        The path within the store.
    """

    store: Store
    path: str

    def __init__(self, store: Store, path: str = "") -> None:
        self.store = store
        self.path = normalize_path(path)

    @property
    def read_only(self) -> bool:
        return self.store.read_only

    @classmethod
    async def _create_open_instance(cls, store: Store, path: str) -> Self:
        """Helper to create and return a StorePath instance."""
        await store._ensure_open()
        return cls(store, path)

    @classmethod
    async def open(cls, store: Store, path: str, mode: AccessModeLiteral | None = None) -> Self:
        """
        Open StorePath based on the provided mode.

        * If the mode is None, return an opened version of the store with no changes.
        * If the mode is 'r+', 'w-', 'w', or 'a' and the store is read-only, raise a ValueError.
        * If the mode is 'r' and the store is not read-only, return a copy of the store with read_only set to True.
        * If the mode is 'w-' and the store is not read-only and the StorePath contains keys, raise a FileExistsError.
        * If the mode is 'w'  and the store is not read-only, delete all keys nested within the StorePath.

        Parameters
        ----------
        mode : AccessModeLiteral
            The mode to use when initializing the store path.

            The accepted values are:

            - ``'r'``: read only (must exist)
            - ``'r+'``: read/write (must exist)
            - ``'a'``: read/write (create if doesn't exist)
            - ``'w'``: read/write (overwrite if exists)
            - ``'w-'``: read/write (create if doesn't exist).

        Raises
        ------
        FileExistsError
            If the mode is 'w-' and the store path already exists.
        ValueError
            If the mode is not "r" and the store is read-only, or if the mode
            is "r" and a read-only copy of the store cannot be created.
        """

        # fastpath if mode is None
        if mode is None:
            return await cls._create_open_instance(store, path)

        if mode not in ANY_ACCESS_MODE:
            raise ValueError(f"Invalid mode: {mode}, expected one of {ANY_ACCESS_MODE}")

        if store.read_only:
            # Don't allow write operations on a read-only store
            if mode != "r":
                raise ValueError(
                    f"Store is read-only but mode is {mode!r}. Create a writable store or use 'r' mode."
                )
            self = await cls._create_open_instance(store, path)
        elif mode == "r":
            # Create read-only copy for read mode on writable store
            try:
                read_only_store = store.with_read_only(True)
            except NotImplementedError as e:
                raise ValueError(
                    "Store is not read-only but mode is 'r'. Unable to create a read-only copy of the store. "
                    "Please use a read-only store or a storage class that implements .with_read_only()."
                ) from e
            self = await cls._create_open_instance(read_only_store, path)
        else:
            # writable store and writable mode
            self = await cls._create_open_instance(store, path)

        # Handle mode-specific operations
        match mode:
            case "w-":
                if not await self.is_empty():
                    raise FileExistsError(
                        f"Cannot create '{path}' with mode 'w-' because it already contains data. "
                        f"Use mode 'w' to overwrite or 'a' to append."
                    )
            case "w":
                await self.delete_dir()
        return self

    async def get(
        self,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """
        Read bytes from the store.

        Parameters
        ----------
        prototype : BufferPrototype, optional
            The buffer prototype to use when reading the bytes.
        byte_range : ByteRequest, optional
            The range of bytes to read.

        Returns
        -------
        buffer : Buffer or None
            The read bytes, or None if the key does not exist.
        """
        if prototype is None:
            prototype = default_buffer_prototype()
        return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

    async def set(self, value: Buffer) -> None:
        """
        Write bytes to the store.

        Parameters
        ----------
        value : Buffer
            The buffer to write.
        """
        await self.store.set(self.path, value)

    async def delete(self) -> None:
        """
        Delete the key from the store.

        Raises
        ------
        NotImplementedError
            If the store does not support deletion.
        """
        await self.store.delete(self.path)

    async def delete_dir(self) -> None:
        """
        Delete all keys with the given prefix from the store.
        """
        await self.store.delete_dir(self.path)

    async def set_if_not_exists(self, default: Buffer) -> None:
        """
        Store ``default`` under this path if the key is not already present.

        Parameters
        ----------
        default : Buffer
            The buffer to store if the key is not already present.
        """
        await self.store.set_if_not_exists(self.path, default)

    async def exists(self) -> bool:
        """
        Check if the key exists in the store.

        Returns
        -------
        bool
            True if the key exists in the store, False otherwise.
        """
        return await self.store.exists(self.path)

    async def is_empty(self) -> bool:
        """
        Check if any keys exist in the store with the given prefix.

        Returns
        -------
        bool
            True if no keys exist in the store with the given prefix, False otherwise.
        """
        return await self.store.is_empty(self.path)

    def __truediv__(self, other: str) -> StorePath:
        """Combine this store path with another path"""
        return self.__class__(self.store, _dereference_path(self.path, other))

    def __str__(self) -> str:
        return _dereference_path(str(self.store), self.path)

    def __repr__(self) -> str:
        return f"StorePath({self.store.__class__.__name__}, '{self}')"

    def __eq__(self, other: object) -> bool:
        """
        Check if two StorePath objects are equal.

        Returns
        -------
        bool
            True if the two objects are equal, False otherwise.

        Notes
        -----
        Two StorePath objects are considered equal if their stores are equal
        and their paths are equal.
        """
        try:
            return self.store == other.store and self.path == other.path  # type: ignore[attr-defined, no-any-return]
        except Exception:
            pass
        return False
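
The path-joining and mode handling compose like this (a sketch using the in-memory store):

import asyncio
import zarr.storage
from zarr.storage import StorePath

async def demo() -> None:
    store = zarr.storage.MemoryStore()
    root = await StorePath.open(store, path="", mode="w")
    arr = root / "group" / "array"   # __truediv__ appends path segments
    print(arr.path)                  # "group/array"
    print(await arr.is_empty())      # True: nothing stored there yet

asyncio.run(demo())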

__eq__

__eq__(other: object) -> bool

Check if two StorePath objects are equal.

Returns:

  • bool

    True if the two objects are equal, False otherwise.

Notes

Two StorePath objects are considered equal if their stores are equal and their paths are equal.

Source code in zarr/storage/_common.py
def __eq__(self, other: object) -> bool:
    """
    Check if two StorePath objects are equal.

    Returns
    -------
    bool
        True if the two objects are equal, False otherwise.

    Notes
    -----
    Two StorePath objects are considered equal if their stores are equal
    and their paths are equal.
    """
    try:
        return self.store == other.store and self.path == other.path  # type: ignore[attr-defined, no-any-return]
    except Exception:
        pass
    return False

__truediv__

__truediv__(other: str) -> StorePath

Combine this store path with another path

Source code in zarr/storage/_common.py
def __truediv__(self, other: str) -> StorePath:
    """Combine this store path with another path"""
    return self.__class__(self.store, _dereference_path(self.path, other))

delete async

delete() -> None

Delete the key from the store.

Raises:

  • NotImplementedError

    If the store does not support deletion.

Source code in zarr/storage/_common.py
async def delete(self) -> None:
    """
    Delete the key from the store.

    Raises
    ------
    NotImplementedError
        If the store does not support deletion.
    """
    await self.store.delete(self.path)

delete_dir async

delete_dir() -> None

Delete all keys with the given prefix from the store.

Source code in zarr/storage/_common.py
async def delete_dir(self) -> None:
    """
    Delete all keys with the given prefix from the store.
    """
    await self.store.delete_dir(self.path)

exists async

exists() -> bool

Check if the key exists in the store.

Returns:

  • bool

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_common.py
async def exists(self) -> bool:
    """
    Check if the key exists in the store.

    Returns
    -------
    bool
        True if the key exists in the store, False otherwise.
    """
    return await self.store.exists(self.path)

get async

get(
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Read bytes from the store.

Parameters:

  • prototype (BufferPrototype, default: None ) –

    The buffer prototype to use when reading the bytes.

  • byte_range (ByteRequest, default: None ) –

    The range of bytes to read.

Returns:

  • buffer ( Buffer or None ) –

    The read bytes, or None if the key does not exist.

Source code in zarr/storage/_common.py
async def get(
    self,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    """
    Read bytes from the store.

    Parameters
    ----------
    prototype : BufferPrototype, optional
        The buffer prototype to use when reading the bytes.
    byte_range : ByteRequest, optional
        The range of bytes to read.

    Returns
    -------
    buffer : Buffer or None
        The read bytes, or None if the key does not exist.
    """
    if prototype is None:
        prototype = default_buffer_prototype()
    return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

is_empty async

is_empty() -> bool

Check if any keys exist in the store with the given prefix.

Returns:

  • bool

    True if no keys exist in the store with the given prefix, False otherwise.

Source code in zarr/storage/_common.py
async def is_empty(self) -> bool:
    """
    Check if any keys exist in the store with the given prefix.

    Returns
    -------
    bool
        True if no keys exist in the store with the given prefix, False otherwise.
    """
    return await self.store.is_empty(self.path)

open async classmethod

open(
    store: Store,
    path: str,
    mode: AccessModeLiteral | None = None,
) -> Self

Open StorePath based on the provided mode.

  • If the mode is None, return an opened version of the store with no changes.
  • If the mode is 'r+', 'w-', 'w', or 'a' and the store is read-only, raise a ValueError.
  • If the mode is 'r' and the store is not read-only, return a copy of the store with read_only set to True.
  • If the mode is 'w-' and the store is not read-only and the StorePath contains keys, raise a FileExistsError.
  • If the mode is 'w' and the store is not read-only, delete all keys nested within the StorePath.

Parameters:

  • mode (AccessModeLiteral, default: None ) –

    The mode to use when initializing the store path.

    The accepted values are:

    • 'r': read only (must exist)
    • 'r+': read/write (must exist)
    • 'a': read/write (create if doesn't exist)
    • 'w': read/write (overwrite if exists)
    • 'w-': read/write (create if doesn't exist).

Raises:

  • FileExistsError

    If the mode is 'w-' and the store path already exists.

  • ValueError

    If the mode is not "r" and the store is read-only, or if the mode is "r" and a read-only copy of the store cannot be created.

Source code in zarr/storage/_common.py
@classmethod
async def open(cls, store: Store, path: str, mode: AccessModeLiteral | None = None) -> Self:
    """
    Open StorePath based on the provided mode.

    * If the mode is None, return an opened version of the store with no changes.
    * If the mode is 'r+', 'w-', 'w', or 'a' and the store is read-only, raise a ValueError.
    * If the mode is 'r' and the store is not read-only, return a copy of the store with read_only set to True.
    * If the mode is 'w-' and the store is not read-only and the StorePath contains keys, raise a FileExistsError.
    * If the mode is 'w'  and the store is not read-only, delete all keys nested within the StorePath.

    Parameters
    ----------
    mode : AccessModeLiteral
        The mode to use when initializing the store path.

        The accepted values are:

        - ``'r'``: read only (must exist)
        - ``'r+'``: read/write (must exist)
        - ``'a'``: read/write (create if doesn't exist)
        - ``'w'``: read/write (overwrite if exists)
        - ``'w-'``: read/write (create if doesn't exist).

    Raises
    ------
    FileExistsError
        If the mode is 'w-' and the store path already exists.
    ValueError
        If the mode is not "r" and the store is read-only, or if the mode is
        "r" and a read-only copy of the store cannot be created.
    """

    # fastpath if mode is None
    if mode is None:
        return await cls._create_open_instance(store, path)

    if mode not in ANY_ACCESS_MODE:
        raise ValueError(f"Invalid mode: {mode}, expected one of {ANY_ACCESS_MODE}")

    if store.read_only:
        # Don't allow write operations on a read-only store
        if mode != "r":
            raise ValueError(
                f"Store is read-only but mode is {mode!r}. Create a writable store or use 'r' mode."
            )
        self = await cls._create_open_instance(store, path)
    elif mode == "r":
        # Create read-only copy for read mode on writable store
        try:
            read_only_store = store.with_read_only(True)
        except NotImplementedError as e:
            raise ValueError(
                "Store is not read-only but mode is 'r'. Unable to create a read-only copy of the store. "
                "Please use a read-only store or a storage class that implements .with_read_only()."
            ) from e
        self = await cls._create_open_instance(read_only_store, path)
    else:
        # writable store and writable mode
        self = await cls._create_open_instance(store, path)

    # Handle mode-specific operations
    match mode:
        case "w-":
            if not await self.is_empty():
                raise FileExistsError(
                    f"Cannot create '{path}' with mode 'w-' because it already contains data. "
                    f"Use mode 'w' to overwrite or 'a' to append."
                )
        case "w":
            await self.delete_dir()
    return self

set async

set(value: Buffer) -> None

Write bytes to the store.

Parameters:

  • value (Buffer) –

    The buffer to write.

Source code in zarr/storage/_common.py
async def set(self, value: Buffer) -> None:
    """
    Write bytes to the store.

    Parameters
    ----------
    value : Buffer
        The buffer to write.
    """
    await self.store.set(self.path, value)

set_if_not_exists async

set_if_not_exists(default: Buffer) -> None

Store default under this path if the key is not already present.

Parameters:

  • default (Buffer) –

    The buffer to store if the key is not already present.

Source code in zarr/storage/_common.py
async def set_if_not_exists(self, default: Buffer) -> None:
    """
    Store ``default`` under this path if the key is not already present.

    Parameters
    ----------
    default : Buffer
        The buffer to store if the key is not already present.
    """
    await self.store.set_if_not_exists(self.path, default)

WrapperStore

Bases: Store, Generic[T_Store]

Store that wraps an existing Store.

By default, all of the store methods are delegated to the wrapped store instance, which is accessible via the ._store attribute of this class.

Use this class to modify or extend the behavior of the other store classes.

Source code in zarr/storage/_wrapper.py
class WrapperStore(Store, Generic[T_Store]):
    """
    Store that wraps an existing Store.

    By default all of the store methods are delegated to the wrapped store instance, which is
    accessible via the ``._store`` attribute of this class.

    Use this class to modify or extend the behavior of the other store classes.
    """

    _store: T_Store

    def __init__(self, store: T_Store) -> None:
        self._store = store

    @classmethod
    async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
        store = store_cls(*args, **kwargs)
        await store._open()
        return cls(store=store)

    def __enter__(self) -> Self:
        return type(self)(self._store.__enter__())

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        return self._store.__exit__(exc_type, exc_value, traceback)

    async def _open(self) -> None:
        await self._store._open()

    async def _ensure_open(self) -> None:
        await self._store._ensure_open()

    async def is_empty(self, prefix: str) -> bool:
        return await self._store.is_empty(prefix)

    @property
    def _is_open(self) -> bool:
        return self._store._is_open

    @_is_open.setter
    def _is_open(self, value: bool) -> None:
        raise NotImplementedError("WrapperStore must be opened via the `_open` method")

    async def clear(self) -> None:
        return await self._store.clear()

    @property
    def read_only(self) -> bool:
        return self._store.read_only

    def _check_writable(self) -> None:
        return self._store._check_writable()

    def __eq__(self, value: object) -> bool:
        return type(self) is type(value) and self._store.__eq__(value._store)  # type: ignore[attr-defined]

    def __str__(self) -> str:
        return f"wrapping-{self._store}"

    def __repr__(self) -> str:
        return f"WrapperStore({self._store.__class__.__name__}, '{self._store}')"

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        return await self._store.get(key, prototype, byte_range)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        return await self._store.get_partial_values(prototype, key_ranges)

    async def exists(self, key: str) -> bool:
        return await self._store.exists(key)

    async def set(self, key: str, value: Buffer) -> None:
        await self._store.set(key, value)

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        return await self._store.set_if_not_exists(key, value)

    async def _set_many(self, values: Iterable[tuple[str, Buffer]]) -> None:
        await self._store._set_many(values)

    @property
    def supports_writes(self) -> bool:
        return self._store.supports_writes

    @property
    def supports_deletes(self) -> bool:
        return self._store.supports_deletes

    async def delete(self, key: str) -> None:
        await self._store.delete(key)

    @property
    def supports_listing(self) -> bool:
        return self._store.supports_listing

    def list(self) -> AsyncIterator[str]:
        return self._store.list()

    def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        return self._store.list_prefix(prefix)

    def list_dir(self, prefix: str) -> AsyncIterator[str]:
        return self._store.list_dir(prefix)

    async def delete_dir(self, prefix: str) -> None:
        return await self._store.delete_dir(prefix)

    def close(self) -> None:
        self._store.close()

    async def _get_many(
        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
        async for req in self._store._get_many(requests):
            yield req
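
As an illustration of extending this class, here is a hedged sketch of a wrapper that counts read requests; CountingStore is a hypothetical name, not part of zarr.

from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.storage import WrapperStore

class CountingStore(WrapperStore):
    """Delegates everything to the wrapped store, counting get() calls."""

    def __init__(self, store) -> None:
        super().__init__(store)
        self.get_calls = 0

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        self.get_calls += 1
        return await self._store.get(key, prototype, byte_range)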

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores that implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes property

supports_deletes: bool

Does the store support deletes?

supports_listing property

supports_listing: bool

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always false.

supports_writes property

supports_writes: bool

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/storage/_wrapper.py
def __enter__(self) -> Self:
    return type(self)(self._store.__enter__())

__eq__

__eq__(value: object) -> bool

Equality comparison.

Source code in zarr/storage/_wrapper.py
def __eq__(self, value: object) -> bool:
    return type(self) is type(value) and self._store.__eq__(value._store)  # type: ignore[attr-defined]

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    return self._store.__exit__(exc_type, exc_value, traceback)

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_wrapper.py
async def clear(self) -> None:
    return await self._store.clear()

close

close() -> None

Close the store.

Source code in zarr/storage/_wrapper.py
def close(self) -> None:
    self._store.close()

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

    The key to remove.

Source code in zarr/storage/_wrapper.py
async def delete(self, key: str) -> None:
    await self._store.delete(key)

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_wrapper.py
async def delete_dir(self, prefix: str) -> None:
    return await self._store.delete_dir(prefix)

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_wrapper.py
async def exists(self, key: str) -> bool:
    return await self._store.exists(key)

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

    • RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.
    • OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.
    • SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None

    The retrieved value, or None if the key does not exist.

Source code in zarr/storage/_wrapper.py
async def get(
    self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
) -> Buffer | None:
    return await self._store.get(key, prototype, byte_range)

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list of values, in the order of the key_ranges; may contain None for missing keys

Source code in zarr/storage/_wrapper.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    return await self._store.get_partial_values(prototype, key_ranges)

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError

    When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored, Store.getsize

Notes

getsize_prefix is provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is provided as a potentially faster alternative to
    listing all the keys under a prefix and calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool

    True if the store is empty, False otherwise.

Source code in zarr/storage/_wrapper.py
async def is_empty(self, prefix: str) -> bool:
    return await self._store.is_empty(prefix)

list

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys in the store.

Source code in zarr/storage/_wrapper.py
def list(self) -> AsyncIterator[str]:
    return self._store.list()

list_dir

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes that start with a given prefix and do not contain the character “/” after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over the keys and pseudo-directory names directly under the given prefix.

Source code in zarr/storage/_wrapper.py
def list_dir(self, prefix: str) -> AsyncIterator[str]:
    return self._store.list_dir(prefix)

list_prefix

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str]

    An asynchronous iterator over all keys that begin with the given prefix.

Source code in zarr/storage/_wrapper.py
def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    return self._store.list_prefix(prefix)

open async classmethod

open(
    store_cls: type[T_Store], *args: Any, **kwargs: Any
) -> Self

Create and open the store.

Parameters:

  • store_cls (type[T_Store]) –

    The class of the store to construct and wrap.

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/storage/_wrapper.py
@classmethod
async def open(cls: type[Self], store_cls: type[T_Store], *args: Any, **kwargs: Any) -> Self:
    store = store_cls(*args, **kwargs)
    await store._open()
    return cls(store=store)
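
For illustration, a hedged sketch that wraps a MemoryStore (assuming both classes are importable from zarr.storage, as in current zarr-python):

import asyncio

from zarr.storage import MemoryStore, WrapperStore

async def main() -> None:
    # Construct a MemoryStore, open it, and wrap it in a single step.
    wrapped = await WrapperStore.open(MemoryStore)
    print(wrapped)

asyncio.run(main())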

set async

set(key: str, value: Buffer) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –

  • value (Buffer) –

Source code in zarr/storage/_wrapper.py
async def set(self, key: str, value: Buffer) -> None:
    await self._store.set(key, value)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –

  • value (Buffer) –

Source code in zarr/storage/_wrapper.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    return await self._store.set_if_not_exists(key, value)

with_read_only

with_read_only(read_only: bool = False) -> Store

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • Store –

    A new store of the same type, with the new read_only setting.

Source code in zarr/abc/store.py
def with_read_only(self, read_only: bool = False) -> Store:
    """
    Return a new store with a new read_only setting.

    The new store points to the same location with the specified new read_only state.
    The returned Store is not automatically opened, and this store is
    not automatically closed.

    Parameters
    ----------
    read_only
        If True, the store will be created in read-only mode. Defaults to False.

    Returns
    -------
        A new store of the same type with the new read only attribute.
    """
    raise NotImplementedError(
        f"with_read_only is not implemented for the {type(self)} store type."
    )
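
The base implementation shown above raises NotImplementedError; concrete stores that support this method return a read-only view of the same location. A minimal sketch, assuming MemoryStore provides an implementation:

from zarr.storage import MemoryStore

store = MemoryStore()
# Same backing location; writes through ro_store are refused.
ro_store = store.with_read_only(read_only=True)
assert ro_store.read_only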

ZipStore

Bases: Store

Store using a ZIP file.

Parameters:

  • path (str) –

    Location of file.

  • mode (str, default: 'r' ) –

    One of 'r' to read an existing file, 'w' to truncate and write a new file, 'a' to append to an existing file, or 'x' to exclusively create and write a new file.

  • compression (int, default: ZIP_STORED ) –

    Compression method to use when writing to the archive.

  • allowZip64 (bool, default: True ) –

    If True (the default), ZIP files that use the ZIP64 extensions will be created when the zipfile is larger than 2 GiB. If False, an exception will be raised when the ZIP file would require ZIP64 extensions.

Attributes:

  • supports_writes
  • supports_deletes
  • supports_listing
  • path
  • compression
  • allowZip64

Source code in zarr/storage/_zip.py
class ZipStore(Store):
    """
    Store using a ZIP file.

    Parameters
    ----------
    path : str
        Location of file.
    mode : str, optional
        One of 'r' to read an existing file, 'w' to truncate and write a new
        file, 'a' to append to an existing file, or 'x' to exclusively create
        and write a new file.
    compression : int, optional
        Compression method to use when writing to the archive.
    allowZip64 : bool, optional
        If True (the default) will create ZIP files that use the ZIP64
        extensions when the zipfile is larger than 2 GiB. If False
        will raise an exception when the ZIP file would require ZIP64
        extensions.

    Attributes
    ----------
    allowed_exceptions
    supports_writes
    supports_deletes
    supports_listing
    path
    compression
    allowZip64
    """

    supports_writes: bool = True
    supports_deletes: bool = False
    supports_listing: bool = True

    path: Path
    compression: int
    allowZip64: bool

    _zf: zipfile.ZipFile
    _lock: threading.RLock

    def __init__(
        self,
        path: Path | str,
        *,
        mode: ZipStoreAccessModeLiteral = "r",
        read_only: bool | None = None,
        compression: int = zipfile.ZIP_STORED,
        allowZip64: bool = True,
    ) -> None:
        if read_only is None:
            read_only = mode == "r"

        super().__init__(read_only=read_only)

        if isinstance(path, str):
            path = Path(path)
        assert isinstance(path, Path)
        self.path = path  # root?

        self._zmode = mode
        self.compression = compression
        self.allowZip64 = allowZip64

    def _sync_open(self) -> None:
        if self._is_open:
            raise ValueError("store is already open")

        self._lock = threading.RLock()

        self._zf = zipfile.ZipFile(
            self.path,
            mode=self._zmode,
            compression=self.compression,
            allowZip64=self.allowZip64,
        )

        self._is_open = True

    async def _open(self) -> None:
        self._sync_open()

    def __getstate__(self) -> dict[str, Any]:
        # We need a copy to not modify the state of the original store
        state = self.__dict__.copy()
        for attr in ["_zf", "_lock"]:
            state.pop(attr, None)
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        self.__dict__ = state
        self._is_open = False
        self._sync_open()

    def close(self) -> None:
        # docstring inherited
        super().close()
        with self._lock:
            self._zf.close()

    async def clear(self) -> None:
        # docstring inherited
        with self._lock:
            self._check_writable()
            self._zf.close()
            os.remove(self.path)
            self._zf = zipfile.ZipFile(
                self.path, mode="w", compression=self.compression, allowZip64=self.allowZip64
            )

    def __str__(self) -> str:
        return f"zip://{self.path}"

    def __repr__(self) -> str:
        return f"ZipStore('{self}')"

    def __eq__(self, other: object) -> bool:
        return isinstance(other, type(self)) and self.path == other.path

    def _get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        if not self._is_open:
            self._sync_open()
        # docstring inherited
        try:
            with self._zf.open(key) as f:  # will raise KeyError
                if byte_range is None:
                    return prototype.buffer.from_bytes(f.read())
                elif isinstance(byte_range, RangeByteRequest):
                    f.seek(byte_range.start)
                    return prototype.buffer.from_bytes(f.read(byte_range.end - f.tell()))
                size = f.seek(0, os.SEEK_END)
                if isinstance(byte_range, OffsetByteRequest):
                    f.seek(byte_range.offset)
                elif isinstance(byte_range, SuffixByteRequest):
                    f.seek(max(0, size - byte_range.suffix))
                else:
                    raise TypeError(f"Unexpected byte_range, got {byte_range}.")
                return prototype.buffer.from_bytes(f.read())
        except KeyError:
            return None

    async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        # docstring inherited
        assert isinstance(key, str)

        with self._lock:
            return self._get(key, prototype=prototype, byte_range=byte_range)

    async def get_partial_values(
        self,
        prototype: BufferPrototype,
        key_ranges: Iterable[tuple[str, ByteRequest | None]],
    ) -> list[Buffer | None]:
        # docstring inherited
        out = []
        with self._lock:
            for key, byte_range in key_ranges:
                out.append(self._get(key, prototype=prototype, byte_range=byte_range))
        return out

    def _set(self, key: str, value: Buffer) -> None:
        if not self._is_open:
            self._sync_open()
        # generally, this should be called inside a lock
        keyinfo = zipfile.ZipInfo(filename=key, date_time=time.localtime(time.time())[:6])
        keyinfo.compress_type = self.compression
        if keyinfo.filename[-1] == os.sep:
            keyinfo.external_attr = 0o40775 << 16  # drwxrwxr-x
            keyinfo.external_attr |= 0x10  # MS-DOS directory flag
        else:
            keyinfo.external_attr = 0o644 << 16  # ?rw-r--r--
        self._zf.writestr(keyinfo, value.to_bytes())

    async def set(self, key: str, value: Buffer) -> None:
        # docstring inherited
        self._check_writable()
        if not self._is_open:
            self._sync_open()
        assert isinstance(key, str)
        if not isinstance(value, Buffer):
            raise TypeError(
                f"ZipStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
            )
        with self._lock:
            self._set(key, value)

    async def set_if_not_exists(self, key: str, value: Buffer) -> None:
        self._check_writable()
        with self._lock:
            members = self._zf.namelist()
            if key not in members:
                self._set(key, value)

    async def delete_dir(self, prefix: str) -> None:
        # only raise NotImplementedError if any keys are found
        self._check_writable()
        if prefix != "" and not prefix.endswith("/"):
            prefix += "/"
        async for _ in self.list_prefix(prefix):
            raise NotImplementedError

    async def delete(self, key: str) -> None:
        # docstring inherited
        # we choose to only raise NotImplementedError here if the key exists
        # this allows the array/group APIs to avoid the overhead of existence checks
        self._check_writable()
        if await self.exists(key):
            raise NotImplementedError

    async def exists(self, key: str) -> bool:
        # docstring inherited
        with self._lock:
            try:
                self._zf.getinfo(key)
            except KeyError:
                return False
            else:
                return True

    async def list(self) -> AsyncIterator[str]:
        # docstring inherited
        with self._lock:
            for key in self._zf.namelist():
                yield key

    async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        async for key in self.list():
            if key.startswith(prefix):
                yield key

    async def list_dir(self, prefix: str) -> AsyncIterator[str]:
        # docstring inherited
        prefix = prefix.rstrip("/")

        keys = self._zf.namelist()
        seen = set()
        if prefix == "":
            keys_unique = {k.split("/")[0] for k in keys}
            for key in keys_unique:
                if key not in seen:
                    seen.add(key)
                    yield key
        else:
            for key in keys:
                if key.startswith(prefix + "/") and key.strip("/") != prefix:
                    k = key.removeprefix(prefix + "/").split("/")[0]
                    if k not in seen:
                        seen.add(k)
                        yield k

    async def move(self, path: Path | str) -> None:
        """
        Move the store to another path.
        """
        if isinstance(path, str):
            path = Path(path)
        self.close()
        os.makedirs(path.parent, exist_ok=True)
        shutil.move(self.path, path)
        self.path = path
        await self._open()
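
A short usage sketch (the file name is illustrative): create an archive, write an array through the array API, then reopen it read-only.

import zarr
from zarr.storage import ZipStore

# Create a new archive and write an array into it; the context manager
# closes the underlying zipfile (flushing its central directory) on exit.
with ZipStore("example.zip", mode="w") as store:
    arr = zarr.create_array(store=store, shape=(100,), chunks=(10,), dtype="int32")
    arr[:] = 42

# Reopen the archive read-only and read the data back.
with ZipStore("example.zip", mode="r") as store:
    arr = zarr.open_array(store=store, mode="r")
    print(arr[:5])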

read_only property

read_only: bool

Is the store read-only?

supports_consolidated_metadata property

supports_consolidated_metadata: bool

Does the store support consolidated metadata?

If it doesn't, an error will be raised on requests to consolidate the metadata. Returning False can be useful for stores that implement their own consolidation mechanism outside of the zarr-python implementation.

supports_deletes class-attribute instance-attribute

supports_deletes: bool = False

Does the store support deletes?

supports_listing class-attribute instance-attribute

supports_listing: bool = True

Does the store support listing?

supports_partial_writes property

supports_partial_writes: Literal[False]

Does the store support partial writes?

Partial writes are no longer used by Zarr, so this is always False.

supports_writes class-attribute instance-attribute

supports_writes: bool = True

Does the store support writes?

__enter__

__enter__() -> Self

Enter a context manager that will close the store upon exiting.

Source code in zarr/abc/store.py
def __enter__(self) -> Self:
    """Enter a context manager that will close the store upon exiting."""
    return self

__eq__

__eq__(other: object) -> bool

Equality comparison.

Source code in zarr/storage/_zip.py
def __eq__(self, other: object) -> bool:
    return isinstance(other, type(self)) and self.path == other.path

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None

Close the store.

Source code in zarr/abc/store.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the store."""
    self.close()

clear async

clear() -> None

Clear the store.

Remove all keys and values from the store.

Source code in zarr/storage/_zip.py
async def clear(self) -> None:
    # docstring inherited
    with self._lock:
        self._check_writable()
        self._zf.close()
        os.remove(self.path)
        self._zf = zipfile.ZipFile(
            self.path, mode="w", compression=self.compression, allowZip64=self.allowZip64
        )

close

close() -> None

Close the store.

Source code in zarr/storage/_zip.py
def close(self) -> None:
    # docstring inherited
    super().close()
    with self._lock:
        self._zf.close()

delete async

delete(key: str) -> None

Remove a key from the store.

Parameters:

  • key (str) –

Source code in zarr/storage/_zip.py
async def delete(self, key: str) -> None:
    # docstring inherited
    # we choose to only raise NotImplementedError here if the key exists
    # this allows the array/group APIs to avoid the overhead of existence checks
    self._check_writable()
    if await self.exists(key):
        raise NotImplementedError
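
Entries cannot be removed from a ZIP archive, so delete raises NotImplementedError only when the key actually exists; deleting a missing key is a silent no-op. A hedged sketch (file name illustrative):

import asyncio

from zarr.core.buffer import cpu
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("del.zip", mode="w")
    await store.set("x", cpu.Buffer.from_bytes(b"1"))

    await store.delete("missing")  # no-op: the key does not exist
    try:
        await store.delete("x")  # existing keys cannot be removed from a zip
    except NotImplementedError:
        print("ZipStore does not support deletes")
    store.close()

asyncio.run(main())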

delete_dir async

delete_dir(prefix: str) -> None

Remove all keys and prefixes in the store that begin with a given prefix.

Source code in zarr/storage/_zip.py
async def delete_dir(self, prefix: str) -> None:
    # only raise NotImplementedError if any keys are found
    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        raise NotImplementedError

exists async

exists(key: str) -> bool

Check if a key exists in the store.

Parameters:

  • key (str) –

Returns:

  • bool –

    True if the key exists in the store, False otherwise.

Source code in zarr/storage/_zip.py
async def exists(self, key: str) -> bool:
    # docstring inherited
    with self._lock:
        try:
            self._zf.getinfo(key)
        except KeyError:
            return False
        else:
            return True

get async

get(
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None

Retrieve the value associated with a given key.

Parameters:

  • key (str) –
  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • byte_range (ByteRequest, default: None ) –

    ByteRequest may be one of the following. If not provided, all data associated with the key is retrieved.

    - RangeByteRequest(int, int): Request a specific range of bytes in the form (start, end). The end is exclusive. If the given range is zero-length or starts after the end of the object, an error will be returned. Additionally, if the range ends after the end of the object, the entire remainder of the object will be returned. Otherwise, the exact requested range will be returned.

    - OffsetByteRequest(int): Request all bytes starting from a given byte offset. This is equivalent to bytes={int}- as an HTTP header.

    - SuffixByteRequest(int): Request the last int bytes. Note that here, int is the size of the request, not the byte offset. This is equivalent to bytes=-{int} as an HTTP header.

Returns:

  • Buffer | None –

    The value associated with the key, or None if the key does not exist.

Source code in zarr/storage/_zip.py
async def get(
    self,
    key: str,
    prototype: BufferPrototype,
    byte_range: ByteRequest | None = None,
) -> Buffer | None:
    # docstring inherited
    assert isinstance(key, str)

    with self._lock:
        return self._get(key, prototype=prototype, byte_range=byte_range)
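
A hedged sketch of the three byte-range request types (the file name is illustrative; cpu.Buffer and default_buffer_prototype are assumed importable from zarr.core.buffer, as in current zarr-python):

import asyncio

from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
from zarr.core.buffer import cpu, default_buffer_prototype
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("ranges.zip", mode="w")
    await store.set("key", cpu.Buffer.from_bytes(b"0123456789"))

    proto = default_buffer_prototype()
    # (start, end) with an exclusive end: bytes 2, 3, 4
    print((await store.get("key", proto, RangeByteRequest(2, 5))).to_bytes())  # b"234"
    # everything from offset 7 onwards
    print((await store.get("key", proto, OffsetByteRequest(7))).to_bytes())  # b"789"
    # the last three bytes
    print((await store.get("key", proto, SuffixByteRequest(3))).to_bytes())  # b"789"
    store.close()

asyncio.run(main())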

get_partial_values async

get_partial_values(
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]

Retrieve possibly partial values from given key_ranges.

Parameters:

  • prototype (BufferPrototype) –

    The prototype of the output buffer. Stores may support a default buffer prototype.

  • key_ranges (Iterable[tuple[str, ByteRequest | None]]) –

    Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.

Returns:

  • list[Buffer | None] –

    A list of values in the order of the key_ranges; entries are None for missing keys.

Source code in zarr/storage/_zip.py
async def get_partial_values(
    self,
    prototype: BufferPrototype,
    key_ranges: Iterable[tuple[str, ByteRequest | None]],
) -> list[Buffer | None]:
    # docstring inherited
    out = []
    with self._lock:
        for key, byte_range in key_ranges:
            out.append(self._get(key, prototype=prototype, byte_range=byte_range))
    return out
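
A hedged sketch fetching several ranges in one call (file name illustrative):

import asyncio

from zarr.abc.store import RangeByteRequest
from zarr.core.buffer import cpu, default_buffer_prototype
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("partial.zip", mode="w")
    await store.set("a", cpu.Buffer.from_bytes(b"abcdef"))
    await store.set("b", cpu.Buffer.from_bytes(b"uvwxyz"))

    # None as the range means "the whole value"; missing keys yield None.
    bufs = await store.get_partial_values(
        default_buffer_prototype(),
        [("a", RangeByteRequest(0, 3)), ("b", None), ("missing", None)],
    )
    print([b.to_bytes() if b is not None else None for b in bufs])  # [b'abc', b'uvwxyz', None]
    store.close()

asyncio.run(main())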

getsize async

getsize(key: str) -> int

Return the size, in bytes, of a value in a Store.

Parameters:

  • key (str) –

Returns:

  • nbytes ( int ) –

    The size of the value (in bytes).

Raises:

  • FileNotFoundError –

    When the given key does not exist in the store.

Source code in zarr/abc/store.py
async def getsize(self, key: str) -> int:
    """
    Return the size, in bytes, of a value in a Store.

    Parameters
    ----------
    key : str

    Returns
    -------
    nbytes : int
        The size of the value (in bytes).

    Raises
    ------
    FileNotFoundError
        When the given key does not exist in the store.
    """
    # Note to implementers: this default implementation is very inefficient since
    # it requires reading the entire object. Many systems will have ways to get the
    # size of an object without reading it.
    # avoid circular import
    from zarr.core.buffer.core import default_buffer_prototype

    value = await self.get(key, prototype=default_buffer_prototype())
    if value is None:
        raise FileNotFoundError(key)
    return len(value)
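
A minimal sketch (file name illustrative):

import asyncio

from zarr.core.buffer import cpu
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("size.zip", mode="w")
    await store.set("blob", cpu.Buffer.from_bytes(b"12345"))
    print(await store.getsize("blob"))  # 5
    store.close()

asyncio.run(main())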

getsize_prefix async

getsize_prefix(prefix: str) -> int

Return the size, in bytes, of all values under a prefix.

Parameters:

  • prefix (str) –

    The prefix of the directory to measure.

Returns:

  • nbytes ( int ) –

    The sum of the sizes of the values in the directory (in bytes).

See Also

zarr.Array.nbytes_stored
Store.getsize

Notes

getsize_prefix is provided as a potentially faster alternative to listing all the keys under a prefix and calling Store.getsize on each.

In general, prefix should be the path of an Array or Group in the Store. Implementations may differ on the behavior when some other prefix is provided.

Source code in zarr/abc/store.py
async def getsize_prefix(self, prefix: str) -> int:
    """
    Return the size, in bytes, of all values under a prefix.

    Parameters
    ----------
    prefix : str
        The prefix of the directory to measure.

    Returns
    -------
    nbytes : int
        The sum of the sizes of the values in the directory (in bytes).

    See Also
    --------
    zarr.Array.nbytes_stored
    Store.getsize

    Notes
    -----
    ``getsize_prefix`` is just provided as a potentially faster alternative to
    listing all the keys under a prefix calling [`Store.getsize`][zarr.abc.store.Store.getsize] on each.

    In general, ``prefix`` should be the path of an Array or Group in the Store.
    Implementations may differ on the behavior when some other ``prefix``
    is provided.
    """
    # TODO: Overlap listing keys with getsize calls.
    # Currently, we load the list of keys into memory and only then move
    # on to getting sizes. Ideally we would overlap those two, which should
    # improve tail latency and might reduce memory pressure (since not all keys
    # would be in memory at once).

    # avoid circular import
    from zarr.core.common import concurrent_map
    from zarr.core.config import config

    keys = [(x,) async for x in self.list_prefix(prefix)]
    limit = config.get("async.concurrency")
    sizes = await concurrent_map(keys, self.getsize, limit=limit)
    return sum(sizes)

is_empty async

is_empty(prefix: str) -> bool

Check if the directory is empty.

Parameters:

  • prefix (str) –

    Prefix of keys to check.

Returns:

  • bool –

    True if there are no keys under the given prefix, False otherwise.

Source code in zarr/abc/store.py
async def is_empty(self, prefix: str) -> bool:
    """
    Check if the directory is empty.

    Parameters
    ----------
    prefix : str
        Prefix of keys to check.

    Returns
    -------
    bool
        True if the store is empty, False otherwise.
    """
    if not self.supports_listing:
        raise NotImplementedError
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"
    async for _ in self.list_prefix(prefix):
        return False
    return True
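
For example, a freshly constructed in-memory store reports empty (a sketch, assuming MemoryStore):

import asyncio

from zarr.storage import MemoryStore

store = MemoryStore()
print(asyncio.run(store.is_empty("")))  # True: no keys under the root prefix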

list async

list() -> AsyncIterator[str]

Retrieve all keys in the store.

Returns:

  • AsyncIterator[str] –

    An iterator over all keys in the store.

Source code in zarr/storage/_zip.py
async def list(self) -> AsyncIterator[str]:
    # docstring inherited
    with self._lock:
        for key in self._zf.namelist():
            yield key

list_dir async

list_dir(prefix: str) -> AsyncIterator[str]

Retrieve all keys and prefixes that begin with a given prefix and do not contain the character "/" after the given prefix.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    An iterator over the keys and prefixes directly under the given prefix.

Source code in zarr/storage/_zip.py
async def list_dir(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    prefix = prefix.rstrip("/")

    keys = self._zf.namelist()
    seen = set()
    if prefix == "":
        keys_unique = {k.split("/")[0] for k in keys}
        for key in keys_unique:
            if key not in seen:
                seen.add(key)
                yield key
    else:
        for key in keys:
            if key.startswith(prefix + "/") and key.strip("/") != prefix:
                k = key.removeprefix(prefix + "/").split("/")[0]
                if k not in seen:
                    seen.add(k)
                    yield k
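
A hedged sketch contrasting list_dir with list_prefix (file name illustrative; yield order follows the archive's member order):

import asyncio

from zarr.core.buffer import cpu
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("tree.zip", mode="w")
    for key in ("a/x", "a/y/z", "b"):
        await store.set(key, cpu.Buffer.from_bytes(b"0"))

    print([k async for k in store.list_dir("a")])  # ['x', 'y']
    print([k async for k in store.list_prefix("a/")])  # ['a/x', 'a/y/z']
    store.close()

asyncio.run(main())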

list_prefix async

list_prefix(prefix: str) -> AsyncIterator[str]

Retrieve all keys in the store that begin with a given prefix. Keys are returned relative to the root of the store.

Parameters:

  • prefix (str) –

Returns:

  • AsyncIterator[str] –

    An iterator over all keys that begin with the given prefix.

Source code in zarr/storage/_zip.py
async def list_prefix(self, prefix: str) -> AsyncIterator[str]:
    # docstring inherited
    async for key in self.list():
        if key.startswith(prefix):
            yield key

move async

move(path: Path | str) -> None

Move the store to another path.

Source code in zarr/storage/_zip.py
async def move(self, path: Path | str) -> None:
    """
    Move the store to another path.
    """
    if isinstance(path, str):
        path = Path(path)
    self.close()
    os.makedirs(path.parent, exist_ok=True)
    shutil.move(self.path, path)
    self.path = path
    await self._open()
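
A hedged sketch (paths illustrative). Note that the store is reopened with its original mode, so mode "a" is used here so that reopening at the new location appends rather than truncates:

import asyncio

from zarr.core.buffer import cpu
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("data.zip", mode="a")
    await store.set("x", cpu.Buffer.from_bytes(b"1"))
    # Closes the archive, moves the file (creating parent directories
    # as needed), and reopens the store at the new location.
    await store.move("backup/data.zip")
    print(store.path)  # backup/data.zip
    store.close()

asyncio.run(main())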

open async classmethod

open(*args: Any, **kwargs: Any) -> Self

Create and open the store.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments to pass to the store constructor.

  • **kwargs (Any, default: {} ) –

    Keyword arguments to pass to the store constructor.

Returns:

  • Store

    The opened store instance.

Source code in zarr/abc/store.py
@classmethod
async def open(cls, *args: Any, **kwargs: Any) -> Self:
    """
    Create and open the store.

    Parameters
    ----------
    *args : Any
        Positional arguments to pass to the store constructor.
    **kwargs : Any
        Keyword arguments to pass to the store constructor.

    Returns
    -------
    Store
        The opened store instance.
    """
    store = cls(*args, **kwargs)
    await store._open()
    return store
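
For example (file name illustrative), this constructs the store and awaits its internal open step in one call:

import asyncio

from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("data.zip", mode="w")
    print(store)  # zip://data.zip
    store.close()

asyncio.run(main())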

set async

set(key: str, value: Buffer) -> None

Store a (key, value) pair.

Parameters:

  • key (str) –

  • value (Buffer) –

Source code in zarr/storage/_zip.py
async def set(self, key: str, value: Buffer) -> None:
    # docstring inherited
    self._check_writable()
    if not self._is_open:
        self._sync_open()
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        raise TypeError(
            f"ZipStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead."
        )
    with self._lock:
        self._set(key, value)

set_if_not_exists async

set_if_not_exists(key: str, value: Buffer) -> None

Store a key to value if the key is not already present.

Parameters:

  • key (str) –

  • value (Buffer) –

Source code in zarr/storage/_zip.py
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    self._check_writable()
    with self._lock:
        members = self._zf.namelist()
        if key not in members:
            self._set(key, value)
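
A hedged sketch (file name illustrative): the second write is ignored because the key already exists.

import asyncio

from zarr.core.buffer import cpu, default_buffer_prototype
from zarr.storage import ZipStore

async def main() -> None:
    store = await ZipStore.open("once.zip", mode="w")
    await store.set_if_not_exists("k", cpu.Buffer.from_bytes(b"first"))
    await store.set_if_not_exists("k", cpu.Buffer.from_bytes(b"second"))  # ignored
    buf = await store.get("k", default_buffer_prototype())
    print(buf.to_bytes())  # b"first"
    store.close()

asyncio.run(main())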

with_read_only

with_read_only(read_only: bool = False) -> Store

Return a new store with a new read_only setting.

The new store points to the same location with the specified new read_only state. The returned Store is not automatically opened, and this store is not automatically closed.

Parameters:

  • read_only (bool, default: False ) –

    If True, the store will be created in read-only mode. Defaults to False.

Returns:

  • Store –

    A new store of the same type, with the new read_only setting.

Source code in zarr/abc/store.py
def with_read_only(self, read_only: bool = False) -> Store:
    """
    Return a new store with a new read_only setting.

    The new store points to the same location with the specified new read_only state.
    The returned Store is not automatically opened, and this store is
    not automatically closed.

    Parameters
    ----------
    read_only
        If True, the store will be created in read-only mode. Defaults to False.

    Returns
    -------
        A new store of the same type with the new read only attribute.
    """
    raise NotImplementedError(
        f"with_read_only is not implemented for the {type(self)} store type."
    )