
Simple API

This package provides the Zstandard compressor and decompressor wrapper classes.

These classes share a common interface, defined by ZstandardProcessor, and can be used interchangeably. The ZstandardTrainer class exposes the same interface, but it is a special case: it trains compression dictionaries rather than transforming data.

Classes:

- ZstandardCompressor: Zstandard compressor wrapper class.
- ZstandardDecompressor: Zstandard decompressor wrapper class.
- ZstandardProcessor: Zstandard base processor abstract interface.
- ZstandardTrainer: Zstandard trainer wrapper class.

ZstandardCompressor

Bases: ZstandardProcessor[ZstdCompressor]

Wrapper for Zstandard compressor.

Performs data compression using Zstandard, featuring extra functionality:
  • zstd-context management (auto compressor reinstantiation)
  • switchable dictionary

Notes
  • not thread-safe
  • dictionary switching requires reinstantiation of the internal context processor (slow)
Source code in src/lhcbdirac_log/zstd/processors/compressor.py
class ZstandardCompressor(ZstandardProcessor[ZstdCompressor]):
    """Wrapper for Zstandard compressor.

    Performs data compression using Zstandard, featuring extra functionality:
        - zstd-context management (auto compressor reinstantiation)
        - switchable dictionary

    Notes:
        - not thread-safe
        - dictionary switching requires reinstantiation of the internal context processor (slow)
    """

    __slots__ = ()

    @override
    def _new(self) -> ZstdCompressor:
        return ZstdCompressor(dict_data=None if self._dict is None else self._dict.dict, compression_params=self._config.params)

    @override
    def process(self, *data: bytes | bytearray | memoryview) -> bytes | Iterator[bytes]:
        if len(data) == 1:
            return bytes(self.processor.compress(data[0])) if data[0] else b""
        return (bytes(self.processor.compress(i)) if i else b"" for i in data)

    @override
    def process_stream(
        self,
        instream: BinaryIO,
        outstream: BinaryIO,
        insize: int = -1,
    ) -> tuple[int, int]:
        r: tuple[int, int] = self.processor.copy_stream(
            instream,
            outstream,
            insize,
            self._config.stream_read_buffer,
            self._config.stream_write_buffer,
        )
        return r  # fix mypy error on untyped zstdlib

    compress = process
    compress_stream = process_stream
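
For illustration, a minimal usage sketch; the import path is an assumption based on the source path shown above:

from lhcbdirac_log.zstd.processors.compressor import ZstandardCompressor  # assumed import path

compressor = ZstandardCompressor()  # DEFAULT_CONFIG, no dictionary

# a single input returns bytes directly
frame = compressor.compress(b"some log data")

# several inputs return a lazy iterator, one compressed zstd frame per input
frames = list(compressor.compress(b"first", b"second", b"third"))

Each call to the one-shot compressor produces an independent zstd frame, so the outputs can be decompressed separately.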

ZstandardDecompressor

Bases: ZstandardProcessor[ZstdDecompressor]

Wrapper for Zstandard decompressor.

Performs data decompression using Zstandard, featuring extra functionality:
  • zstd-context management (auto decompressor reinstantiation)
  • switchable dictionary

Notes
  • not thread-safe
  • dictionary switching requires reinstantiation of the internal context processor (slow)
Source code in src/lhcbdirac_log/zstd/processors/decompressor.py
class ZstandardDecompressor(ZstandardProcessor[ZstdDecompressor]):
    """Wrapper for Zstandard decompressor.

    Performs data decompression using Zstandard, featuring extra functionality:
        - zstd-context management (auto decompressor reinstantiation)
        - switchable dictionary

    Notes:
        - not thread-safe
        - dictionary switching requires reinstantiation of the internal context processor (slow)
    """

    __slots__ = ()

    @override
    def _new(self) -> ZstdDecompressor:
        return ZstdDecompressor(dict_data=None if self._dict is None else self._dict.dict)

    @override
    def process(self, *data: bytes | bytearray | memoryview) -> bytes | Iterator[bytes]:
        if len(data) == 1:
            return bytes(self.processor.decompress(data[0])) if data[0] else b""
        return (bytes(self.processor.decompress(i)) if i else b"" for i in data)

    @override
    def process_stream(
        self,
        instream: BinaryIO,
        outstream: BinaryIO,
        insize: int = -1,
    ) -> tuple[int, int]:
        """Process the provided stream.

        Args:
            instream: the stream to process
            outstream: the stream to write the processed data to
            insize: ignored

        Returns:
            a tuple with the number of bytes read and written
        """
        r: tuple[int, int] = self.processor.copy_stream(instream, outstream, self._config.stream_read_buffer, self._config.stream_write_buffer)
        return r

    decompress = process
    decompress_stream = process_stream
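
A round-trip sketch pairing the two wrappers (import paths assumed from the source paths above); when a dictionary is used, both sides must be configured with the same DictEntry:

from lhcbdirac_log.zstd.processors.compressor import ZstandardCompressor      # assumed import path
from lhcbdirac_log.zstd.processors.decompressor import ZstandardDecompressor  # assumed import path

comp = ZstandardCompressor()
decomp = ZstandardDecompressor()

payload = b"repetitive log content\n" * 1_000
assert decomp.decompress(comp.compress(payload)) == payload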

process_stream(instream, outstream, insize=-1)

Process the provided stream.

Parameters:

  instream (BinaryIO): the stream to process (required)
  outstream (BinaryIO): the stream to write the processed data to (required)
  insize (int): ignored (default: -1)

Returns:

  tuple[int, int]: a tuple with the number of bytes read and written

Source code in src/lhcbdirac_log/zstd/processors/decompressor.py
@override
def process_stream(
    self,
    instream: BinaryIO,
    outstream: BinaryIO,
    insize: int = -1,
) -> tuple[int, int]:
    """Process the provided stream.

    Args:
        instream: the stream to process
        outstream: the stream to write the processed data to
        insize: ignored

    Returns:
        a tuple with the number of bytes read and written
    """
    r: tuple[int, int] = self.processor.copy_stream(instream, outstream, self._config.stream_read_buffer, self._config.stream_write_buffer)
    return r
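
A stream-based sketch using in-memory buffers; io.BytesIO satisfies the BinaryIO interface expected by both stream methods (import paths assumed as above):

import io

from lhcbdirac_log.zstd.processors.compressor import ZstandardCompressor      # assumed import path
from lhcbdirac_log.zstd.processors.decompressor import ZstandardDecompressor  # assumed import path

raw = io.BytesIO(b"log line\n" * 10_000)
packed = io.BytesIO()
ZstandardCompressor().compress_stream(raw, packed)

packed.seek(0)
restored = io.BytesIO()
read, written = ZstandardDecompressor().decompress_stream(packed, restored)
assert restored.getvalue() == raw.getvalue()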

ZstandardProcessor

Bases: ABC

Zstandard processor abstract base class.

Provides a common interface for the Zstandard compressor, decompressor, and trainer wrappers.

Source code in src/lhcbdirac_log/zstd/processors/processor.py
class ZstandardProcessor[T](ABC):
    """Zstandard processor abstract base class.

    Provides a common interface for the Zstandard compressor, decompressor, and trainer wrappers.
    """

    __slots__ = (
        "_config",
        "_dict",
        "_processor",
    )

    def __init__(self, config: Config = DEFAULT_CONFIG, dict_entry: DictEntry | None = None) -> None:
        """Initialize the processor.

        Args:
            config: the configuration to use for the processing (default is DEFAULT_CONFIG)
            dict_entry: the dictionary entry to use (can be set/changed later)
        """
        super().__init__()

        self._dict = dict_entry
        self._config = config
        self._processor: T | None = None

    @property
    @final
    def dict_name(self) -> str | None:
        """Get the dictionary name of the processor or None.

        Returns:
            the dictionary name or None if no dictionary is set
        """
        if self._dict is None:
            return None

        return self._dict.dict_name

    @property
    def dict(self) -> DictEntry | None:
        """Get the dictionary entry.

        Returns:
            the dictionary entry or None if no dictionary is set
        """
        return self._dict

    @dict.setter
    def dict(self, value: DictEntry) -> None:
        """Set the dictionary.

        Args:
            value: the dictionary entry

        Notes:
            - Setting a different dictionary will reset the internal processor
        """
        if self._dict is not value:
            self._dict = value
            self._processor = None  # dict cannot be changed on the fly, so we need to reset it

    @property
    @final
    def processor(self) -> T:
        """Get the internal processor.

        Returns:
            the internal processor
        """
        if self._processor is None:
            self._processor = self._new()
        return self._processor

    @abstractmethod
    def _new(self) -> T:
        """[Internal] Create a new internal processor.

        Returns:
            a new internal processor
        """

    @abstractmethod
    def process(self, *data: bytes | bytearray | memoryview) -> bytes | Iterator[bytes]:
        """Process the provided data.

        Args:
            *data: the data to process

        Returns:
            a bytes object (if single output) or
            a bytes iterator

        Notes:
            - empty data will return an empty bytes
        """

    @abstractmethod
    def process_stream(
        self,
        instream: BinaryIO,
        outstream: BinaryIO,
        insize: int = -1,
    ) -> tuple[int, int]:
        """Process the provided stream.

        Args:
            instream: the stream to process
            outstream: the stream to write the processed data to
            insize: the size of the input stream, -1 for unknown

        Returns:
            a tuple with the number of bytes read and written
        """

dict: DictEntry | None property writable

Get the dictionary entry.

Returns:

  DictEntry | None: the dictionary entry or None if no dictionary is set

dict_name: str | None property

Get the dictionary name of the processor or None.

Returns:

  str | None: the dictionary name or None if no dictionary is set

processor: T property

Get the internal processor.

Returns:

  T: the internal processor

__init__(config=DEFAULT_CONFIG, dict_entry=None)

Initialize the processor.

Parameters:

  config (Config): the configuration to use for the processing (default: DEFAULT_CONFIG)
  dict_entry (DictEntry | None): the dictionary entry to use; can be set/changed later (default: None)
Source code in src/lhcbdirac_log/zstd/processors/processor.py
def __init__(self, config: Config = DEFAULT_CONFIG, dict_entry: DictEntry | None = None) -> None:
    """Initialize the processor.

    Args:
        config: the configuration to use for the processing (default is DEFAULT_CONFIG)
        dict_entry: the dictionary entry to use (can be set/changed later)
    """
    super().__init__()

    self._dict = dict_entry
    self._config = config
    self._processor: T | None = None

process(*data) abstractmethod

Process the provided data.

Parameters:

  *data (bytes | bytearray | memoryview): the data to process

Returns:

  bytes | Iterator[bytes]: a bytes object (if single output) or a bytes iterator

Notes
  • empty data will return an empty bytes
Source code in src/lhcbdirac_log/zstd/processors/processor.py
@abstractmethod
def process(self, *data: bytes | bytearray | memoryview) -> bytes | Iterator[bytes]:
    """Process the provided data.

    Args:
        *data: the data to process

    Returns:
        a bytes object (if single output) or
        a bytes iterator

    Notes:
        - empty data will return an empty bytes
    """

process_stream(instream, outstream, insize=-1) abstractmethod

Process the provided stream.

Parameters:

  instream (BinaryIO): the stream to process (required)
  outstream (BinaryIO): the stream to write the processed data to (required)
  insize (int): the size of the input stream, -1 for unknown (default: -1)

Returns:

  tuple[int, int]: a tuple with the number of bytes read and written

Source code in src/lhcbdirac_log/zstd/processors/processor.py
@abstractmethod
def process_stream(
    self,
    instream: BinaryIO,
    outstream: BinaryIO,
    insize: int = -1,
) -> tuple[int, int]:
    """Process the provided stream.

    Args:
        instream: the stream to process
        outstream: the stream to write the processed data to
        insize: the size of the input stream, -1 for unknown

    Returns:
        a tuple with the number of bytes read and written
    """

ZstandardTrainer

Bases: ZstandardProcessor[ZstandardProcessor]

Wrapper for Zstandard trainer.

Performs dictionary training from data, using Zstandard.

Notes
  • not thread-safe
Source code in src/lhcbdirac_log/zstd/processors/trainer.py
class ZstandardTrainer(ZstandardProcessor[ZstandardProcessor]):
    """Wrapper for Zstandard trainer.

    Performs dictionary training from data, using Zstandard.

    Notes:
        - not thread-safe
    """

    __slots__ = ()

    @override
    def _new(self) -> Self:
        return self

    @override
    def process(self, *data: bytes | bytearray | memoryview, dict_id: int = 0, dict_size: int = 0) -> bytes:
        """Train a dictionary from the provided data.

        Args:
            *data: the data to use for training
            dict_id: the zstd dictionary id to use (default is 0 = auto)
            dict_size: the final size of the dictionary to train (default is 0 = auto)

        Returns:
            the trained dictionary data

        Raises:
            ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
            ZstdError: if the training fails
        """
        return self._train([i for i in data if len(i) > 0], dict_id=dict_id, dict_size=dict_size).as_bytes()  # drop empty samples

    @override
    def process_stream(self, instream: BinaryIO, outstream: BinaryIO, insize: int = -1) -> tuple[int, int]:
        """Process the provided stream. Not implemented.

        Args:
            instream: ignored
            outstream: ignored
            insize: ignored

        Returns:
            a tuple with the number of bytes read and written

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError

    def train(self, dict_provider: DictProvider, *data: DataEntry, dict_size: int = 0) -> bytes:
        """Train a dict with the sample data.

        The newly trained dictionary is added to the dictionary provider,
        and is available through the `dict` property.

        Args:
            dict_provider: the destination dictionary provider
            *data: the data to use for training
            dict_size: the final size of the dictionary to train (0: auto)

        Returns:
            the trained dictionary data

        Raises:
            ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
            ZstdError: if the training fails
        """
        fdata = [i for i in data if i.size > 0]  # filter out empty data

        if len(fdata) <= 1:
            msg = f"Not enough data to train a dictionary, at least 2 non-empty samples are required, got {len(fdata)}"
            raise ValueError(msg)

        fdata.sort(key=lambda x: x.size)  # sort by size (smallest first)

        samples = []  # dataset
        size = 0

        # load samples until max sample limit or max size limit is reached
        for i in range(min(len(fdata), self._config.train_max_sample)):
            dt = fdata[i]
            size += dt.size

            if size > self._config.train_max_size:
                size -= dt.size
                break

            with dt.reader() as r:
                samples.append(bytes(r.read()))

        d = self._train(samples, dict_size=dict_size, size=size)
        self._dict = dict_provider.add(fdata[0].dict_name, dt := d.as_bytes(), d.dict_id())
        return dt

    def _train(self, samples: list[bytes], *, dict_id: int = 0, size: int = -1, dict_size: int = 0) -> ZstdCompressionDict:
        """[Internal] Train a dictionary from the provided data.

        Args:
            samples: the data to use for training
            dict_id: the zstd dictionary id to use (default is 0 = auto)
            size: the total size of the samples (default is -1 = auto computed)
            dict_size: the final size of the dictionary to train (default is 0 = auto)

        Returns:
            the trained zstd dictionary

        Raises:
            ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
            ZstdError: if the training fails
        """
        if len(samples) <= 1:
            msg = f"Not enough data to train a dictionary, at least 2 non-empty, not too big, samples are required, got {len(samples)} (all samples size limit: {self._config.train_max_size})"
            raise ValueError(msg)

        sp = self._config.train_split_point or 0.75

        if int(sp * len(samples)) <= 1:  # change the split point if too small
            sp = min(2 / len(samples) + 0.0001, 1)  # split point min (+ float epsilon)

        return train_dictionary(
            split_point=sp,
            dict_id=dict_id,
            samples=samples,
            level=self._config.train_level,
            notifications=self._config.train_notifications,
            threads=self._config.train_threads,
            dict_size=max(dict_size or round((sum(len(i) for i in samples) if size < 0 else size) * sp / self._config.train_dict_size_factor), 0x100),
        )
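
A minimal training sketch; the import path is assumed, and zstd itself may still raise ZstdError if the dataset is too small or too uniform to learn from:

from lhcbdirac_log.zstd.processors.trainer import ZstandardTrainer  # assumed import path

trainer = ZstandardTrainer()

# at least 2 non-empty samples are required, otherwise ValueError is raised
samples = [f"job {i} started\njob {i} finished with status OK\n".encode() for i in range(128)]
dict_data = trainer.process(*samples)  # raw dictionary bytes; dict_id and dict_size are auto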

process(*data, dict_id=0, dict_size=0)

Train a dictionary from the provided data.

Parameters:

  *data (bytes | bytearray | memoryview): the data to use for training
  dict_id (int): the zstd dictionary id to use (default: 0 = auto)
  dict_size (int): the final size of the dictionary to train (default: 0 = auto)

Returns:

  bytes: the trained dictionary data

Raises:

  ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
  ZstdError: if the training fails

Source code in src/lhcbdirac_log/zstd/processors/trainer.py
@override
def process(self, *data: bytes | bytearray | memoryview, dict_id: int = 0, dict_size: int = 0) -> bytes:
    """Train a dictionary from the provided data.

    Args:
        *data: the data to use for training
        dict_id: the zstd dictionary id to use (default is 0 = auto)
        dict_size: the final size of the dictionary to train (default is 0 = auto)

    Returns:
        the trained dictionary data

    Raises:
        ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
        ZstdError: if the training fails
    """
    return self._train([i for i in data if len(i) > 0], dict_id=dict_id, dict_size=dict_size).as_bytes()  # drop empty samples

process_stream(instream, outstream, insize=-1)

Process the provided stream. Not implemented.

Parameters:

  instream (BinaryIO): ignored (required)
  outstream (BinaryIO): ignored (required)
  insize (int): ignored (default: -1)

Returns:

  tuple[int, int]: a tuple with the number of bytes read and written

Raises:

  NotImplementedError: always

Source code in src/lhcbdirac_log/zstd/processors/trainer.py
@override
def process_stream(self, instream: BinaryIO, outstream: BinaryIO, insize: int = -1) -> tuple[int, int]:
    """Process the provided stream. Not implemented.

    Args:
        instream: ignored
        outstream: ignored
        insize: ignored

    Returns:
        a tuple with the number of bytes read and written

    Raises:
        NotImplementedError: always
    """
    raise NotImplementedError
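
Since the trainer has no streaming form, callers that treat processors generically may want to guard this call; a minimal sketch:

import io

trainer = ZstandardTrainer()
try:
    trainer.process_stream(io.BytesIO(), io.BytesIO())
except NotImplementedError:
    pass  # dictionaries are only trained from in-memory samples via process()/train()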

train(dict_provider, *data, dict_size=0)

Train a dict with the sample data.

The newly trained dictionary is added to the dictionary provider, and is available through the dict property.

Parameters:

  dict_provider (DictProvider): the destination dictionary provider (required)
  *data (DataEntry): the data to use for training
  dict_size (int): the final size of the dictionary to train (0: auto)

Returns:

  bytes: the trained dictionary data

Raises:

  ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
  ZstdError: if the training fails

Source code in src/lhcbdirac_log/zstd/processors/trainer.py
def train(self, dict_provider: DictProvider, *data: DataEntry, dict_size: int = 0) -> bytes:
    """Train a dict with the sample data.

    The newly trained dictionary is added to the dictionary provider,
    and is available through the `dict` property.

    Args:
        dict_provider: the destination dictionary provider
        *data: the data to use for training
        dict_size: the final size of the dictionary to train (0: auto)

    Returns:
        the trained dictionary data

    Raises:
        ValueError: if the dataset is invalid (too small or empty dataset, or only too big samples...)
        ZstdError: if the training fails
    """
    fdata = [i for i in data if i.size > 0]  # filter out empty data

    if len(fdata) <= 1:
        msg = f"Not enough data to train a dictionary, at least 2 non-empty samples are required, got {len(fdata)}"
        raise ValueError(msg)

    fdata.sort(key=lambda x: x.size)  # sort by size (smallest first)

    samples = []  # dataset
    size = 0

    # load samples until max sample limit or max size limit is reached
    for i in range(min(len(fdata), self._config.train_max_sample)):
        dt = fdata[i]
        size += dt.size

        if size > self._config.train_max_size:
            size -= dt.size
            break

        with dt.reader() as r:
            samples.append(bytes(r.read()))

    d = self._train(samples, dict_size=dict_size, size=size)
    self._dict = dict_provider.add(fdata[0].dict_name, dt := d.as_bytes(), d.dict_id())
    return dt
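
Putting it together, a sketch of the provider-based flow; provider and entries are hypothetical stand-ins for the package's DictProvider and DataEntry implementations:

trainer = ZstandardTrainer()

# provider: hypothetical DictProvider; entries: hypothetical non-empty DataEntry samples
dict_bytes = trainer.train(provider, *entries)

# the trained dictionary is registered in the provider and set on the trainer,
# so a compressor can be created directly against it:
comp = ZstandardCompressor(dict_entry=trainer.dict)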