Skip to content

FSspec Provider

Implementation of the data provider layer for fsspec-supported (remote) filesystems.

This implementation only supports the old uncompressed (i.e. ZIP) format. Thus, Zstandard-related features are not implemented, including the dictionary provider.

Since the purpose of this library is to migrate from the old format to the new one, creating new job/data entries with this provider is neither allowed nor implemented. Thus, this implementation only supports read-only mode.

Classes:

Name Description
- RootDataEntry

fsspec data entry implementation

- RootJobEntry

fsspec job entry implementation

- RootDataProvider

fsspec data provider implementation

Notes
  • classes are prefixed with Root because they are mainly intended to be used with CERN's EOS filesystem through the XRootD protocol

RootDataEntry

Bases: DataEntry

Fsspec data entry implementation.

Only supports the old uncompressed (i.e. ZIP) format. Only supports read-only mode.

Source code in src/lhcbdirac_log/providers/fsspec/accessors.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class RootDataEntry(DataEntry):
    """Fsspec data entry implementation.

    Only supports the old uncompressed (i.e. ZIP) format.
    Only supports read-only mode.
    """

    __slots__ = ("_job_entry",)

    def __init__(self, job: RootJobEntry, name: str) -> None:
        """[Internal] Initialize the data entry.

        Args:
            job: the job entry
            name: the data name
        """
        self._job_entry = job
        super().__init__(name, job.job, compressed=False, readonly=True)

    @override
    def _reader(self) -> BinaryIO | IO[bytes]:
        """Open this entry's member of the job zip archive for reading.

        Returns:
            a binary file-like object over the member content

        Raises:
            DataNotExistsError: if the member is not in the archive
        """
        try:
            return self._job_entry.reader.open(f"{self._job_entry.prefix}{self._name}", "r")
        # ZipFile.open reports an unknown member with KeyError, not FileNotFoundError
        except (KeyError, FileNotFoundError) as err:  # pragma: no cover
            raise DataNotExistsError(self._name) from err

    @override
    def _writer(self) -> BinaryIO:  # pragma: no cover
        """Writing is not supported: this provider is read-only.

        Raises:
            ReadOnlyError: always
        """
        raise ReadOnlyError(self._name)

    @override
    def _size(self) -> int | None:
        """Get the uncompressed size of this entry.

        Returns:
            the member's uncompressed size, or None if the member is missing
        """
        try:
            return self._job_entry.reader.getinfo(f"{self._job_entry.prefix}{self._name}").file_size
        # ZipFile.getinfo reports an unknown member with KeyError, not FileNotFoundError
        except (KeyError, FileNotFoundError):  # pragma: no cover
            return None

    @override
    def _delete(self) -> None:  # pragma: no cover
        """Deleting is not supported: this provider is read-only.

        Raises:
            ReadOnlyError: always
        """
        raise ReadOnlyError(self._name)

__init__(job, name)

[Internal] Initialize the data entry.

Parameters:

Name Type Description Default
job RootJobEntry

the job entry

required
name str

the data name

required
Source code in src/lhcbdirac_log/providers/fsspec/accessors.py
34
35
36
37
38
39
40
41
42
def __init__(self, job: RootJobEntry, name: str) -> None:
    """[Internal] Set up a read-only, uncompressed data entry belonging to a job.

    Args:
        job: the job entry owning this data
        name: the data name
    """
    # remember the owning job entry; it supplies the zip reader and member prefix
    self._job_entry = job
    owner_id = job.job
    super().__init__(name, owner_id, compressed=False, readonly=True)

RootDataProvider

Bases: DataProvider[RootJobEntry]

FSspec data provider implementation.

Only supports the old uncompressed (i.e. ZIP) format. Only supports read-only mode.

Source code in src/lhcbdirac_log/providers/fsspec/providers.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class RootDataProvider(DataProvider[RootJobEntry]):
    """FSspec data provider implementation.

    Only supports the old uncompressed (i.e. ZIP) format.
    Only supports read-only mode.
    """

    __slots__ = ("_driver",)

    @property
    def driver(self) -> RootDriver:
        """Get the root driver.

        Returns:
            the root driver
        """
        return self._driver

    def __init__(self, url: str) -> None:
        """Initialize the data provider.

        Args:
            url: the root url
        """
        self._driver = RootDriver(url)
        super().__init__(readonly=True)

    @override
    def _get(self, job: int, *, create: bool = False) -> RootJobEntry:
        return RootJobEntry(self._driver, job)

    @override
    def _create(self, job: int, *, exists_ok: bool = False) -> RootJobEntry:  # pragma: no cover
        pass

    @staticmethod
    def _parse_job(path: str) -> int | None:
        """Extract the job id from a listed job archive path.

        Job archives are named ``f"{job:08}.zip"`` (the name RootJobEntry looks up),
        so only basenames that round-trip through that format are accepted. This also
        handles job ids with more than 8 digits.

        Args:
            path: a path returned by the filesystem listing

        Returns:
            the job id, or None if the path is not a job archive
        """
        name = path.rpartition("/")[2]
        stem = name.removesuffix(".zip")

        # no ".zip" suffix, or not purely ASCII digits -> not a job archive
        if stem == name or not (stem.isascii() and stem.isdigit()):
            return None

        job = int(stem)
        # reject e.g. "1234.zip" or "012345678.zip", which RootJobEntry would not find
        return job if f"{job:08}" == stem else None

    @override
    def jobs(self) -> Generator[int, None, None]:
        # the directory listing is fetched eagerly when jobs() is called; parsing is lazy
        listing = self._driver.fs.listdir(self._driver.path, detail=False)
        return (job for job in map(self._parse_job, listing) if job is not None)

    @override
    def _delete(self, job: int, *, force: bool = False) -> None:  # pragma: no cover
        pass

driver: RootDriver property

Get the root driver.

Returns:

Type Description
RootDriver

the root driver

__init__(url)

Initialize the data provider.

Parameters:

Name Type Description Default
url str

the root url

required
Source code in src/lhcbdirac_log/providers/fsspec/providers.py
37
38
39
40
41
42
43
44
def __init__(self, url: str) -> None:
    """Set up the read-only data provider.

    Args:
        url: the root url, used to build the RootDriver
    """
    driver = RootDriver(url)
    self._driver = driver
    super().__init__(readonly=True)

RootJobEntry

Bases: JobEntry[RootDataEntry]

Fsspec job entry implementation.

Only supports the old uncompressed (i.e. ZIP) format. Only supports read-only mode.

Source code in src/lhcbdirac_log/providers/fsspec/accessors.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class RootJobEntry(JobEntry[RootDataEntry]):
    """Fsspec job entry implementation.

    Only supports the old uncompressed (i.e. ZIP) format.
    Only supports read-only mode.
    """

    __slots__ = (
        "_driver",
        "_prefix",
        "_reader",
        "_path",
    )

    def __init__(self, driver: RootDriver, job: int) -> None:
        """[Internal] Initialize the job entry.

        Args:
            driver: the root driver
            job: the job id

        Raises:
            JobNotExistsError: if the job does not exist
        """
        self._driver = driver
        # set before the existence check may raise, so __del__ always finds the slot
        self._reader: ZipFile | None = None

        self._path = f"{self._driver.path}/{job:08}.zip"

        if not self._driver.fs.isfile(self._path):
            raise JobNotExistsError(job)

        self._prefix = f"{job:08}/"

        super().__init__(job, compressed=False, readonly=True)

    @property
    def driver(self) -> RootDriver:
        """Get the root driver.

        Returns:
            the root driver
        """
        return self._driver

    @property
    def prefix(self) -> str:
        """Get the job path prefix with trailing slash.

        Returns:
            the path prefix (e.g. "00000001/")
        """
        return self._prefix

    @property
    def reader(self) -> ZipFile:
        """Get the zip-file handler, opening the archive lazily on first access.

        Returns:
            the zip reader
        """
        if self._reader is None:
            handle = self._driver.fs.open(self._path)
            try:
                self._reader = ZipFile(handle)
            except BaseException:
                # ZipFile never closes a file object it was given, even when its
                # constructor fails (e.g. BadZipFile), so release the handle here
                handle.close()
                raise

        return self._reader

    @override
    def _get(self, name: str, *, create: bool = False) -> RootDataEntry:
        try:
            info = self.reader.getinfo(f"{self._prefix}{name}")
        except KeyError as err:  # getinfo raises KeyError for unknown members
            raise DataNotExistsError(name) from err

        # directory members carry no data and are not listed by files()
        if info.is_dir():
            raise DataNotExistsError(name)

        return RootDataEntry(self, name)

    @override
    def _create(self, name: str, *, exists_ok: bool = False) -> RootDataEntry:  # pragma: no cover
        pass

    @override
    def files(self) -> Generator[str, None, None]:
        # skip directory members (e.g. the "00000001/" entry itself): they are not data files
        yield from (i.filename.removeprefix(self._prefix) for i in self.reader.infolist() if not i.is_dir())

    @override
    @property
    def job_size(self) -> int:  # optimizes the default implementation
        return self._driver.fs.size(self._path) or 0

    @override
    @property
    def data_size(self) -> int:  # optimizes the default implementation
        return sum(i.file_size for i in self.reader.infolist())

    @override
    def delete(self, name: str) -> None:  # optimizes the default implementation
        raise ReadOnlyError(name)

    @override
    def __len__(self) -> int:
        # count the same members that files() yields (directory members excluded)
        return sum(1 for i in self.reader.infolist() if not i.is_dir())

    def __del__(self) -> None:
        """Close the zip-file reader and its underlying file handle, if opened."""
        reader = getattr(self, "_reader", None)
        if reader is not None:
            # ZipFile.close() leaves a caller-supplied file object open, so close fp too
            fp = reader.fp
            try:
                reader.close()
            finally:
                if fp is not None:
                    fp.close()

                self._reader = None

    @override
    def _update_info(self) -> None:  # pragma: no cover
        pass

driver: RootDriver property

Get the root driver.

Returns:

Type Description
RootDriver

the root driver

prefix: str property

Get the job path prefix with trailing slash.

Returns:

Type Description
str

the path prefix (e.g. "00000001/")

reader: ZipFile property

Get the zip-file handler.

Returns:

Type Description
ZipFile

the zip reader

__del__()

Close the zip-file reader.

Source code in src/lhcbdirac_log/providers/fsspec/accessors.py
171
172
173
174
175
176
177
178
179
180
def __del__(self) -> None:
    """Close the zip-file reader and its underlying file handle, if opened."""
    # getattr default: tolerate objects whose __init__ never set the slot
    reader = getattr(self, "_reader", None)
    if reader is not None:
        # ZipFile.close() leaves a caller-supplied file object open, so close fp too
        fp = reader.fp
        try:
            reader.close()
        finally:
            if fp is not None:
                fp.close()

            self._reader = None

__init__(driver, job)

[Internal] Initialize the job entry.

Parameters:

Name Type Description Default
driver RootDriver

the root driver

required
job int

the job id

required

Raises:

Type Description
JobNotExistsError

if the job does not exist

Source code in src/lhcbdirac_log/providers/fsspec/accessors.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(self, driver: RootDriver, job: int) -> None:
    """[Internal] Set up a read-only entry for one job's zip archive.

    Args:
        driver: the root driver giving access to the filesystem
        job: the job id

    Raises:
        JobNotExistsError: if no archive file exists for the job
    """
    self._driver = driver
    self._reader: ZipFile | None = None

    # zero-padded id shared by the archive file name and the member prefix
    stem = f"{job:08}"
    self._path = f"{driver.path}/{stem}.zip"

    if not driver.fs.isfile(self._path):
        raise JobNotExistsError(job)

    self._prefix = stem + "/"

    super().__init__(job, compressed=False, readonly=True)