Database Provider

Implementation of the data provider layer for SQL databases.

This implementation only supports the new compressed (Zstandard) format, so a dictionary provider always exists and is used to store the Zstandard dictionaries.

Thanks to the database layout (SQLite accessed through SQLAlchemy), dict and data entries are linked, so a dictionary cannot be deleted while data entries still reference it (safety). A minimal usage sketch follows the schema below.

Schema
  • data: String name [PK], Integer job [PK, FK:job.id], Integer dict [FK:dict.id], BLOB data.
  • dict: Integer id [PK], String name [UNIQUE], Integer zstd_id [NULLABLE], BLOB data [NULLABLE].
  • job: Integer id [PK], Integer dirac_id [NULLABLE], Boolean success [NULLABLE].
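
For orientation, here is a minimal usage sketch in Python. The import paths are assumed from the source layout shown on this page (src/lhcbdirac_log/providers/database/...) and may differ; the constructor and method signatures are the ones documented below.

from lhcbdirac_log.providers.database.driver import SQLDriver           # assumed import path
from lhcbdirac_log.providers.database.providers import SQLDataProvider  # assumed import path

driver = SQLDriver.create("logs.db")        # or SQLDriver.create() for an in-memory database
provider = SQLDataProvider(driver)          # creates its own SQLDictProvider on the same driver

provider.open()
try:
    for job_id in provider.jobs():          # iterate the stored job ids
        print(job_id)
    print(provider.data_size, provider.size)  # compressed data size vs. database file size
finally:
    provider.close()                        # also closes the dictionary provider and the driver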

Classes:

  • SQLDataEntry: SQL data entry implementation.
  • SQLDictEntry: SQL dictionary entry implementation.
  • SQLJobEntry: SQL job entry implementation.
  • SQLDataProvider: SQL data provider implementation.
  • SQLDictProvider: SQL dictionary provider implementation.
  • SQLDriver: the SQLite driver used by the other classes.
  • SQLDataIO: SQL data file-like I/O implementation.

SQLDataEntry

Bases: DataEntry

SQL Database data entry implementation.

Only supports the new compressed (Zstandard) format.

Source code in src/lhcbdirac_log/providers/database/accessors.py
class SQLDataEntry(DataEntry):
    """SQL Database data entry implementation.

    Only supports the new compressed (Zstandard) format.
    """

    __slots__ = ("_driver",)

    def __init__(self, driver: SQLDriver, name: str, job: int, *, readonly: bool) -> None:
        """[Internal] Initialize the data entry.

        Args:
            driver: the SQL driver
            name: the data name
            job: the job id
            readonly: indicate whether the data is read-only

        Notes:
            - compressed is always True
        """
        self._driver = driver
        super().__init__(name, job, compressed=True, readonly=readonly)

    @override
    def _reader(self) -> BinaryIO:
        return SQLDataIO(self._driver, self)

    @override
    def _writer(self) -> BinaryIO:
        return SQLDataIO(self._driver, self, read_mode=False)

    @override
    def _size(self) -> int | None:
        return self._driver.get_data_size(self.job, self.name)

    @override
    def _delete(self) -> None:
        self._driver.delete_data(self.job, self.name)

__init__(driver, name, job, *, readonly)

[Internal] Initialize the data entry.

Parameters:

  • driver (SQLDriver): the SQL driver [required]
  • name (str): the data name [required]
  • job (int): the job id [required]
  • readonly (bool): indicate whether the data is read-only [required]
Notes
  • compressed is always True
Source code in src/lhcbdirac_log/providers/database/accessors.py
def __init__(self, driver: SQLDriver, name: str, job: int, *, readonly: bool) -> None:
    """[Internal] Initialize the data entry.

    Args:
        driver: the SQL driver
        name: the data name
        job: the job id
        readonly: indicate whether the data is read-only

    Notes:
        - compressed is always True
    """
    self._driver = driver
    super().__init__(name, job, compressed=True, readonly=readonly)

SQLDataIO

Bases: BufferedIOBase, BinaryIO

SQL Data file-like I/O implementation for binary data access.

Supported methods and properties
  • name
  • mode
  • readable
  • writable
  • read
  • write
  • seekable
  • seek
  • tell
  • flush
  • close
  • __enter__
  • __exit__
Source code in src/lhcbdirac_log/providers/database/accessors.py
class SQLDataIO(BufferedIOBase, BinaryIO):
    """SQL Data file-like I/O implementation for binary data access.

    Supported methods and properties:
        - name
        - mode
        - readable
        - writable
        - read
        - write
        - seekable
        - seek
        - tell
        - flush
        - close
        - __enter__
        - __exit__
    """

    __slots__ = (
        "_driver",
        "_entry",
        "_need_flush",
        "_read_buffer",
        "_read_mode",
        "_read_view",
        "_seek",
        "_write_buffer",
        "_write_buffer_size",
    )

    def __init__(
        self,
        driver: SQLDriver,
        entry: SQLDataEntry,
        write_buffer: int = 0,
        *,
        read_mode: bool = True,
    ) -> None:
        """[Internal] Initialize the SQL Data file-like I/O.

        Args:
            driver: the SQL driver
            entry: the data entry to work with
            write_buffer: the size of the write buffer (default is driver.blob_limit)
            read_mode: True if the I/O is in read mode, False otherwise
        """
        super().__init__()

        self._entry = entry
        self._driver = driver

        self._read_mode = read_mode

        self._read_view: memoryview | None = None
        self._read_buffer: bytes | None = None
        self._seek = 0

        self._need_flush = True
        self._write_buffer: bytearray = bytearray()
        self._write_buffer_size = min(write_buffer, driver.blob_limit) or driver.blob_limit

    @override
    def __enter__(self) -> Self:
        return super().__enter__()

    @property
    @override
    def name(self) -> str:
        """Get the file name.

        Returns:
            the file name
        """
        return self._entry.name

    @property
    @override
    def mode(self) -> str:
        """Get the file mode.

        Returns:
            the file mode
        """
        return "rb" if self._read_mode else "wb"

    @override
    def __next__(self) -> bytes:
        raise UnsupportedOperation

    @override
    def readline(self, limit: int | None = -1) -> bytes:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def readable(self) -> bool:
        """Check if the file is readable.

        Returns:
            True if the file is readable, False otherwise

        Raises:
            ValueError: if the I/O is closed and in read mode
        """
        if self.closed and self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._read_mode

    @override
    def writable(self) -> bool:
        """Check if the file is writable.

        Returns:
            True if the file is writable, False otherwise

        Raises:
            ValueError: if the I/O is closed and in write mode
        """
        if self.closed and not self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return not self._read_mode

    @override
    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        """Seek to a position.

        Args:
            offset: the offset to seek to
            whence: the reference point for the offset

        Returns:
            the new position

        Raises:
            ValueError: if the I/O is closed or invalid whence
            OSError: if the I/O is in write-only mode or invalid offset
        """
        if self.seekable():
            old = self._seek

            match whence:
                case os.SEEK_SET:
                    self._seek = offset
                case os.SEEK_CUR:
                    self._seek += offset
                case os.SEEK_END:
                    if self._read_buffer is None:
                        self.read(0)
                    self._seek = len(self._read_buffer) + offset
                case _:
                    msg = "Invalid whence"
                    raise ValueError(msg)

            if self._seek < 0:
                self._seek = old
                msg = "Invalid offset"
                raise OSError(msg)
        else:
            msg = "Cannot seek using write-only mode"
            raise OSError(msg)

        return self._seek

    @override
    def seekable(self) -> bool:
        """Check if the file is seekable.

        Returns:
            True if the file is seekable, False otherwise

        Raises:
            ValueError: if the I/O is closed
        """
        if self.closed and self._read_mode:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._read_mode

    @override
    def tell(self) -> int:
        """Get the current position.

        Returns:
            the current position
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        return self._seek

    @override
    def readlines(self, hint: int = -1) -> list[bytes]:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def writelines(self, lines: Iterable[Buffer]) -> None:
        """Unsupported operation."""
        raise UnsupportedOperation

    @override
    def read(self, size: int | None = -1) -> bytes:
        """Read data from the database.

        Args:
            size: the number of bytes to read (default is -1, read all)

        Returns:
            the read data

        Raises:
            DataNotExistsError: if the data does not exist
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        if not self._read_mode:
            msg = "Cannot read using write-only mode"
            raise UnsupportedOperation(msg)

        if self._read_buffer is None or self._read_view is None:
            self._read_buffer = self._driver.load_data(self._entry.job, self._entry.name)
            self._read_view = memoryview(self._read_buffer)
            self._seek = 0

        if size is None or size < 0:
            size = len(self._read_view)

        s = self._seek
        self._seek = min(len(self._read_buffer), self._seek + size)

        return bytes(self._read_view[s : self._seek])

    def _flush(self) -> None:
        """[Internal] Flush the write buffer into the database.

        Raises:
            OSError: if the I/O is in read-only mode

        Notes:
            - clears the internal buffer after flushing (avoids SQLAlchemy data duplication and memory leaks)
            - re-flushing overrides existing data, so it must not be called between write operations
        """
        if self._read_mode:
            return

        if self._need_flush:
            self._need_flush = False

            self._driver.save_data(
                self._entry.job,
                self._entry.name,
                self._write_buffer,
            )

            self._write_buffer = bytearray()  # "clear" the buffer (sqlalchemy doesn't release its memoryview, locking the buffer)

    @override
    def write(self, data: Buffer | bytes | bytearray | memoryview, /) -> int:
        """Write data to the database.

        Args:
            data: the data to write

        Returns:
            the number of bytes written

        Raises:
            ValueError: if the I/O is closed
            OSError: if the I/O is in read-only mode
            BufferError: if the write buffer limit would be exceeded
        """
        if self.closed:
            msg = "I/O operation on closed file"
            raise ValueError(msg)

        if self._read_mode:
            msg = "Cannot write using read-only mode"
            raise UnsupportedOperation(msg)

        if len(data) + len(self._write_buffer) >= self._write_buffer_size:
            msg = f"Buffer limit reached: {len(self._write_buffer)} / {self._write_buffer_size} bytes"
            raise BufferError(msg)

        if data:
            self._need_flush = True
            self._write_buffer.extend(data)

        self._seek += len(data)
        return len(data)

    @override
    def close(self) -> None:
        """Close the file-like I/O."""
        if not self.closed:
            self._flush()
            super().close()
            if self._read_view is not None:
                self._read_view.release()
                self._read_view = None

    def __repr__(self) -> str:
        """Get the string representation of the file-like I/O.

        Returns:
            the string representation
        """
        return f"<{self.__class__.__name__} name={self.name!r} mode={self.mode!r} closed={self.closed}>"
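
To illustrate the buffered-write and seekable-read behaviour described above, here is a behavioural sketch. The import paths are assumed, and the job and its matching Zstandard dictionary are assumed to already exist in the database; in normal use an SQLDataIO is obtained through an SQLDataEntry rather than constructed by hand.

import os

from lhcbdirac_log.providers.database.accessors import SQLDataEntry, SQLDataIO  # assumed import path
from lhcbdirac_log.providers.database.driver import SQLDriver                   # assumed import path

driver = SQLDriver.create()  # in-memory SQLite database
# Assumption: job 42 and the dictionary matching "job.log" are already stored.
entry = SQLDataEntry(driver, "job.log", 42, readonly=False)

with SQLDataIO(driver, entry, read_mode=False) as out:  # write mode: buffered in memory
    out.write(b"compressed payload")                    # flushed to the database on close()

with SQLDataIO(driver, entry) as src:                   # read mode (the default)
    end = src.seek(0, os.SEEK_END)                      # seeking is only allowed in read mode
    src.seek(0)
    payload = src.read(end)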

mode: str property

Get the file mode.

Returns:

  • str: the file mode

name: str property

Get the file name.

Returns:

  • str: the file name

__init__(driver, entry, write_buffer=0, *, read_mode=True)

[Internal] Initialize the SQL Data file-like I/O.

Parameters:

  • driver (SQLDriver): the SQL driver [required]
  • entry (SQLDataEntry): the data entry to work with [required]
  • write_buffer (int): the size of the write buffer (default is driver.blob_limit) [default: 0]
  • read_mode (bool): True if the I/O is in read mode, False otherwise [default: True]
Source code in src/lhcbdirac_log/providers/database/accessors.py
def __init__(
    self,
    driver: SQLDriver,
    entry: SQLDataEntry,
    write_buffer: int = 0,
    *,
    read_mode: bool = True,
) -> None:
    """[Internal] Initialize the SQL Data file-like I/O.

    Args:
        driver: the SQL driver
        entry: the data entry to work with
        write_buffer: the size of the write buffer (default is driver.blob_limit)
        read_mode: True if the I/O is in read mode, False otherwise
    """
    super().__init__()

    self._entry = entry
    self._driver = driver

    self._read_mode = read_mode

    self._read_view: memoryview | None = None
    self._read_buffer: bytes | None = None
    self._seek = 0

    self._need_flush = True
    self._write_buffer: bytearray = bytearray()
    self._write_buffer_size = min(write_buffer, driver.blob_limit) or driver.blob_limit

__repr__()

Get the string representation of the file-like I/O.

Returns:

  • str: the string representation

Source code in src/lhcbdirac_log/providers/database/accessors.py
def __repr__(self) -> str:
    """Get the string representation of the file-like I/O.

    Returns:
        the string representation
    """
    return f"<{self.__class__.__name__} name={self.name!r} mode={self.mode!r} closed={self.closed}>"

close()

Close the file-like I/O.

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def close(self) -> None:
    """Close the file-like I/O."""
    if not self.closed:
        self._flush()
        super().close()
        if self._read_view is not None:
            self._read_view.release()
            self._read_view = None

read(size=-1)

Read data from the database.

Parameters:

  • size (int | None): the number of bytes to read (default is -1, read all) [default: -1]

Returns:

  • bytes: the read data

Raises:

  • DataNotExistsError: if the data does not exist

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def read(self, size: int | None = -1) -> bytes:
    """Read data from the database.

    Args:
        size: the number of bytes to read (default is -1, read all)

    Returns:
        the read data

    Raises:
        DataNotExistsError: if the data does not exist
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    if not self._read_mode:
        msg = "Cannot read using write-only mode"
        raise UnsupportedOperation(msg)

    if self._read_buffer is None or self._read_view is None:
        self._read_buffer = self._driver.load_data(self._entry.job, self._entry.name)
        self._read_view = memoryview(self._read_buffer)
        self._seek = 0

    if size is None or size < 0:
        size = len(self._read_view)

    s = self._seek
    self._seek = min(len(self._read_buffer), self._seek + size)

    return bytes(self._read_view[s : self._seek])

readable()

Check if the file is readable.

Returns:

  • bool: True if the file is readable, False otherwise

Raises:

  • ValueError: if the I/O is closed and in read mode

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def readable(self) -> bool:
    """Check if the file is readable.

    Returns:
        True if the file is readable, False otherwise

    Raises:
        ValueError: if the I/O is closed and in read mode
    """
    if self.closed and self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._read_mode

readline(limit=-1)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def readline(self, limit: int | None = -1) -> bytes:
    """Unsupported operation."""
    raise UnsupportedOperation

readlines(hint=-1)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def readlines(self, hint: int = -1) -> list[bytes]:
    """Unsupported operation."""
    raise UnsupportedOperation

seek(offset, whence=os.SEEK_SET)

Seek to a position.

Parameters:

  • offset (int): the offset to seek to [required]
  • whence (int): the reference point for the offset [default: os.SEEK_SET]

Returns:

  • int: the new position

Raises:

  • ValueError: if the I/O is closed or invalid whence
  • OSError: if the I/O is in write-only mode or invalid offset

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
    """Seek to a position.

    Args:
        offset: the offset to seek to
        whence: the reference point for the offset

    Returns:
        the new position

    Raises:
        ValueError: if the I/O is closed or invalid whence
        OSError: if the I/O is in write-only mode or invalid offset
    """
    if self.seekable():
        old = self._seek

        match whence:
            case os.SEEK_SET:
                self._seek = offset
            case os.SEEK_CUR:
                self._seek += offset
            case os.SEEK_END:
                if self._read_buffer is None:
                    self.read(0)
                self._seek = len(self._read_buffer) + offset
            case _:
                msg = "Invalid whence"
                raise ValueError(msg)

        if self._seek < 0:
            self._seek = old
            msg = "Invalid offset"
            raise OSError(msg)
    else:
        msg = "Cannot seek using write-only mode"
        raise OSError(msg)

    return self._seek
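
As a small illustration of the SEEK_END behaviour above, here is a sketch of a hypothetical helper; the io argument is any open, read-mode SQLDataIO.

import os
from io import BufferedIOBase


def stored_size(io: BufferedIOBase) -> int:
    """Return the total stored size seen by a read-mode SQLDataIO, restoring the cursor afterwards."""
    pos = io.tell()
    end = io.seek(0, os.SEEK_END)  # SEEK_END loads the blob (via read(0)) and jumps past its last byte
    io.seek(pos)                   # restore the previous position
    return end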

seekable()

Check if the file is seekable.

Returns:

  • bool: True if the file is seekable, False otherwise

Raises:

  • ValueError: if the I/O is closed

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def seekable(self) -> bool:
    """Check if the file is seekable.

    Returns:
        True if the file is seekable, False otherwise

    Raises:
        ValueError: if the I/O is closed
    """
    if self.closed and self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._read_mode

tell()

Get the current position.

Returns:

  • int: the current position

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def tell(self) -> int:
    """Get the current position.

    Returns:
        the current position
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return self._seek

writable()

Check if the file is writable.

Returns:

  • bool: True if the file is writable, False otherwise

Raises:

  • ValueError: if the I/O is closed and in write mode

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def writable(self) -> bool:
    """Check if the file is writable.

    Returns:
        True if the file is writable, False otherwise

    Raises:
        ValueError: if the I/O is closed and in write mode
    """
    if self.closed and not self._read_mode:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    return not self._read_mode

write(data)

Write data to the database.

Parameters:

  • data (Buffer | bytes | bytearray | memoryview): the data to write [required]

Returns:

  • int: the number of bytes written

Raises:

  • ValueError: if the I/O is closed
  • OSError: if the I/O is in read-only mode
  • BufferError: if the write buffer limit would be exceeded

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def write(self, data: Buffer | bytes | bytearray | memoryview, /) -> int:
    """Write data to the database.

    Args:
        data: the data to write

    Returns:
        the number of bytes written

    Raises:
        ValueError: if the I/O is closed
        OSError: if the I/O is in read-only mode
        BufferError: if the write buffer limit would be exceeded
    """
    if self.closed:
        msg = "I/O operation on closed file"
        raise ValueError(msg)

    if self._read_mode:
        msg = "Cannot write using read-only mode"
        raise UnsupportedOperation(msg)

    if len(data) + len(self._write_buffer) >= self._write_buffer_size:
        msg = f"Buffer limit reached: {len(self._write_buffer)} / {self._write_buffer_size} bytes"
        raise BufferError(msg)

    if data:
        self._need_flush = True
        self._write_buffer.extend(data)

    self._seek += len(data)
    return len(data)

writelines(lines)

Unsupported operation.

Source code in src/lhcbdirac_log/providers/database/accessors.py
@override
def writelines(self, lines: Iterable[Buffer]) -> None:
    """Unsupported operation."""
    raise UnsupportedOperation

SQLDataProvider

Bases: DataProvider[SQLJobEntry]

SQL data provider implementation.

Only supports the new compressed (Zstandard) format.

Source code in src/lhcbdirac_log/providers/database/providers.py
class SQLDataProvider(DataProvider[SQLJobEntry]):
    """SQL data provider implementation.

    Only supports the new compressed (Zstandard) format.
    """

    __slots__ = ("_driver",)

    @override
    def open(self) -> None:
        self._dict_provider.open()

    @override
    def close(self) -> None:
        self._dict_provider.close()

    @property
    def driver(self) -> SQLDriver:
        """Get the SQL driver.

        Returns:
            the SQL driver
        """
        return self._driver

    def __init__(self, driver: SQLDriver | None = None, dict_provider: SQLDictProvider | None = None, *, readonly: bool = False) -> None:
        """Initialize the data provider.

        Args:
            driver: the SQL driver, use the `dict_provider`'s driver if None, or an in-memory SQLite database if `dict_provider` is None too
            dict_provider: the dictionary provider, if None, a new one will be created
            readonly: indicate whether the provider is read-only (default: False)

        Raises:
            ValueError: if the specified dictionary provider uses a different SQL driver
        """
        if dict_provider is None:
            if driver is None:
                driver = SQLDriver.create()

            dict_provider = SQLDictProvider(driver, readonly=readonly)

        elif driver is None:
            driver = dict_provider.driver

        elif dict_provider.driver is not driver:
            msg = "The dictionary provider must use the same SQL driver"
            raise ValueError(msg)

        self._driver = driver
        super().__init__(dict_provider, readonly=readonly)

    @override
    def _get(self, job: int, *, create: bool = False) -> SQLJobEntry:
        return SQLJobEntry(self._driver, job, readonly=self._readonly, create=create)

    @override
    def _create(self, job: int, *, exists_ok: bool = False) -> SQLJobEntry:
        return SQLJobEntry(self._driver, job, readonly=self._readonly, exists_ok=exists_ok)

    @override
    def jobs(self) -> Generator[int, None, None]:
        yield from self._driver.get_jobs()

    @override
    def _delete(self, job: int, *, force: bool = False) -> None:
        if not self._driver.check_job(job):
            raise JobNotExistsError(job)

        self._driver.delete_job(job, force=force)

    @override
    @property
    def data_size(self) -> int:  # optimizes the default implementation
        return self._driver.get_all_data_size()

    @override
    @property
    def size(self) -> int:  # optimizes the default implementation
        return self._driver.get_db_size()

driver: SQLDriver property

Get the SQL driver.

Returns:

  • SQLDriver: the SQL driver

__init__(driver=None, dict_provider=None, *, readonly=False)

Initialize the data provider.

Parameters:

  • driver (SQLDriver | None): the SQL driver; use the dict_provider's driver if None, or an in-memory SQLite database if dict_provider is None too [default: None]
  • dict_provider (SQLDictProvider | None): the dictionary provider; if None, a new one will be created [default: None]
  • readonly (bool): indicate whether the provider is read-only [default: False]

Raises:

  • ValueError: if the specified dictionary provider uses a different SQL driver

Source code in src/lhcbdirac_log/providers/database/providers.py
def __init__(self, driver: SQLDriver | None = None, dict_provider: SQLDictProvider | None = None, *, readonly: bool = False) -> None:
    """Initialize the data provider.

    Args:
        driver: the SQL driver, use the `dict_provider`'s driver if None, or an in-memory SQLite database if `dict_provider` is None too
        dict_provider: the dictionary provider, if None, a new one will be created
        readonly: indicate whether the provider is read-only (default: False)

    Raises:
        ValueError: if the specified dictionary provider uses a different SQL driver
    """
    if dict_provider is None:
        if driver is None:
            driver = SQLDriver.create()

        dict_provider = SQLDictProvider(driver, readonly=readonly)

    elif driver is None:
        driver = dict_provider.driver

    elif dict_provider.driver is not driver:
        msg = "The dictionary provider must use the same SQL driver"
        raise ValueError(msg)

    self._driver = driver
    super().__init__(dict_provider, readonly=readonly)
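
A sketch of the three accepted constructor combinations (import paths assumed; signatures as documented above):

from lhcbdirac_log.providers.database.driver import SQLDriver                            # assumed import path
from lhcbdirac_log.providers.database.providers import SQLDataProvider, SQLDictProvider  # assumed import path

# 1. No arguments: an in-memory SQLite driver and a new dictionary provider are created.
p1 = SQLDataProvider()

# 2. Driver only: a new SQLDictProvider is created on the same driver.
driver = SQLDriver.create("logs.db")
p2 = SQLDataProvider(driver)

# 3. Dictionary provider (with or without the driver): both must share the same driver,
#    otherwise ValueError is raised.
dicts = SQLDictProvider(driver)
p3 = SQLDataProvider(driver, dicts)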

SQLDictEntry

Bases: DictEntry

SQL Database dictionary entry implementation.

Source code in src/lhcbdirac_log/providers/database/accessors.py
class SQLDictEntry(DictEntry):
    """SQL Database dictionary entry implementation."""

    __slots__ = ("_driver",)

    def __init__(
        self,
        driver: SQLDriver,
        name: str,
        config: Config,
        data: bytes | None = None,
        zstd_id: int | None = None,
    ) -> None:
        """[Internal] Initialize the dictionary entry.

        Args:
            driver: the SQL driver
            name: the dictionary name
            config: the configuration to use for precomputing the dictionary
            data: the dictionary data (create a new dict if not None)
            zstd_id: the zstd dictionary id (None for unknown)
        """
        self._driver = driver
        super().__init__(name, config, data, zstd_id)

    @override
    @property
    def exists(self) -> bool:
        return self._driver.check_dict(self.dict_name) is not None

    @override
    @property
    def size(self) -> int:
        return self._driver.get_dict_size(self.dict_name)

    @override
    def _load_data(self) -> bytes:
        d, z = self._driver.load_dict(self.dict_name)
        self._zstd_id = z
        return d

    @override
    def _save(self) -> None:
        self._driver.save_dict(self.name, self._data, self._zstd_id)

__init__(driver, name, config, data=None, zstd_id=None)

[Internal] Initialize the dictionary entry.

Parameters:

  • driver (SQLDriver): the SQL driver [required]
  • name (str): the dictionary name [required]
  • config (Config): the configuration to use for precomputing the dictionary [required]
  • data (bytes | None): the dictionary data (create a new dict if not None) [default: None]
  • zstd_id (int | None): the zstd dictionary id (None for unknown) [default: None]
Source code in src/lhcbdirac_log/providers/database/accessors.py
def __init__(
    self,
    driver: SQLDriver,
    name: str,
    config: Config,
    data: bytes | None = None,
    zstd_id: int | None = None,
) -> None:
    """[Internal] Initialize the dictionary entry.

    Args:
        driver: the SQL driver
        name: the dictionary name
        config: the configuration to use for precomputing the dictionary
        data: the dictionary data (create a new dict if not None)
        zstd_id: the zstd dictionary id (None for unknown)
    """
    self._driver = driver
    super().__init__(name, config, data, zstd_id)

SQLDictProvider

Bases: DictProvider[SQLDictEntry]

SQL dictionary provider implementation.

Source code in src/lhcbdirac_log/providers/database/providers.py
class SQLDictProvider(DictProvider[SQLDictEntry]):
    """SQL dictionary provider implementation."""

    __slots__ = ("_driver",)

    @override
    def close(self) -> None:
        self._driver.close()

    @property
    def driver(self) -> SQLDriver:
        """Get the SQL driver.

        Returns:
            the SQL driver
        """
        return self._driver

    def __init__(self, driver: SQLDriver, config: Config = DEFAULT_CONFIG, *, readonly: bool = False) -> None:
        """Initialize the dictionary provider.

        Args:
            driver: the SQL driver
            config: the configuration to use for precomputing the dictionaries (default: DEFAULT_CONFIG)
            readonly: whether the provider is read-only (default: False)
        """
        self._driver = driver
        super().__init__(config, readonly=readonly)

    @override
    def _load_invalid(self) -> set[str]:
        return self._driver.get_invalid_dicts()

    @override
    def _mark_invalid(self, name: str) -> None:
        self._driver.save_invalid_dict(name)

    @override
    def _load(self, name: str) -> SQLDictEntry:
        if not self._driver.check_dict(name):
            raise ValueError(name)

        return SQLDictEntry(self._driver, name, self._config)

    @override
    def _add(self, name: str, data: bytes, zstd_id: int) -> SQLDictEntry:
        if self._driver.check_dict(name):
            raise DictExistsError(name)

        return SQLDictEntry(self._driver, name, self._config, data, zstd_id)

    @override
    def _iter_all(self) -> Generator[str, None, None]:
        yield from self._driver.get_dicts()

    @override
    def _delete(self, name: str) -> None:
        self._driver.delete_dict(name)

    @override
    @property
    def size(self) -> int:
        return self._driver.get_all_dict_size()
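
A minimal standalone sketch (import paths assumed). In normal use the dictionaries are managed through SQLDataProvider, which creates this provider itself when none is given.

from lhcbdirac_log.providers.database.driver import SQLDriver           # assumed import path
from lhcbdirac_log.providers.database.providers import SQLDictProvider  # assumed import path

driver = SQLDriver.create()      # in-memory SQLite database
dicts = SQLDictProvider(driver)  # uses DEFAULT_CONFIG, read-write
print(dicts.size)                # total stored size of all dictionary blobs
dicts.close()                    # closes the underlying driver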

driver: SQLDriver property

Get the SQL driver.

Returns:

  • SQLDriver: the SQL driver

__init__(driver, config=DEFAULT_CONFIG, *, readonly=False)

Initialize the dictionary provider.

Parameters:

  • driver (SQLDriver): the SQL driver [required]
  • config (Config): the configuration to use for precomputing the dictionaries [default: DEFAULT_CONFIG]
  • readonly (bool): whether the provider is read-only [default: False]
Source code in src/lhcbdirac_log/providers/database/providers.py
def __init__(self, driver: SQLDriver, config: Config = DEFAULT_CONFIG, *, readonly: bool = False) -> None:
    """Initialize the dictionary provider.

    Args:
        driver: the SQL driver
        config: the configuration to use for precomputing the dictionaries (default: DEFAULT_CONFIG)
        readonly: whether the provider is read-only (default: False)
    """
    self._driver = driver
    super().__init__(config, readonly=readonly)

SQLDriver

[Internal] SQL low-level database driver for SQLite databases.

This class provides the low-level database access used by all other classes of this implementation.

This class must be instantiated manually but should not be used directly: use it through the SQLDataProvider and SQLDictProvider classes.

Source code in src/lhcbdirac_log/providers/database/driver.py
class SQLDriver:
    """[Internal] SQL low-level database driver for SQLite databases.

    This class provides the low-level database access used by all other classes of this implementation.

    This class must be instantiated manually but should not be used directly:
    use it through the SQLDataProvider and SQLDictProvider classes.
    """

    SQLITE_BLOB_LIMIT = 1_000_000_000

    __slots__ = (
        "_connection",
        "_data_table",
        "_dict_table",
        "_engine",
        "_job_table",
        "_lock",
        "_metadata",
    )

    @property
    def blob_limit(self) -> int:
        """Get the maximum size of a BLOB in the database.

        Returns:
            the maximum size of a BLOB in the database
        """
        return self.SQLITE_BLOB_LIMIT

    @property
    def engine(self) -> Engine:  # pragma: no cover
        """Get the database engine.

        Returns:
            the database engine
        """
        return self._engine

    @property
    def connection(self) -> Connection:
        """Get the database connection.

        Returns:
            the database connection
        """
        return self._connection

    @classmethod
    def create(cls, path: str | None = None) -> Self:
        """Create a new SQLite database driver instance attached to a SQLite database.

        Args:
            path: the path to the database file (new or existing), in-memory database by default (default is None)

        Returns:
            a new SQLDriver instance
        """
        q = ":memory:" if path is None else str(Path(path).resolve())
        engine = create_engine(
            f"sqlite:///{q}",
            # , echo=True
        )

        return cls(engine)

    def __init__(self, engine: Engine) -> None:
        """Initialize a new SQLite database driver.

        Args:
            engine: the database engine

        Notes:
            - use the `create` class method to create a new driver from a database path

        Schema:
            - data: String name [PK], Integer job [PK, FK:job.id], Integer dict [FK:dict.id], BLOB data
            - dict: Integer id [PK], String name [UNIQUE], Integer zstd_id [NULLABLE], BLOB data [NULLABLE]
            - job: Integer id [PK], Integer dirac_id [NULLABLE], Boolean success [NULLABLE]
        """
        self._engine = engine
        self._connection = engine.connect()

        self._connection.execute(text("PRAGMA foreign_keys=ON"))
        self._connection.execute(text("PRAGMA synchronous=OFF"))
        self._connection.execute(text("PRAGMA cache_size=-131072"))  # 128 MiB
        self._connection.execute(text("PRAGMA temp_store=MEMORY"))
        self._connection.execute(text("PRAGMA locking_mode=EXCLUSIVE"))
        self._connection.execute(text("PRAGMA journal_mode=OFF"))

        self._lock = RLock()

        self._metadata = MetaData()

        self._data_table = Table(
            "data",
            self._metadata,
            Column("name", String, primary_key=True),
            Column("job", ForeignKey("job.id", ondelete="RESTRICT"), primary_key=True),
            Column("dict", ForeignKey("dict.id", ondelete="RESTRICT"), nullable=False),
            Column("data", BLOB, nullable=False),
        )

        self._dict_table = Table(
            "dict",
            self._metadata,
            Column("id", Integer, primary_key=True, autoincrement=True),
            Column("name", String, nullable=False, unique=True),  # name <=> id
            Column("zstd_id", Integer, nullable=True),  # NULL for invalid dict
            Column("data", BLOB, nullable=True),  # NULL for invalid dict
        )

        self._job_table = Table(
            "job",
            self._metadata,
            Column("id", Integer, primary_key=True),
            Column("dirac_id", Integer, nullable=True),
            Column("success", Boolean, nullable=True),
        )

        self._metadata.create_all(self._connection)

    def _check_dict(self, dict_name: str) -> tuple[int | None, bool]:
        """[Internal] Check if a dictionary exists in the database.

        Args:
            dict_name: the dictionary name

        Returns:
            the dict id if the dictionary exists, None otherwise, and a bool indicating if the dictionary is invalid
        """
        with self._lock:
            r = self.connection.execute(
                select(self._dict_table.c.id, self._dict_table.c.zstd_id).where(self._dict_table.c.name == dict_name)
            ).one_or_none()
            return (None, False) if r is None else (r.id, r.zstd_id is None)

    def check_dict(self, dict_name: str, *, invalid: bool = False) -> int | None:
        """Check if a dictionary exists in the database.

        Args:
            dict_name: the dictionary name
            invalid: if True, return the dict id even if the dictionary is invalid, otherwise return None (default is False)

        Returns:
            the dict id if the dictionary exists, None otherwise
        """
        r, i = self._check_dict(dict_name)
        return r if not i or invalid else None

    def check_data(self, job: int, file_name: str) -> bool:
        """Check if a file exists in the database.

        Args:
            job: the job id
            file_name: the file name

        Returns:
            True if the file exists, False otherwise
        """
        with self._lock:
            s = select(self._data_table.c.name).where(self._data_table.c.job == job, self._data_table.c.name == file_name)
            return self.connection.execute(s).one_or_none() is not None

    def check_job(self, job: int) -> bool:
        """Check if a job exists in the database.

        Args:
            job: the job id

        Returns:
            True if the job exists, False otherwise
        """
        with self._lock:
            return self.connection.execute(select(self._job_table.c.id).where(self._job_table.c.id == job)).one_or_none() is not None

    def load_dict(self, dict_name: str) -> tuple[bytes, int]:
        """Load a (non-invalid) dictionary from the database.

        Args:
            dict_name: the dictionary name

        Returns:
            a tuple of the dictionary data and its zstd ID

        Raises:
            DictNotExistsError: if the dictionary does not exist or is invalid
        """
        with self._lock:
            o = self.connection.execute(
                select(self._dict_table.c.data, self._dict_table.c.zstd_id).where(self._dict_table.c.name == dict_name)
            ).one_or_none()

            if o is not None and o.zstd_id is not None:
                return bytes(o.data), o.zstd_id

            msg = f"dictionary '{dict_name}' does not exist"
            raise DictNotExistsError(msg)

    def load_data(self, job: int, file_name: str) -> bytes:
        """Load a file from the database.

        Args:
            job: the job id
            file_name: the file name

        Returns:
            the file data

        Raises:
            DataNotExistsError: if the file does not exist
        """
        with self._lock:
            s = select(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name)
            o = self.connection.execute(s).one_or_none()

            if o is not None:
                return bytes(o.data)  # mypy cast

            msg = f"file {file_name} does not exist in job {job}"  # pragma: no cover
            raise DataNotExistsError(msg)  # pragma: no cover

    def load_job(self, job: int) -> JobInfo:
        """Load job metadata from the database.

        Args:
            job: the job id

        Returns:
            the job metadata
        """
        with self._lock:
            o = self.connection.execute(select(self._job_table).where(self._job_table.c.id == job)).one_or_none()
            return JobInfo(o.dirac_id, o.success)

    def save_invalid_dict(self, name: str) -> None:
        """Save an invalid dictionary to the database.

        Args:
            name: the dictionary name

        Notes:
            - forwards to save_dict with data=None and zstd_id=None
        """
        self.save_dict(name, None, None)

    def save_dict(self, name: str, data: bytes | None, zstd_id: int | None) -> None:
        """Save a dictionary to the database.

        Args:
            name: the dictionary name
            data: the dictionary data or None for invalid
            zstd_id: the zstd dictionary id or None for invalid

        Raises:
            ValueError: if any of the dictionary data or zstd id is None and not the other
            DictExistsError: if the (non-invalid) dictionary already exists

        Notes:
            - overwrites the dictionary if it already exists but is invalid
        """
        s: Executable

        if (data is None) != (zstd_id is None):  # pragma: no cover
            msg = "both data and zstd_id must be None or not None"
            raise ValueError(msg)

        with self._lock:
            d_id, inv = self._check_dict(name)
            if d_id is None:
                s = insert(self._dict_table).values(name=name, data=data, zstd_id=zstd_id)
            elif inv:
                s = update(self._dict_table).where(self._dict_table.c.id == d_id).values(data=data, zstd_id=zstd_id)
            else:  # pragma: no cover
                msg = f"dictionary {name} (id: {d_id}, zstd_id: {zstd_id}) already exists"
                raise DictExistsError(msg)

            conn = self.connection
            conn.execute(s)
            conn.commit()

    def save_data(
        self,
        job: int,
        file_name: str,
        data: bytes,
    ) -> None:
        """Save a file to the database.

        Args:
            job: the job id
            file_name: the file name
            data: the data to save

        Raises:
            DictNotExistsError: if the dictionary does not exist
        """
        s: Executable

        with self._lock:
            if self.check_data(job, file_name):
                s = update(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name).values(data=data)
            else:
                dict_name = DataEntry.filename_to_dictname(file_name)
                d_id = self.check_dict(dict_name, invalid=True)

                if d_id is None:
                    msg = f"dictionary {dict_name} (for {file_name}) does not exist"
                    raise DictNotExistsError(msg)

                s = insert(self._data_table).values(name=file_name, job=job, dict=d_id, data=data)

            conn = self.connection
            conn.execute(s)
            conn.commit()

    def save_job(self, job: int, info: JobInfo | None = None) -> None:
        """Save/overwrite a job to the database.

        Args:
            job: the job id
            info: the job metadata (default is None)
        """
        s: Executable

        k = {} if info is None else {"dirac_id": info.dirac_id, "success": info.success}

        with self._lock:
            if not self.check_job(job):
                s = insert(self._job_table).values(id=job, **k)
            else:
                s = update(self._job_table).where(self._job_table.c.id == job).values(**k)

            conn = self.connection
            conn.execute(s)
            conn.commit()

    def delete_dict(self, dict_name: str) -> None:
        """Delete a dictionary from the database.

        Args:
            dict_name: the dictionary name

        Raises:
            DataExistsError: if the dictionary has associated files
            DictNotExistsError: if the dictionary does not exist
        """
        with self._lock:
            if not self.check_dict(dict_name):
                msg = f"dictionary {dict_name} does not exist"
                raise DictNotExistsError(msg)

            conn = self.connection
            try:
                conn.execute(delete(self._dict_table).where(self._dict_table.c.name == dict_name))
                conn.commit()
            except IntegrityError as err:
                conn.rollback()
                raise DataExistsError(dict_name) from err

    def delete_data(self, job: int, file_name: str) -> None:
        """Delete a file from the database.

        Args:
            job: the job id
            file_name: the file name

        Raises:
            DataNotExistsError: if the file does not exist
        """
        with self._lock:
            if not self.check_data(job, file_name):
                msg = f"file {file_name} does not exist in job {job}"
                raise DataNotExistsError(msg)

            s = delete(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name)

            conn = self.connection
            conn.execute(s)
            conn.commit()

    def delete_job(self, job: int, *, force: bool = False) -> None:
        """Delete a job from the database.

        Args:
            job: the job id
            force: if True, delete the job and its associated files (default is False)

        Raises:
            DataExistsError: if the job has associated files and force is False
        """
        with self._lock:
            conn = self.connection
            try:
                if force:
                    conn.execute(delete(self._data_table).where(self._data_table.c.job == job))
                conn.execute(delete(self._job_table).where(self._job_table.c.id == job))
                conn.commit()
            except IntegrityError as err:
                conn.rollback()
                raise DataExistsError(job) from err

    def clear_job(self, job: int) -> None:
        """Clear all files from a job.

        Args:
            job: the job id
        """
        with self._lock:
            conn = self.connection
            conn.execute(delete(self._data_table).where(self._data_table.c.job == job))
            conn.commit()

    def get_invalid_dicts(self) -> set[str]:
        """Get all invalid dictionaries in the database.

        Returns:
            a set of dictionary names

        Notes:
            - the returned set is expected to be small
        """
        with self._lock:
            return {o.name for o in self.connection.execute(select(self._dict_table.c.name).where(self._dict_table.c.zstd_id.is_(None)))}

    def get_dicts(self, *, invalid: bool = False) -> set[str]:
        """Get all dictionaries in the database.

        Args:
            invalid: if True, include invalid dictionaries (default is False)

        Returns:
            a set of dictionary names

        Notes:
            - the returned set is expected to be small
        """
        s = select(self._dict_table.c.name)
        if not invalid:
            s = s.where(self._dict_table.c.zstd_id.isnot(None))

        return {o.name for o in self.connection.execute(s)}

    def get_files(self, job: int) -> set[str]:
        """Get all files in a job.

        Args:
            job: the job id

        Returns:
            a set of file names

        Notes:
            - the returned set is expected to be small
        """
        s = select(self._data_table.c.name).where(self._data_table.c.job == job)
        return {o.name for o in self.connection.execute(s)}

    def get_jobs(self) -> Generator[int, None, None]:
        """Get all jobs in the database.

        Returns:
            a generator of job ids
        """
        for o in self.connection.execute(select(self._job_table.c.id)):
            yield o.id

    def get_data_size(self, job: int, file_name: str) -> int | None:
        """Get the size of a file in the database.

        Args:
            job: the job id
            file_name: the file name

        Returns:
            the size of the file, or None if the file does not exist
        """
        with self._lock:
            s = select(func.char_length(self._data_table.c.data)).where(self._data_table.c.job == job, self._data_table.c.name == file_name)

            return self.connection.execute(s).scalar()

    def get_dict_size(self, dict_name: str) -> int:
        """Get the storage size of the specified dictionary.

        Args:
            dict_name: the dictionary name

        Returns:
            the storage size of the specified dictionary
        """
        s = select(func.sum(func.char_length(self._dict_table.c.data))).where(self._dict_table.c.name == dict_name)

        return self.connection.execute(s).scalar() or 0

    def get_all_dict_size(self) -> int:
        """Get the storage size of all dictionaries data.

        Returns:
            the storage size of all dictionaries data
        """
        s = select(func.sum(func.char_length(self._dict_table.c.data)))

        return self.connection.execute(s).scalar() or 0

    def get_job_size(self, job: int) -> int:
        """Get the storage size of all files data from a job.

        Args:
            job: the job id

        Returns:
            the storage size of all files data from a job
        """
        s = select(func.sum(func.char_length(self._data_table.c.data))).where(self._data_table.c.job == job)

        return self.connection.execute(s).scalar() or 0

    def get_all_data_size(self) -> int:
        """Get the storage size of all files data.

        Returns:
            the storage size of all files data
        """
        s = select(func.sum(func.char_length(self._data_table.c.data)))

        return self.connection.execute(s).scalar() or 0

    def get_data_count(self, job: int) -> int:
        """Get the number of files in a job.

        Args:
            job: the job id

        Returns:
            the number of files in the job
        """
        s = select(func.count()).where(self._data_table.c.job == job)

        return self.connection.execute(s).scalar()

    def get_db_size(self) -> int:
        """Get the storage size of the database file.

        Returns:
            the storage size of the database file
        """
        conn = self.connection
        return conn.execute(text("pragma page_count;")).scalar() * conn.execute(text("pragma page_size;")).scalar()

    def close(self) -> None:
        """Close the connection to the database, and dispose it.

        Notes:
            - the driver may not be used after this method is called
        """
        self.connection.close()
        self._engine.dispose()
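
A low-level sketch of the driver API documented below (import path assumed); in normal use the driver is only created and then handed to the providers.

from lhcbdirac_log.providers.database.driver import SQLDriver  # assumed import path

driver = SQLDriver.create()     # in-memory SQLite database; pass a path for an on-disk file
driver.save_job(1234)           # create the job row (metadata is optional)

print(driver.check_job(1234))   # True
print(list(driver.get_jobs()))  # [1234]
print(driver.get_files(1234))   # set() - no data stored for this job yet
print(driver.get_db_size())     # page_count * page_size of the SQLite database

driver.delete_job(1234)
driver.close()                  # the driver must not be used afterwards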

blob_limit: int property

Get the maximum size of a BLOB in the database.

Returns:

  • int: the maximum size of a BLOB in the database

connection: Connection property

Get the database connection.

Returns:

  • Connection: the database connection

engine: Engine property

Get the database engine.

Returns:

  • Engine: the database engine

__init__(engine)

Initialize a new SQLite database driver.

Parameters:

  • engine (Engine): the database engine [required]
Notes
  • use the create class method to create a new driver from a database path
Schema
  • data: String name [PK], Integer job [PK, FK:job.id], Integer dict [FK:dict.id], BLOB data
  • dict: Integer id [PK], String name [UNIQUE], Integer zstd_id [NULLABLE], BLOB data [NULLABLE]
  • job: Integer id [PK], Integer dirac_id [NULLABLE], Boolean success [NULLABLE]
Source code in src/lhcbdirac_log/providers/database/driver.py
def __init__(self, engine: Engine) -> None:
    """Initialize a new SQLite database driver.

    Args:
        engine: the database engine

    Notes:
        - use the `create` class method to create a new driver from a database path

    Schema:
        - data: String name [PK], Integer job [PK, FK:job.id], Integer dict [FK:dict.id], BLOB data
        - dict: Integer id [PK], String name [UNIQUE], Integer zstd_id [NULLABLE], BLOB data [NULLABLE]
        - job: Integer id [PK], Integer dirac_id [NULLABLE], Boolean success [NULLABLE]
    """
    self._engine = engine
    self._connection = engine.connect()

    self._connection.execute(text("PRAGMA foreign_keys=ON"))
    self._connection.execute(text("PRAGMA synchronous=OFF"))
    self._connection.execute(text("PRAGMA cache_size=-131072"))  # 128 MiB
    self._connection.execute(text("PRAGMA temp_store=MEMORY"))
    self._connection.execute(text("PRAGMA locking_mode=EXCLUSIVE"))
    self._connection.execute(text("PRAGMA journal_mode=OFF"))

    self._lock = RLock()

    self._metadata = MetaData()

    self._data_table = Table(
        "data",
        self._metadata,
        Column("name", String, primary_key=True),
        Column("job", ForeignKey("job.id", ondelete="RESTRICT"), primary_key=True),
        Column("dict", ForeignKey("dict.id", ondelete="RESTRICT"), nullable=False),
        Column("data", BLOB, nullable=False),
    )

    self._dict_table = Table(
        "dict",
        self._metadata,
        Column("id", Integer, primary_key=True, autoincrement=True),
        Column("name", String, nullable=False, unique=True),  # name <=> id
        Column("zstd_id", Integer, nullable=True),  # NULL for invalid dict
        Column("data", BLOB, nullable=True),  # NULL for invalid dict
    )

    self._job_table = Table(
        "job",
        self._metadata,
        Column("id", Integer, primary_key=True),
        Column("dirac_id", Integer, nullable=True),
        Column("success", Boolean, nullable=True),
    )

    self._metadata.create_all(self._connection)
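
The constructor can also be used directly when an SQLAlchemy engine is already at hand. The sketch below is illustrative only; the SQLDriver import path is an assumption based on the source location shown above, and the database file name is hypothetical.

# Illustrative sketch: wrapping an existing SQLAlchemy engine directly.
# The SQLDriver import path is assumed from the source location shown above.
from sqlalchemy import create_engine

from lhcbdirac_log.providers.database.driver import SQLDriver

engine = create_engine("sqlite:///logs.db")  # hypothetical database file
driver = SQLDriver(engine)  # applies the PRAGMAs and creates missing tables
try:
    print(driver.get_db_size())  # size of the SQLite file, in bytes
finally:
    driver.close()  # closes the connection and disposes the engine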

check_data(job, file_name)

Check if a file exists in the database.

Parameters:

Name Type Description Default
job int

the job id

required
file_name str

the file name

required

Returns:

Type Description
bool

True if the file exists, False otherwise

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 189-201
def check_data(self, job: int, file_name: str) -> bool:
    """Check if a file exists in the database.

    Args:
        job: the job id
        file_name: the file name

    Returns:
        True if the file exists, False otherwise
    """
    with self._lock:
        s = select(self._data_table.c.name).where(self._data_table.c.job == job, self._data_table.c.name == file_name)
        return self.connection.execute(s).one_or_none() is not None

check_dict(dict_name, *, invalid=False)

Check if a dictionary exists in the database.

Parameters:

Name Type Description Default
dict_name str

the dictionary name

required
invalid bool

if True, return the dict id even if the dictionary is invalid, otherwise return None (default is False)

False

Returns:

Type Description
int | None

the dict id if the dictionary exists, None otherwise

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 176-187
def check_dict(self, dict_name: str, *, invalid: bool = False) -> int | None:
    """Check if a dictionary exists in the database.

    Args:
        dict_name: the dictionary name
        invalid: if True, return the dict id even if the dictionary is invalid, otherwise return None (default is False)

    Returns:
        the dict id if the dictionary exists, None otherwise
    """
    r, i = self._check_dict(dict_name)
    return r if not i or invalid else None

check_job(job)

Check if a job exists in the database.

Parameters:

Name Type Description Default
job int

the job id

required

Returns:

Type Description
bool

True if the job exists, False otherwise

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 203-213
def check_job(self, job: int) -> bool:
    """Check if a job exists in the database.

    Args:
        job: the job id

    Returns:
        True if the job exists, False otherwise
    """
    with self._lock:
        return self.connection.execute(select(self._job_table.c.id).where(self._job_table.c.id == job)).one_or_none() is not None

clear_job(job)

Clear all files from a job.

Parameters:

Name Type Description Default
job int

the job id

required
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 441-450
def clear_job(self, job: int) -> None:
    """Clear all files from a job.

    Args:
        job: the job id
    """
    with self._lock:
        conn = self.connection
        conn.execute(delete(self._data_table).where(self._data_table.c.job == job))
        conn.commit()

close()

Close the connection to the database, and dispose it.

Notes
  • the driver may not be used after this method is called
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 586-593
def close(self) -> None:
    """Close the connection to the database, and dispose it.

    Notes:
        - the driver may not be used after this method is called
    """
    self.connection.close()
    self._engine.dispose()

create(path=None) classmethod

Create a new SQLite database driver instance attached to a SQLite database.

Parameters:

Name Type Description Default
path str | None

the path to the database file (new or existing), in-memory database by default (default is None)

None

Returns:

Type Description
Self

a new SQLDriver instance

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 87-103
@classmethod
def create(cls, path: str | None = None) -> Self:
    """Create a new SQLite database driver instance attached to a SQLite database.

    Args:
        path: the path to the database file (new or existing), in-memory database by default (default is None)

    Returns:
        a new SQLDriver instance
    """
    q = ":memory:" if path is None else str(Path(path).resolve())
    engine = create_engine(
        f"sqlite:///{q}",
        # , echo=True
    )

    return cls(engine)
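
A minimal usage sketch of create: with no argument the driver works on an in-memory database (handy for tests), with a path it opens or creates a database file. The file name below is hypothetical.

# Illustrative sketch: the classmethod is the usual entry point.
from lhcbdirac_log.providers.database.driver import SQLDriver  # path assumed

mem_driver = SQLDriver.create()             # in-memory database (":memory:")
file_driver = SQLDriver.create("logs.db")   # hypothetical file, created if missing

mem_driver.close()
file_driver.close()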

delete_data(job, file_name)

Delete a file from the database.

Parameters:

Name Type Description Default
job int

the job id

required
file_name str

the file name

required

Raises:

Type Description
DataNotExistsError

if the file does not exist

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 399-418
def delete_data(self, job: int, file_name: str) -> None:
    """Delete a file from the database.

    Args:
        job: the job id
        file_name: the file name

    Raises:
        DataNotExistsError: if the file does not exist
    """
    with self._lock:
        if not self.check_data(job, file_name):
            msg = f"file {file_name} does not exist in job {job}"
            raise DataNotExistsError(msg)

        s = delete(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name)

        conn = self.connection
        conn.execute(s)
        conn.commit()

delete_dict(dict_name)

Delete a dictionary from the database.

Parameters:

Name Type Description Default
dict_name str

the dictionary name

required

Raises:

Type Description
DataExistsError

if the dictionary has associated files

DictNotExistsError

if the dictionary does not exist

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 376-397
def delete_dict(self, dict_name: str) -> None:
    """Delete a dictionary from the database.

    Args:
        dict_name: the dictionary name

    Raises:
        DataExistsError: if the dictionary has associated files
        DictNotExistsError: if the dictionary does not exist
    """
    with self._lock:
        if not self.check_dict(dict_name):
            msg = f"dictionary {dict_name} does not exist"
            raise DictNotExistsError(msg)

        conn = self.connection
        try:
            conn.execute(delete(self._dict_table).where(self._dict_table.c.name == dict_name))
            conn.commit()
        except IntegrityError as err:
            conn.rollback()
            raise DataExistsError(dict_name) from err

delete_job(job, *, force=False)

Delete a job from the database.

Parameters:

Name Type Description Default
job int

the job id

required
force bool

if True, delete the job and its associated files (default is False)

False

Raises:

Type Description
DataExistsError

if the job has associated files and force is False

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 420-439
def delete_job(self, job: int, *, force: bool = False) -> None:
    """Delete a job from the database.

    Args:
        job: the job id
        force: if True, delete the job and its associated files (default is False)

    Raises:
        DataExistsError: if the job has associated files and force is False
    """
    with self._lock:
        conn = self.connection
        try:
            if force:
                conn.execute(delete(self._data_table).where(self._data_table.c.job == job))
            conn.execute(delete(self._job_table).where(self._job_table.c.id == job))
            conn.commit()
        except IntegrityError as err:
            conn.rollback()
            raise DataExistsError(job) from err
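
Because of the RESTRICT foreign keys, data rows must be removed before the rows they reference. The sketch below illustrates the expected deletion order; it assumes driver is an open SQLDriver, and the job id, file name, and dictionary name are hypothetical.

# Illustrative deletion order (job id, file name and dict name are hypothetical;
# `driver` is assumed to be an open SQLDriver).
driver.delete_data(1234, "stdout.log")   # remove a single file
driver.clear_job(1234)                   # remove every remaining file of the job
driver.delete_job(1234)                  # now allowed: no data rows reference it
# driver.delete_job(1234, force=True)    # one-step alternative: files, then job

driver.delete_dict("example")            # only works once no data row uses it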

get_all_data_size()

Get the storage size of all files data.

Returns:

Type Description
int

the storage size of all files data

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 554-562
def get_all_data_size(self) -> int:
    """Get the storage size of all files data.

    Returns:
        the storage size of all files data
    """
    s = select(func.sum(func.char_length(self._data_table.c.data)))

    return self.connection.execute(s).scalar() or 0

get_all_dict_size()

Get the storage size of all dictionaries data.

Returns:

Type Description
int

the storage size of all dictionaries data

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 534-542
def get_all_dict_size(self) -> int:
    """Get the storage size of all dictionaries data.

    Returns:
        the storage size of all dictionaries data
    """
    s = select(func.sum(func.char_length(self._dict_table.c.data)))

    return self.connection.execute(s).scalar() or 0

get_data_count(job)

Get the number of files in a job.

Parameters:

Name Type Description Default
job int

the job id

required

Returns:

Type Description
int

the number of files in the job

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 564-575
def get_data_count(self, job: int) -> int:
    """Get the number of files in a job.

    Args:
        job: the job id

    Returns:
        the number of files in the job
    """
    s = select(func.count()).where(self._data_table.c.job == job)

    return self.connection.execute(s).scalar()

get_data_size(job, file_name)

Get the size of a file in the database.

Parameters:

Name Type Description Default
job int

the job id

required
file_name str

the file name

required

Returns:

Type Description
int | None

the size of the file, or None if the file does not exist

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 506-519
def get_data_size(self, job: int, file_name: str) -> int | None:
    """Get the size of a file in the database.

    Args:
        job: the job id
        file_name: the file name

    Returns:
        the size of the file, or None if the file does not exist
    """
    with self._lock:
        s = select(func.char_length(self._data_table.c.data)).where(self._data_table.c.job == job, self._data_table.c.name == file_name)

        return self.connection.execute(s).scalar()

get_db_size()

Get the storage size of the database file.

Returns:

Type Description
int

the storage size of the database file

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 577-584
def get_db_size(self) -> int:
    """Get the storage size of the database file.

    Returns:
        the storage size of the database file
    """
    conn = self.connection
    return conn.execute(text("pragma page_count;")).scalar() * conn.execute(text("pragma page_size;")).scalar()
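
The size helpers make it easy to compare the raw payload with the actual database footprint. A small reporting sketch, assuming driver is an open SQLDriver:

# Illustrative storage report (`driver` is assumed to be an open SQLDriver).
total = driver.get_db_size()            # SQLite pages * page size
files = driver.get_all_data_size()      # sum of all stored file blobs
dicts = driver.get_all_dict_size()      # sum of all dictionary blobs
other = total - files - dicts           # indexes, free pages, metadata, ...

print(f"database: {total} B, files: {files} B, dicts: {dicts} B, other: {other} B")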

get_dict_size(dict_name)

Get the storage size of the specified dictionary.

Parameters:

Name Type Description Default
dict_name str

the dictionary name

required

Returns:

Type Description
int

the storage size of the specified dictionary

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 521-532
def get_dict_size(self, dict_name: str) -> int:
    """Get the storage size of the specified dictionary.

    Args:
        dict_name: the dictionary name

    Returns:
        the storage size of the specified dictionary
    """
    s = select(func.sum(func.char_length(self._dict_table.c.data))).where(self._dict_table.c.name == dict_name)

    return self.connection.execute(s).scalar() or 0

get_dicts(*, invalid=False)

Get all dictionaries in the database.

Parameters:

Name Type Description Default
invalid bool

if True, include invalid dictionaries (default is False)

False

Returns:

Type Description
set[str]

a set of dictionary names

Notes
  • the result is not expected to be a huge set
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 464-480
def get_dicts(self, *, invalid: bool = False) -> set[str]:
    """Get all dictionaries in the database.

    Args:
        invalid: if True, include invalid dictionaries (default is False)

    Returns:
        a set of dictionary names

    Notes:
        - the result is not expected to be a huge set
    """
    s = select(self._dict_table.c.name)
    if not invalid:
        s = s.where(self._dict_table.c.zstd_id.isnot(None))

    return {o.name for o in self.connection.execute(s)}

get_files(job)

Get all files in a job.

Parameters:

Name Type Description Default
job int

the job id

required

Returns:

Type Description
set[str]

a set of file names

Notes
  • the result is not expected to be a huge set
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 482-495
def get_files(self, job: int) -> set[str]:
    """Get all files in a job.

    Args:
        job: the job id

    Returns:
        a set of file names

    Notes:
        - the result is not expected to be a huge set
    """
    s = select(self._data_table.c.name).where(self._data_table.c.job == job)
    return {o.name for o in self.connection.execute(s)}

get_invalid_dicts()

Get all invalid dictionaries in the database.

Returns:

Type Description
set[str]

a set of dictionary names

Notes
  • the result is not expected to be a huge set
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 452-462
def get_invalid_dicts(self) -> set[str]:
    """Get all invalid dictionaries in the database.

    Returns:
        a set of dictionary names

    Notes:
        - the result is not expected to be a huge set
    """
    with self._lock:
        return {o.name for o in self.connection.execute(select(self._dict_table.c.name).where(self._dict_table.c.zstd_id.is_(None)))}

get_job_size(job)

Get the storage size of all files data from a job.

Parameters:

Name Type Description Default
job int

the job id

required

Returns:

Type Description
int

the storage size of all files data from a job

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 544-552
def get_job_size(self, job: int) -> int:
    """Get the storage size of all files data from a job.

    Returns:
        the storage size of all files data from a job
    """
    s = select(func.sum(func.char_length(self._data_table.c.data))).where(self._data_table.c.job == job)

    return self.connection.execute(s).scalar() or 0

get_jobs()

Get all jobs in the database.

Returns:

Type Description
Generator[int, None, None]

a generator of job ids

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 497-504
def get_jobs(self) -> Generator[int, None, None]:
    """Get all jobs in the database.

    Returns:
        a generator of job ids
    """
    for o in self.connection.execute(select(self._job_table.c.id)):
        yield o.id
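
A browsing sketch combining the enumeration helpers above, assuming driver is an open SQLDriver:

# Illustrative walk over the store (`driver` is assumed to be an open SQLDriver).
for job in driver.get_jobs():                    # generator of job ids
    count = driver.get_data_count(job)
    size = driver.get_job_size(job)
    print(f"job {job}: {count} files, {size} B")
    for name in driver.get_files(job):           # set of file names
        print("  -", name, driver.get_data_size(job, name))

print("dictionaries:", driver.get_dicts())       # valid dictionaries only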

load_data(job, file_name)

Load a file from the database.

Parameters:

Name Type Description Default
job int

the job id

required
file_name str

the file name

required

Returns:

Type Description
bytes

the file data

Raises:

Type Description
DataNotExistsError

if the file does not exist

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 238-259
def load_data(self, job: int, file_name: str) -> bytes:
    """Load a file from the database.

    Args:
        job: the job id
        file_name: the file name

    Returns:
        the file data

    Raises:
        DataNotExistsError: if the file does not exist
    """
    with self._lock:
        s = select(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name)
        o = self.connection.execute(s).one_or_none()

        if o is not None:
            return bytes(o.data)  # mypy cast

        msg = f"file {file_name} does not exist in job {job}"  # pragma: no cover
        raise DataNotExistsError(msg)  # pragma: no cover
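
A short read sketch; checking existence first avoids importing the DataNotExistsError type, whose module is not shown here. The job id and file name are hypothetical, and driver is assumed to be an open SQLDriver.

# Illustrative read (`driver` is assumed open; job id and file name are hypothetical).
job, name = 1234, "stdout.log"
if driver.check_data(job, name):
    blob = driver.load_data(job, name)    # raw Zstandard-compressed bytes
    print(f"{name}: {len(blob)} compressed bytes")
else:
    print(f"{name} is not stored for job {job}")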

load_dict(dict_name)

Load a (non-invalid) dictionary from the database.

Parameters:

Name Type Description Default
dict_name str

the dictionary name

required

Returns:

Type Description
tuple[bytes, int]

a tuple of the dictionary data and its zstd ID

Raises:

Type Description
DictNotExistsError

if the dictionary does not exist or is invalid

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 215-236
def load_dict(self, dict_name: str) -> tuple[bytes, int]:
    """Load a (non-invalid) dictionary from the database.

    Args:
        dict_name: the dictionary name

    Returns:
        a tuple of the dictionary data and its zstd ID

    Raises:
        DictNotExistsError: if the dictionary does not exist or is invalid
    """
    with self._lock:
        o = self.connection.execute(
            select(self._dict_table.c.data, self._dict_table.c.zstd_id).where(self._dict_table.c.name == dict_name)
        ).one_or_none()

        if o is not None and o.zstd_id is not None:
            return bytes(o.data), o.zstd_id

        msg = f"dictionary '{dict_name}' does not exist"
        raise DictNotExistsError(msg)
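
The returned tuple can be fed into the zstandard package to decompress a stored blob by hand (normally the data entry and IO classes take care of this). A hedged sketch, assuming the blobs are regular Zstandard frames and using hypothetical names:

# Illustrative manual decompression (names are hypothetical; assumes the blobs
# are regular Zstandard frames and the `zstandard` package is installed).
import io

import zstandard

dict_data, zstd_id = driver.load_dict("example")      # raises if missing or invalid
zdict = zstandard.ZstdCompressionDict(dict_data)

blob = driver.load_data(1234, "stdout.log")
dctx = zstandard.ZstdDecompressor(dict_data=zdict)
with dctx.stream_reader(io.BytesIO(blob)) as reader:  # no content size needed
    raw = reader.read()

print(f"dict {zstd_id}: {len(blob)} B compressed -> {len(raw)} B raw")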

load_job(job)

Load job metadata from the database.

Parameters:

Name Type Description Default
job int

the job id

required

Returns:

Type Description
JobInfo

the job metadata

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 261-272
def load_job(self, job: int) -> JobInfo:
    """Load job metadata from the database.

    Args:
        job: the job id

    Returns:
        the job metadata
    """
    with self._lock:
        o = self.connection.execute(select(self._job_table).where(self._job_table.c.id == job)).one_or_none()
        return JobInfo(o.dirac_id, o.success)

save_data(job, file_name, data)

Save a file to the database.

Parameters:

Name Type Description Default
job int

the job id

required
file_name str

the file name

required
data bytes

the data to save

required

Raises:

Type Description
DictNotExistsError

if the dictionary does not exist

Source code in src/lhcbdirac_log/providers/database/driver.py, lines 320-353
def save_data(
    self,
    job: int,
    file_name: str,
    data: bytes,
) -> None:
    """Save a file to the database.

    Args:
        job: the job id
        file_name: the file name
        data: the data to save

    Raises:
        DictNotExistsError: if the dictionary does not exist
    """
    s: Executable

    with self._lock:
        if self.check_data(job, file_name):
            s = update(self._data_table).where(self._data_table.c.job == job, self._data_table.c.name == file_name).values(data=data)
        else:
            dict_name = DataEntry.filename_to_dictname(file_name)
            d_id = self.check_dict(dict_name, invalid=True)

            if d_id is None:
                msg = f"dictionary {dict_name} (for {file_name}) does not exist"
                raise DictNotExistsError(msg)

            s = insert(self._data_table).values(name=file_name, job=job, dict=d_id, data=data)

        conn = self.connection
        conn.execute(s)
        conn.commit()

save_dict(name, data, zstd_id)

Save a dictionary to the database.

Parameters:

Name Type Description Default
name str

the dictionary name

required
data bytes | None

the dictionary data or None for invalid

required
zstd_id int | None

the zstd dictionary id or None for invalid

required

Raises:

Type Description
ValueError

if exactly one of the dictionary data and the zstd id is None

DictExistsError

if the (non-invalid) dictionary already exists

Notes
  • overwrites the dictionary if it already exists but is invalid
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 285-318
def save_dict(self, name: str, data: bytes | None, zstd_id: int | None) -> None:
    """Save a dictionary to the database.

    Args:
        name: the dictionary name
        data: the dictionary data or None for invalid
        zstd_id: the zstd dictionary id or None for invalid

    Raises:
        ValueError: if exactly one of the dictionary data and the zstd id is None
        DictExistsError: if the (non-invalid) dictionary already exists

    Notes:
        - overwrites the dictionary if it already exists but is invalid
    """
    s: Executable

    if (data is None) != (zstd_id is None):  # pragma: no cover
        msg = "both data and zstd_id must be None or not None"
        raise ValueError(msg)

    with self._lock:
        d_id, inv = self._check_dict(name)
        if d_id is None:
            s = insert(self._dict_table).values(name=name, data=data, zstd_id=zstd_id)
        elif inv:
            s = update(self._dict_table).where(self._dict_table.c.id == d_id).values(data=data, zstd_id=zstd_id)
        else:  # pragma: no cover
            msg = f"dictionary {name} (id: {d_id}, zstd_id: {zstd_id}) already exists"
            raise DictExistsError(msg)

        conn = self.connection
        conn.execute(s)
        conn.commit()

save_invalid_dict(name)

Save an invalid dictionary to the database.

Parameters:

Name Type Description Default
name str

the dictionary name

required
Notes
  • forwards to save_dict with data=None and zstd_id=None
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 274-283
def save_invalid_dict(self, name: str) -> None:
    """Save an invalid dictionary to the database.

    Args:
        name: the dictionary name

    Notes:
        - forwards to save_dict with data=None and zstd_id=None
    """
    self.save_dict(name, None, None)
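
A sketch of the invalid-dictionary bookkeeping: the placeholder reserves the name so data rows can still reference it, check_dict hides it unless asked, and a later save_dict may replace it with trained data. The dictionary name is hypothetical and driver is assumed to be an open SQLDriver.

# Illustrative invalid-dictionary handling (dictionary name is hypothetical;
# `driver` is assumed to be an open SQLDriver).
driver.save_invalid_dict("example")

print(driver.check_dict("example"))                # None: invalid dicts are hidden
print(driver.check_dict("example", invalid=True))  # the dict id is still returned
print(driver.get_invalid_dicts())                  # {'example'}

# A later save_dict may replace the placeholder with trained data:
# driver.save_dict("example", trained_bytes, zstd_id)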

save_job(job, info=None)

Save/overwrite a job to the database.

Parameters:

Name Type Description Default
job int

the job id

required
info JobInfo | None

the job metadata (default is None)

None
Source code in src/lhcbdirac_log/providers/database/driver.py, lines 355-374
def save_job(self, job: int, info: JobInfo | None = None) -> None:
    """Save/overwrite a job to the database.

    Args:
        job: the job id
        info: the job metadata (default is None)
    """
    s: Executable

    k = {} if info is None else {"dirac_id": info.dirac_id, "success": info.success}

    with self._lock:
        if not self.check_job(job):
            s = insert(self._job_table).values(id=job, **k)
        else:
            s = update(self._job_table).where(self._job_table.c.id == job).values(**k)

        conn = self.connection
        conn.execute(s)
        conn.commit()
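
Putting the write path together: the job row and the referenced dictionary must exist before save_data. The sketch below is only illustrative; the DataEntry import location, the file name, and the payload are assumptions.

# Illustrative write path (file name, payload and the DataEntry import location
# are assumptions).
from lhcbdirac_log.providers import DataEntry  # import location assumed
from lhcbdirac_log.providers.database.driver import SQLDriver

driver = SQLDriver.create()       # in-memory database for the example
driver.save_job(1234)             # the job row must exist before its files

# save_data resolves the dictionary via DataEntry.filename_to_dictname, so that
# dictionary must already be stored (trained data or an invalid placeholder).
driver.save_invalid_dict(DataEntry.filename_to_dictname("stdout.log"))

driver.save_data(1234, "stdout.log", b"...compressed payload...")
print(driver.get_files(1234))     # {'stdout.log'}

driver.close()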

SQLJobEntry

Bases: JobEntry[SQLDataEntry]

SQL Database job entry implementation.

Only supports the new compressed (Zstandard) format.

Source code in src/lhcbdirac_log/providers/database/accessors.py, lines 435-530
class SQLJobEntry(JobEntry[SQLDataEntry]):
    """SQL Database job entry implementation.

    Only supports the new compressed (Zstandard) format.
    """

    __slots__ = ("_driver",)

    def __init__(self, driver: SQLDriver, job: int, *, readonly: bool, create: bool = True, exists_ok: bool = True) -> None:
        """[Internal] Initialize the job entry.

        Args:
            driver: the SQL driver
            job: the job id
            readonly: indicate whether the job is read-only or not
            create: if True, create the job (default is True)
            exists_ok: if True, ignore the error if the job already exists (default is True)

        Raises:
            JobExistsError: if the job exists and exists_ok is False
            JobNotExistsError: if the job does not exist and create is False

        Notes:
            - compressed is always True
        """
        self._driver = driver

        if self._driver.check_job(job):
            if not exists_ok:
                raise JobExistsError(job)
        elif create:
            self._driver.save_job(job)
        else:
            raise JobNotExistsError(job)

        super().__init__(job, compressed=True, readonly=readonly)

    @property
    def driver(self) -> SQLDriver:
        """Get the SQL driver.

        Returns:
            the SQL driver
        """
        return self._driver

    @override
    def _get(self, name: str, *, create: bool = False) -> SQLDataEntry:
        if not create and not self._driver.check_data(self._job, name):
            raise DataNotExistsError(name)

        return SQLDataEntry(self._driver, name, self._job, readonly=self._readonly)

    @override
    def _create(self, name: str, *, exists_ok: bool = False) -> SQLDataEntry:
        if not exists_ok and self._driver.check_data(self._job, name):
            raise DataExistsError(name)

        return SQLDataEntry(self._driver, name, self._job, readonly=self._readonly)

    @override
    def files(self) -> Generator[str, None, None]:
        yield from self._driver.get_files(self._job)

    @override
    @property
    def data_size(self) -> int:  # optimizes the default implementation
        return self._driver.get_job_size(self._job)

    @override
    def delete(self, name: str) -> None:  # optimizes the default implementation
        if self._readonly:
            msg = "Cannot delete data from read-only job"
            raise ReadOnlyError(msg)

        self._driver.delete_data(self._job, name)

    @override
    def clear(self) -> None:  # optimizes the default implementation
        if self._readonly:
            msg = "Cannot clear data from read-only job"
            raise ReadOnlyError(msg)

        self._driver.clear_job(self._job)

    @override
    def __len__(self) -> int:
        return self._driver.get_data_count(self._job)

    @override
    def _update_info(self) -> None:
        self._driver.save_job(self._job, self._info)

    @override
    def _load_info(self) -> JobInfo:
        return self._driver.load_job(self._job)
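
For orientation, a sketch of working through SQLJobEntry instead of calling the driver directly; entries are normally created by the provider layer, and the import paths are assumed from the source locations shown on this page.

# Illustrative use of the job entry wrapper (import paths assumed).
from lhcbdirac_log.providers.database.accessors import SQLJobEntry
from lhcbdirac_log.providers.database.driver import SQLDriver

driver = SQLDriver.create()
entry = SQLJobEntry(driver, 1234, readonly=False)  # creates the job row by default

print(len(entry))            # number of files in the job
print(entry.data_size)       # total stored size of the job's files
print(list(entry.files()))   # file names

entry.clear()                # drop every file of the job
driver.close()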

driver: SQLDriver property

Get the SQL driver.

Returns:

Type Description
SQLDriver

the SQL driver

__init__(driver, job, *, readonly, create=True, exists_ok=True)

[Internal] Initialize the job entry.

Parameters:

Name Type Description Default
driver SQLDriver

the SQL driver

required
job int

the job id

required
readonly bool

indicate whether the job is read-only or not

required
create bool

if True, create the job (default is True)

True
exists_ok bool

if True, ignore the error if the job already exists (default is True)

True

Raises:

Type Description
JobExistsError

if the job exists and exists_ok is False

JobNotExistsError

if the job does not exist and create is False

Notes
  • compressed is always True
Source code in src/lhcbdirac_log/providers/database/accessors.py, lines 443-470
def __init__(self, driver: SQLDriver, job: int, *, readonly: bool, create: bool = True, exists_ok: bool = True) -> None:
    """[Internal] Initialize the job entry.

    Args:
        driver: the SQL driver
        job: the job id
        readonly: indicate whether the job is read-only or not
        create: if True, create the job (default is True)
        exists_ok: if True, ignore the error if the job already exists (default is True)

    Raises:
        JobExistsError: if the job exists and exists_ok is False
        JobNotExistsError: if the job does not exist and create is False

    Notes:
        - compressed is always True
    """
    self._driver = driver

    if self._driver.check_job(job):
        if not exists_ok:
            raise JobExistsError(job)
    elif create:
        self._driver.save_job(job)
    else:
        raise JobNotExistsError(job)

    super().__init__(job, compressed=True, readonly=readonly)