Skip to content

Memory Provider

Implementation of the in-memory data provider layer.

This implementation supports both the old uncompressed and new compressed (Zstandard) formats.

This implementation stores the data in memory, in a cache of Python dictionaries objects.

Classes:

Name Description
- MemoryDataEntry

Memory data entry implementation.

- MemoryDictEntry

Memory dictionary entry implementation.

- MemoryJobEntry

Memory job entry implementation.

- MemoryDataProvider

Memory data provider implementation.

- MemoryDictProvider

Memory dictionary provider implementation.

- MemoryDataIO

Memory data file-like IO implementation.

MemoryDataEntry

Bases: DataEntry

Memory data entry implementation.

This implementation supports both the old uncompressed and new compressed (Zstandard) formats.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
class MemoryDataEntry(DataEntry):
    """Memory data entry implementation.

    This implementation supports both the old uncompressed and new compressed (Zstandard) formats.
    """

    __slots__ = ("_cache",)

    def __init__(self, name: str, job: int, cache: dict[str, bytearray], *, compressed: bool, readonly: bool) -> None:
        """[Internal] Initialize the data entry.

        Args:
            name: the data name
            job: the job id
            cache: the data cache for the job
            compressed: indicate whether the underlying data is compressed or not (in Zstandard)
            readonly: indicate whether the data is read-only or not
        """
        super().__init__(name, job, compressed=compressed, readonly=readonly)
        self._cache = cache

    def _reader(self) -> BinaryIO:
        # Read-mode view over the cached buffer (KeyError propagates if absent).
        return MemoryDataIO(self._name, self._cache[self._name])

    def _writer(self) -> BinaryIO:
        # Reuse (and truncate) an existing buffer, or register a fresh one.
        buf = self._cache.get(self._name)
        if buf is None:
            buf = bytearray()
            self._cache[self._name] = buf
        else:
            buf.clear()

        return MemoryDataIO(self._name, buf, read_mode=False)

    def _size(self) -> int | None:
        buf = self._cache.get(self._name)
        return None if buf is None else len(buf)

    def _delete(self) -> None:
        try:
            buf = self._cache.pop(self._name)
        except KeyError:
            raise DataNotExistsError(self._name) from None

        # Release the underlying storage eagerly.
        buf.clear()

__init__(name, job, cache, *, compressed, readonly)

[Internal] Initialize the data entry.

Parameters:

Name Type Description Default
name str

the data name

required
job int

the job id

required
cache dict[str, bytearray]

the data cache for the job

required
compressed bool

indicate whether the underlying data is compressed or not (in Zstandard)

required
readonly bool

indicate whether the data is read-only or not

required
Source code in src/lhcbdirac_log/providers/memory/accessors.py
309
310
311
312
313
314
315
316
317
318
319
320
def __init__(self, name: str, job: int, cache: dict[str, bytearray], *, compressed: bool, readonly: bool) -> None:
    """[Internal] Initialize the data entry.

    Args:
        name: the data name
        job: the job id
        cache: the data cache for the job
        compressed: indicate whether the underlying data is compressed or not (in Zstandard)
        readonly: indicate whether the data is read-only or not
    """
    super().__init__(name, job, compressed=compressed, readonly=readonly)
    self._cache = cache

MemoryDataIO

Bases: BufferedIOBase, BinaryIO

Memory data file-like I/O implementation for binary data access.

Supported methods and properties
  • name
  • mode
  • readable
  • writable
  • read
  • write
  • seekable
  • seek
  • tell
  • flush
  • close
  • enter
  • exit
Notes
  • this exists as a workaround for BytesIO limitations
  • allow access to the buffer, contrary to BytesIO
  • can be write / read-only, contrary to BytesIO
Source code in src/lhcbdirac_log/providers/memory/accessors.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class MemoryDataIO(BufferedIOBase, BinaryIO):
    """Memory data file-like I/O implementation for binary data access.

    Supported methods and properties:
        - name
        - mode
        - readable
        - writable
        - read
        - write
        - seekable
        - seek
        - tell
        - flush
        - close
        - __enter__
        - __exit__

    Notes:
        - this exists as a workaround BytesIO limitations
        - allow access to the buffer, contrary to BytesIO
        - can be write / read-only, contrary to BytesIO
    """

    __slots__ = (
        "_buffer",
        "_seek",
        "_name",
        "_read_mode",
    )

    def __init__(
        self,
        name: str,
        buffer: bytearray,
        *,
        read_mode: bool = True,
    ) -> None:
        """[Internal] Initialize the Memory Data file-like I/O.

        Args:
            name: the data name
            buffer: the data buffer to read/write from/to
            read_mode: True if the I/O is in read mode, False otherwise
        """
        super().__init__()

        self._name = name
        self._buffer = buffer

        self._read_mode = read_mode

        self._seek = 0

    @override
    def __enter__(self) -> Self:
        return super().__enter__()

    @property
    @override
    def name(self) -> str:
        """Get the file name.

        Returns:
            the file name
        """
        return self._name

    @property
    @override
    def mode(self) -> str:
        """Get the file mode.

        Returns:
            the file mode
        """
        return "rb" if self._read_mode else "wb"

    @override
    def __next__(self) -> bytes:
        raise UnsupportedOperation

    @override
    def readline(self, limit: int | None = -1) -> bytes:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def readable(self) -> bool:
        """Check if the file is readable.

        Returns:
            True if the file is readable, False otherwise

        Raises:
            ValueError: if the I/O is closed and in read mode
        """
        if self.closed and self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._read_mode

    @override
    def writable(self) -> bool:
        """Check if the file is writable.

        Returns:
            True if the file is writable, False otherwise

        Raises:
            ValueError: if the I/O is closed and in write mode
        """
        if self.closed and not self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return not self._read_mode

    @override
    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """Seek to a position.

        Args:
            offset: the offset to seek to
            whence: the reference point for the offset

        Returns:
            the new position

        Raises:
            ValueError: if the I/O is closed or invalid whence
            OSError: if the I/O is in write-only mode or invalid offset
        """
        if self.seekable():
            old = self._seek

            match whence:
                case os.SEEK_SET:
                    self._seek = offset
                case os.SEEK_CUR:
                    self._seek += offset
                case os.SEEK_END:
                    self._seek = len(self._buffer) + offset
                case _:
                    msg = "Invalid whence"
                    raise ValueError(msg)

            if self._seek < 0:
                self._seek = old
                msg = "Invalid offset"
                raise OSError(msg)
        else:
            msg = "Cannot seek using write-only mode"
            raise OSError(msg)

        return self._seek

    @override
    def seekable(self) -> bool:
        """Check if the file is seekable.

        Returns:
            True if the file is seekable, False otherwise

        Raises:
            ValueError: if the I/O is closed
        """
        if self.closed and self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._read_mode

    @override
    def tell(self) -> int:
        """Get the current position.

        Returns:
            the current position
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._seek

    @override
    def readlines(self, hint: int = -1) -> list[bytes]:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def writelines(self, lines: Iterable[Buffer]) -> None:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def read(self, size: int | None = -1) -> bytes:
        """Read data from the buffer.

        Args:
            size: the number of bytes to read (default is -1, read all)

        Returns:
            the read data

        Raises:
            DataNotExistsError: if the data does not exist
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        if not self._read_mode:
            msg = "Cannot read using write-only mode"
            raise UnsupportedOperation(msg)

        if size is None or size < 0:
            size = len(self._buffer)

        s = self._seek
        self._seek = min(len(self._buffer), self._seek + size)

        with memoryview(self._buffer) as m:
            return bytes(m[s : self._seek])

    @override
    def write(self, data: Buffer | bytes | bytearray | memoryview, /) -> int:
        """Write data to the buffer.

        Args:
            data: the data to write

        Returns:
            the number of bytes written

        Raises:
            ValueError: if the I/O is closed
            OSError: if the I/O is in read-only mode
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        if self._read_mode:
            msg = "Cannot write using read-only mode"
            raise UnsupportedOperation(msg)

        if data:
            self._buffer.extend(data)

        self._seek += len(data)
        return len(data)

    def __repr__(self) -> str:
        """Get the string representation of the file-like I/O.

        Returns:
            the string representation
        """
        return f"<{self.__class__.__name__} name={self.name!r} mode={self.mode!r} closed={self.closed}>"

mode: str property

Get the file mode.

Returns:

Type Description
str

the file mode

name: str property

Get the file name.

Returns:

Type Description
str

the file name

__init__(name, buffer, *, read_mode=True)

[Internal] Initialize the Memory Data file-like I/O.

Parameters:

Name Type Description Default
name str

the data name

required
buffer bytearray

the data buffer to read/write from/to

required
read_mode bool

True if the I/O is in read mode, False otherwise

True
Source code in src/lhcbdirac_log/providers/memory/accessors.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    name: str,
    buffer: bytearray,
    *,
    read_mode: bool = True,
) -> None:
    """[Internal] Initialize the Memory Data file-like I/O.

    Args:
        name: the data name
        buffer: the data buffer to read/write from/to
        read_mode: True if the I/O is in read mode, False otherwise
    """
    super().__init__()

    self._name = name
    self._buffer = buffer

    self._read_mode = read_mode

    self._seek = 0  # cursor position within the buffer

__repr__()

Get the string representation of the file-like I/O.

Returns:

Type Description
str

the string representation

Source code in src/lhcbdirac_log/providers/memory/accessors.py
292
293
294
295
296
297
298
def __repr__(self) -> str:
    """Get the string representation of the file-like I/O.

    Returns:
        the string representation
    """
    # include name, mode and closed state for debugging
    return f"<{self.__class__.__name__} name={self.name!r} mode={self.mode!r} closed={self.closed}>"

read(size=-1)

Read data from the buffer.

Parameters:

Name Type Description Default
size int | None

the number of bytes to read (default is -1, read all)

-1

Returns:

Type Description
bytes

the read data

Raises:

Type Description
DataNotExistsError

if the data does not exist

Source code in src/lhcbdirac_log/providers/memory/accessors.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@override
def read(self, size: int | None = -1) -> bytes:
    """Read data from the buffer.

    Args:
        size: the number of bytes to read (default is -1, read all)

    Returns:
        the read data

    Raises:
        ValueError: if the I/O is closed
        UnsupportedOperation: if the I/O is in write-only mode
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    if not self._read_mode:
        msg = "Cannot read using write-only mode"
        raise UnsupportedOperation(msg)

    if size is None or size < 0:
        size = len(self._buffer)

    # clamp the cursor to the end of the buffer
    s = self._seek
    self._seek = min(len(self._buffer), self._seek + size)

    with memoryview(self._buffer) as m:
        return bytes(m[s : self._seek])

readable()

Check if the file is readable.

Returns:

Type Description
bool

True if the file is readable, False otherwise

Raises:

Type Description
ValueError

if the I/O is closed and in read mode

Source code in src/lhcbdirac_log/providers/memory/accessors.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@override
def readable(self) -> bool:
    """Check if the file is readable.

    Returns:
        True if the file is readable, False otherwise

    Raises:
        ValueError: if the I/O is closed and in read mode
    """
    # only a closed reader raises; a closed writer just reports non-readable
    if self.closed and self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._read_mode

readline(limit=-1)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
119
120
121
122
@override
def readline(self, limit: int | None = -1) -> bytes:
    """Unsupported operation.

    Raises:
        UnsupportedOperation: always, line-oriented reads are not supported
    """
    raise UnsupportedOperation

readlines(hint=-1)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
224
225
226
227
@override
def readlines(self, hint: int = -1) -> list[bytes]:
    """Unsupported operation.

    Raises:
        UnsupportedOperation: always, line-oriented reads are not supported
    """
    raise UnsupportedOperation

seek(offset, whence=os.SEEK_SET)

Seek to a position.

Parameters:

Name Type Description Default
offset int

the offset to seek to

required
whence int

the reference point for the offset

SEEK_SET

Returns:

Type Description
int

the new position

Raises:

Type Description
ValueError

if the I/O is closed or invalid whence

OSError

if the I/O is in write-only mode or invalid offset

Source code in src/lhcbdirac_log/providers/memory/accessors.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
@override
def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
    """Seek to a position.

    Args:
        offset: the offset to seek to
        whence: the reference point for the offset

    Returns:
        the new position

    Raises:
        ValueError: if the I/O is closed or invalid whence
        OSError: if the I/O is in write-only mode or invalid offset
    """
    if self.seekable():
        # remember the old position so it can be restored on an invalid offset
        old = self._seek

        match whence:
            case os.SEEK_SET:
                self._seek = offset
            case os.SEEK_CUR:
                self._seek += offset
            case os.SEEK_END:
                self._seek = len(self._buffer) + offset
            case _:
                msg = "Invalid whence"
                raise ValueError(msg)

        # seeking past the end is allowed (like regular files); negative is not
        if self._seek < 0:
            self._seek = old
            msg = "Invalid offset"
            raise OSError(msg)
    else:
        msg = "Cannot seek using write-only mode"
        raise OSError(msg)

    return self._seek

seekable()

Check if the file is seekable.

Returns:

Type Description
bool

True if the file is seekable, False otherwise

Raises:

Type Description
ValueError

if the I/O is closed

Source code in src/lhcbdirac_log/providers/memory/accessors.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@override
def seekable(self) -> bool:
    """Check if the file is seekable.

    Returns:
        True if the file is seekable (read mode only), False otherwise

    Raises:
        ValueError: if the I/O is closed and in read mode
    """
    if self.closed and self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._read_mode

tell()

Get the current position.

Returns:

Type Description
int

the current position

Source code in src/lhcbdirac_log/providers/memory/accessors.py
211
212
213
214
215
216
217
218
219
220
221
222
@override
def tell(self) -> int:
    """Get the current position.

    Returns:
        the current position

    Raises:
        ValueError: if the I/O is closed
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._seek

writable()

Check if the file is writable.

Returns:

Type Description
bool

True if the file is writable, False otherwise

Raises:

Type Description
ValueError

if the I/O is closed and in write mode

Source code in src/lhcbdirac_log/providers/memory/accessors.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@override
def writable(self) -> bool:
    """Check if the file is writable.

    Returns:
        True if the file is writable, False otherwise

    Raises:
        ValueError: if the I/O is closed and in write mode
    """
    # only a closed writer raises; a closed reader just reports non-writable
    if self.closed and not self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return not self._read_mode

write(data)

Write data to the buffer.

Parameters:

Name Type Description Default
data Buffer | bytes | bytearray | memoryview

the data to write

required

Returns:

Type Description
int

the number of bytes written

Raises:

Type Description
ValueError

if the I/O is closed

OSError

if the I/O is in read-only mode

Source code in src/lhcbdirac_log/providers/memory/accessors.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
@override
def write(self, data: Buffer | bytes | bytearray | memoryview, /) -> int:
    """Write data to the buffer.

    Args:
        data: the data to write

    Returns:
        the number of bytes written

    Raises:
        ValueError: if the I/O is closed
        UnsupportedOperation: if the I/O is in read-only mode
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    if self._read_mode:
        msg = "Cannot write using read-only mode"
        raise UnsupportedOperation(msg)

    # writes always append to the end of the buffer (write mode is not seekable)
    if data:
        self._buffer.extend(data)

    self._seek += len(data)
    return len(data)

writelines(lines)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
229
230
231
232
@override
def writelines(self, lines: Iterable[Buffer]) -> None:
    """Unsupported operation.

    Raises:
        UnsupportedOperation: always, line-oriented writes are not supported
    """
    raise UnsupportedOperation

MemoryDataProvider

Bases: DataProvider[MemoryJobEntry]

Memory data provider implementation.

This implementation supports both the old uncompressed and new compressed (Zstandard) formats.

Source code in src/lhcbdirac_log/providers/memory/providers.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class MemoryDataProvider(DataProvider[MemoryJobEntry]):
    """Memory data provider implementation.

    This implementation supports both the old uncompressed and new compressed (Zstandard) formats.
    """

    # process-wide cache: provider name -> job id -> data name -> buffer
    CACHE = dict[str, dict[int, dict[str, bytearray]]]()

    __slots__ = ("_cache",)

    @classmethod
    def clear_cache(cls, name: str | None = None) -> None:
        """Clear the cache.

        Args:
            name: the name of the cache entry to clear (default: None, for all)
        """
        if name is None:
            for entry in cls.CACHE:
                cls(entry).clear(force=True)

            cls.CACHE.clear()
        else:
            cls(name).clear(force=True)
            del cls.CACHE[name]

    def __init__(self, name: str, dict_provider: DictProvider | None = None, *, readonly: bool = False) -> None:
        """Initialize the data provider.

        Args:
            name: the provider name in the cache entry
            dict_provider: the dict provider associated to the data (default is None), specifying this implies that the provided data are compressed
            readonly: indicate whether the provider is read-only or not (default: False)
        """
        # bind the per-provider cache before the base initializer runs
        self._cache = self.CACHE.setdefault(name, {})
        super().__init__(dict_provider, readonly=readonly)

    @override
    def _get(self, job: int, *, create: bool = False) -> MemoryJobEntry:
        return MemoryJobEntry(self._cache, job, compressed=self.compressed, readonly=self._readonly, create=create)

    @override
    def _create(self, job: int, *, exists_ok: bool = False) -> MemoryJobEntry:
        return MemoryJobEntry(self._cache, job, compressed=self.compressed, readonly=self._readonly, exists_ok=exists_ok)

    @override
    def jobs(self) -> Generator[int, None, None]:
        # snapshot the keys so jobs may be deleted while iterating
        yield from tuple(self._cache)

    @override
    def _delete(self, job: int, *, force: bool = False) -> None:
        entry = self.get(job)

        if force:
            entry.clear()
        elif any(entry.files()):
            raise DataExistsError(job)

        self._cache.pop(job)

__init__(name, dict_provider=None, *, readonly=False)

Initialize the data provider.

Parameters:

Name Type Description Default
name str

the provider name in the cache entry

required
dict_provider DictProvider | None

the dict provider associated to the data (default is None), specifying this implies that the provided data are compressed

None
readonly bool

indicate whether the provider is read-only or not (default: False)

False
Source code in src/lhcbdirac_log/providers/memory/providers.py
115
116
117
118
119
120
121
122
123
124
def __init__(self, name: str, dict_provider: DictProvider | None = None, *, readonly: bool = False) -> None:
    """Initialize the data provider.

    Args:
        name: the provider name in the cache entry
        dict_provider: the dict provider associated to the data (default is None), specifying this implies that the provided data are compressed
        readonly: indicate whether the provider is read-only or not (default: False)
    """
    self._cache = self.CACHE.setdefault(name, {})
    super().__init__(dict_provider, readonly=readonly)

clear_cache(name=None) classmethod

Clear the cache.

Parameters:

Name Type Description Default
name str | None

the name of the cache entry to clear (default: None, for all)

None
Source code in src/lhcbdirac_log/providers/memory/providers.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@classmethod
def clear_cache(cls, name: str | None = None) -> None:
    """Clear the cache.

    Args:
        name: the name of the cache entry to clear (default: None, for all)
    """
    if name is not None:
        # force-drop all jobs of this entry before removing it from the cache
        cls(name).clear(force=True)
        del cls.CACHE[name]
    else:
        for name in cls.CACHE:
            cls(name).clear(force=True)

        cls.CACHE.clear()

MemoryDictEntry

Bases: DictEntry

Memory dictionary entry implementation.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
class MemoryDictEntry(DictEntry):
    """Memory dictionary entry implementation."""

    __slots__ = ("_cache",)

    def __init__(self, name: str, cache: dict[str, bytes], config: Config, data: bytes | None = None, zstd_id: int | None = None) -> None:
        """[Internal] Initialize the dictionary entry.

        Args:
            name: the dictionary name
            cache: the dict cache
            config: the configuration to use for precomputing the dictionary
            data: the dictionary data (create a new dict if not None)
            zstd_id: the zstd dictionary id (None for unknown)
        """
        # the cache must be bound before the base initializer, which may use it
        self._cache = cache
        super().__init__(name, config, data, zstd_id)

    @property
    @override
    def exists(self) -> bool:
        return self._name in self._cache

    @property
    @override
    def size(self) -> int:
        data = self._cache.get(self._name)
        return 0 if data is None else len(data)

    @override
    def _load_data(self) -> bytes:
        try:
            return self._cache[self._name]
        except KeyError:
            raise DictNotExistsError(self._name) from None

    @override
    def _save(self) -> None:
        self._cache[self._name] = self._data

__init__(name, cache, config, data=None, zstd_id=None)

[Internal] Initialize the dictionary entry.

Parameters:

Name Type Description Default
name str

the dictionary name

required
cache dict[str, bytes]

the dict cache

required
config Config

the configuration to use for precomputing the dictionary

required
data bytes | None

the dictionary data (create a new dict if not None)

None
zstd_id int | None

the zstd dictionary id (None for unknown)

None
Source code in src/lhcbdirac_log/providers/memory/accessors.py
352
353
354
355
356
357
358
359
360
361
362
363
def __init__(self, name: str, cache: dict[str, bytes], config: Config, data: bytes | None = None, zstd_id: int | None = None) -> None:
    """[Internal] Initialize the dictionary entry.

    Args:
        name: the dictionary name
        cache: the dict cache
        config: the configuration to use for precomputing the dictionary
        data: the dictionary data (create a new dict if not None)
        zstd_id: the zstd dictionary id (None for unknown)
    """
    # cache is bound before super().__init__, which may use it (e.g. when data is given) — TODO confirm against DictEntry
    self._cache = cache
    super().__init__(name, config, data, zstd_id)

MemoryDictProvider

Bases: DictProvider[MemoryDictEntry]

Memory dictionary provider implementation.

Source code in src/lhcbdirac_log/providers/memory/providers.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class MemoryDictProvider(DictProvider[MemoryDictEntry]):
    """Memory dictionary provider implementation."""

    # process-wide cache: provider name -> dictionary name -> dictionary bytes
    CACHE = dict[str, dict[str, bytes]]()

    __slots__ = ("_cache",)

    @classmethod
    def clear_cache(cls, name: str | None = None) -> None:
        """Clear the cache.

        Args:
            name: the name of the cache entry to clear (default: None, for all)
        """
        if name is None:
            for entry in cls.CACHE:
                cls(entry).clear()

            cls.CACHE.clear()
        else:
            cls(name).clear()
            del cls.CACHE[name]

    def __init__(self, name: str, config: Config = DEFAULT_CONFIG, *, readonly: bool = False) -> None:
        """Initialize the dictionary provider.

        Args:
            name: the provider name in the cache entry
            config: the configuration to use for precomputing the dictionaries (default: DEFAULT_CONFIG)
            readonly: indicate whether the provider is read-only or not (default: False)

        Notes:
            - The cache must be manually cleared using the `clear_cache` class method.
        """
        self._cache = self.CACHE.setdefault(name, {})
        super().__init__(config, readonly=readonly)

    @override
    def _load(self, name: str) -> MemoryDictEntry:
        if name in self._cache:
            return MemoryDictEntry(name, self._cache, self._config)

        raise DictNotExistsError(name)

    @override
    def _add(self, name: str, data: bytes, zstd_id: int) -> MemoryDictEntry:
        if name in self._cache:
            raise DictExistsError(name)

        return MemoryDictEntry(name, self._cache, self._config, data, zstd_id)

    @override
    def _iter_all(self) -> Generator[str, None, None]:
        # snapshot the keys so entries may be deleted while iterating
        yield from tuple(self._cache)

    @override
    def _delete(self, name: str) -> None:
        if self._cache.pop(name, None) is None:
            raise DictNotExistsError(name)

    @override
    @property
    def size(self) -> int:
        return sum(len(data) for data in self._cache.values())

__init__(name, config=DEFAULT_CONFIG, *, readonly=False)

Initialize the dictionary provider.

Parameters:

Name Type Description Default
name str

the provider name in the cache entry

required
config Config

the configuration to use for precomputing the dictionaries (default: DEFAULT_CONFIG)

DEFAULT_CONFIG
readonly bool

indicate whether the provider is read-only or not (default: False)

False
Notes
  • The cache must be manually cleared using the clear_cache class method.
Source code in src/lhcbdirac_log/providers/memory/providers.py
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, name: str, config: Config = DEFAULT_CONFIG, *, readonly: bool = False) -> None:
    """Initialize the dictionary provider.

    Args:
        name: the provider name in the cache entry
        config: the configuration to use for precomputing the dictionaries (default: DEFAULT_CONFIG)
        readonly: indicate whether the provider is read-only or not (default: False)

    Notes:
        - The cache must be manually cleared using the `clear_cache` class method.
    """
    self._cache = self.CACHE.setdefault(name, {})
    super().__init__(config, readonly=readonly)

clear_cache(name=None) classmethod

Clear the cache.

Parameters:

Name Type Description Default
name str | None

the name of the cache entry to clear (default: None, for all)

None
Source code in src/lhcbdirac_log/providers/memory/providers.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@classmethod
def clear_cache(cls, name: str | None = None) -> None:
    """Clear the cache.

    Args:
        name: the name of the cache entry to clear (default: None, for all)
    """
    if name is not None:
        # drop all dictionaries of this entry before removing it from the cache
        cls(name).clear()
        del cls.CACHE[name]
    else:
        for name in cls.CACHE:
            cls(name).clear()

        cls.CACHE.clear()

MemoryJobEntry

Bases: JobEntry[MemoryDataEntry]

Memory job entry implementation.

This implementation supports both the old uncompressed and new compressed (Zstandard) formats.

Source code in src/lhcbdirac_log/providers/memory/accessors.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
class MemoryJobEntry(JobEntry[MemoryDataEntry]):
    """Memory job entry implementation.

    This implementation supports both the old uncompressed and new compressed (Zstandard) formats.
    """

    __slots__ = ("_cache",)

    def __init__(
        self, cache: dict[int, dict[str, bytearray]], job: int, *, compressed: bool, readonly: bool, create: bool = True, exists_ok: bool = True
    ) -> None:
        """[Internal] Initialize the job entry.

        Args:
            cache: the data provider cache (production cache)
            job: the job id
            compressed: indicate whether the underlying data is compressed or not (in Zstandard)
            readonly: indicate whether the job is read-only or not
            create: create the job cache if it doesn't exist, otherwise raise an error
            exists_ok: ignore the error if the job cache already exists

        """
        if job not in cache:
            if not create:
                raise JobNotExistsError(job)
            cache[job] = {}
        elif not exists_ok:
            raise JobExistsError(job)

        self._cache = cache[job]

        super().__init__(job, compressed=compressed, readonly=readonly)

    @override
    def _get(self, name: str, *, create: bool = False) -> MemoryDataEntry:
        """[Internal] Return the data entry *name*, raising unless it exists or *create* is set."""
        if not (create or name in self._cache):
            raise DataNotExistsError(name)

        return MemoryDataEntry(name, self._job, self._cache, compressed=self.compressed, readonly=self._readonly)

    @override
    def _create(self, name: str, *, exists_ok: bool = False) -> MemoryDataEntry:
        """[Internal] Create the data entry *name*, raising if it already exists and *exists_ok* is unset."""
        if not (exists_ok or name not in self._cache):
            raise DataExistsError(name)

        return MemoryDataEntry(name, self._job, self._cache, compressed=self.compressed, readonly=self._readonly)

    @override
    def files(self) -> Generator[str, None, None]:
        """Yield the stored data names (key snapshot, so entries may be deleted during iteration)."""
        yield from tuple(self._cache)

    @property
    @override
    def data_size(self) -> int:
        """Total size of the stored data buffers (optimizes the default implementation)."""
        return sum(map(len, self._cache.values()))

    @override
    def delete(self, name: str) -> None:
        """Delete the data entry *name*, releasing its buffer.

        Raises:
            ReadOnlyError: if the job is read-only
            DataNotExistsError: if no such data entry exists
        """
        if self._readonly:
            msg = f"Job '{self._job}' is read-only"
            raise ReadOnlyError(msg)

        removed = self._cache.pop(name, None)
        if removed is None:
            raise DataNotExistsError(name)

        # Empty the underlying buffer as well, not just the cache reference.
        removed.clear()

    @override
    def _update_info(self) -> None:  # pragma: no cover
        pass

__init__(cache, job, *, compressed, readonly, create=True, exists_ok=True)

[Internal] Initialize the job entry.

Parameters:

Name Type Description Default
cache dict[int, dict[str, bytearray]]

the data provider cache (production cache)

required
job int

the job id

required
compressed bool

indicate whether the underlying data is compressed or not (in Zstandard)

required
readonly bool

indicate whether the job is read-only or not

required
create bool

create the job cache if it doesn't exist, otherwise raise an error

True
exists_ok bool

ignore the error if the job cache already exists

True
Source code in src/lhcbdirac_log/providers/memory/accessors.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
def __init__(
    self, cache: dict[int, dict[str, bytearray]], job: int, *, compressed: bool, readonly: bool, create: bool = True, exists_ok: bool = True
) -> None:
    """[Internal] Initialize the job entry.

    Args:
        cache: the data provider cache (production cache)
        job: the job id
        compressed: indicate whether the underlying data is compressed or not (in Zstandard)
        readonly: indicate whether the job is read-only or not
        create: create the job cache if it doesn't exist, otherwise raise an error
        exists_ok: ignore the error if the job cache already exists

    """
    # Resolve the per-job cache: an existing job is an error unless exists_ok,
    # a missing job is created only when create is set.
    if job in cache:
        if not exists_ok:
            raise JobExistsError(job)
    elif create:
        cache[job] = {}
    else:
        raise JobNotExistsError(job)

    self._cache = cache[job]

    super().__init__(job, compressed=compressed, readonly=readonly)