Skip to content

Threaded API

This package provides zstd parallel processing capability.

Classes:

Name Description
- CompressorPool

a thread pool managing data compressors.

- DecompressorPool

a thread pool managing data decompressors.

- ThreadPool

an abstract thread pool managing workers and their tasks.

- ProcessorPool

a pool of zstd processors, managing their assignments.

- TaskQueue

a proxy queue managing tasks and their dispatching to training queues, if necessary.

- Task

a task to process data.

- TrainingTask

a task to train a dictionary.

- ProcessingTask

a task to process data.

- Worker

a worker processing tasks.

CompressorPool

Bases: ThreadPool[ZstandardCompressor]

Thread pool implementation for data compression.

Source code in src/lhcbdirac_log/zstd/pool/pool.py
226
227
228
229
230
231
232
233
class CompressorPool(
    ThreadPool[ZstandardCompressor],
    processor=ZstandardCompressor,  # consumed by ThreadPool.__init_subclass__ (sets PROCESSOR_TYPE)
    training=True,  # dictionary training is enabled for compression (sets TRAINING_ENABLED)
):
    """Thread pool implementation for data compression.

    Workers compress data entries using ZstandardCompressor instances;
    with training enabled, missing dictionaries can be trained on the fly.
    """

    __slots__ = ()

DecompressorPool

Bases: ThreadPool[ZstandardDecompressor]

Thread pool implementation for data decompression.

Source code in src/lhcbdirac_log/zstd/pool/pool.py
236
237
238
239
240
241
242
243
class DecompressorPool(
    ThreadPool[ZstandardDecompressor],
    processor=ZstandardDecompressor,  # consumed by ThreadPool.__init_subclass__ (sets PROCESSOR_TYPE)
    training=False,  # decompression only consumes existing dictionaries (sets TRAINING_ENABLED)
):
    """Thread pool implementation for data decompression.

    Workers decompress data entries using ZstandardDecompressor instances;
    no dictionary training is performed.
    """

    __slots__ = ()

ProcessingTask

Bases: Task[ProcessorPool]

[Internal] Class for processing tasks.

Such tasks are processing data from an input entry to an output entry, using a processor from the processor pool, and a dictionary from the dictionary provider.

Source code in src/lhcbdirac_log/zstd/pool/task.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class ProcessingTask(Task[ProcessorPool]):
    """[Internal] Class for processing tasks.

    Such tasks are processing data from an input entry to an output entry,
    using a processor from the processor pool, and a dictionary from the dictionary provider.
    """

    __slots__ = (
        "_dict_provider",  # dictionary provider used to look up the entry's dictionary
        "_data_in",  # input data entry
        "_data_out",  # output data entry
        "_config",  # configuration (provides max_data_size)
    )

    @property
    def dict_name(self) -> str:
        """Get the dictionary name from the entries.

        Returns:
            the dictionary name
        """
        return self._data_in.dict_name

    @property
    def data_in(self) -> DataEntry:
        """Get the input data entry.

        Returns:
            the input data entry
        """
        return self._data_in

    @property
    def data_out(self) -> DataEntry:
        """Get the output data entry.

        Returns:
            the output data entry
        """
        return self._data_out

    def __init__(self, procpool: ProcessorPool, data_in: DataEntry, data_out: DataEntry, dict_provider: DictProvider, config: Config) -> None:
        """Initialize the task.

        Args:
            procpool: the processor pool
            data_in: the input data entry
            data_out: the output data entry
            dict_provider: the dictionary provider to get dictionaries from
            config: the configuration to use
        """
        super().__init__(procpool)
        self._dict_provider = dict_provider
        self._data_in = data_in
        self._data_out = data_out
        self._config = config

    @override
    def run(self) -> None:
        """Process the input entry into the output entry using a pooled processor.

        Raises:
            RuntimeError: if no free processor is available (should never happen)
            ValueError: if the input data exceeds the configured max_data_size
        """
        # Missing/invalid dictionaries are tolerated: dict_entry may be None,
        # in which case processing is done without a dictionary.
        dict_entry = self._dict_provider.get(self._data_in.dict_name, invalid_ok=True, missing_ok=True)

        dict_name = None if dict_entry is None else dict_entry.dict_name

        if (processor := self._procpool[dict_name]) is None:  # Get a free processor with the required dictionary
            if (processor := self._procpool.get_any()) is None:  # Get a free processor with any dictionary
                msg = "No free processor available"
                raise RuntimeError(msg)  # No free, should never happen

            processor.dict = dict_entry  # assign the dictionary ("slow" operation)

        try:
            # reject oversized inputs before touching the output entry's writer
            if (sz := self._data_in.size) >= self._config.max_data_size:
                msg = f"Data too large: {sz} >= {self._config.max_data_size}"
                raise ValueError(msg)

            with self._data_out.writer() as writer:
                if sz > 0:  # empty data is not processed -> out is empty too
                    with self._data_in.reader() as reader:
                        processor.process_stream(reader, writer)
        finally:
            # always hand the processor back to the pool, even on failure
            self._procpool.release(processor)

data_in: DataEntry property

Get the input data entry.

Returns:

Type Description
DataEntry

the input data entry

data_out: DataEntry property

Get the output data entry.

Returns:

Type Description
DataEntry

the output data entry

dict_name: str property

Get the dictionary name from the entries.

Returns:

Type Description
str

the dictionary name

__init__(procpool, data_in, data_out, dict_provider, config)

Initialize the task.

Parameters:

Name Type Description Default
procpool ProcessorPool

the processor pool

required
data_in DataEntry

the input data entry

required
data_out DataEntry

the output data entry

required
dict_provider DictProvider

the dictionary provider to get dictionaries from

required
config Config

the configuration to use

required
Source code in src/lhcbdirac_log/zstd/pool/task.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __init__(self, procpool: ProcessorPool, data_in: DataEntry, data_out: DataEntry, dict_provider: DictProvider, config: Config) -> None:
    """Set up a processing task.

    Args:
        procpool: the processor pool
        data_in: the entry to read raw data from
        data_out: the entry to write processed data to
        dict_provider: the dictionary provider to get dictionaries from
        config: the configuration to use
    """
    super().__init__(procpool)
    self._config = config
    self._data_in = data_in
    self._data_out = data_out
    self._dict_provider = dict_provider

ProcessorPool

[Internal] A pool of data processors, managing their assignments.

Used by tasks to get a free processor, and to release it when done.

Source code in src/lhcbdirac_log/zstd/pool/processor.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ProcessorPool[P: ZstandardProcessor]:
    """[Internal] A pool of data processors, managing their assignments.

    Used by tasks to get a free processor, and to release it when done.
    """

    __slots__ = (
        "_factory",  # callable creating new processor instances
        "_free",  # dict_name -> set of free processors; None key = unassigned processors
        "_pool",  # all processors ever created by this pool
        "_processors",  # pool sizing: >= workers fixed size, < 0 variable growth factor
        "_workers",  # number of workers served by this pool
        "_lock",  # guards _pool and _free
    )

    def __init__(
        self,
        processor_factory: Callable[[], P],
        workers: int = os.cpu_count() or 2,
        processors: int | None = -3,
    ) -> None:
        """Initialize the pool.

        Args:
            processor_factory: a callable returning a new processor instance (may be a class)
            workers: the fixed number of workers (default: os.cpu_count() or 2)
            processors: the fixed number of processors, given as positive (workers <= processors),
                or the variable number of processors, given as negative (1 <= -processors <= workers),
                where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors).
                None selects the maximum variable size, i.e. -workers. (default: -3)

        Raises:
            ValueError: if processors is out of range
        """
        super().__init__()

        if processors is None:
            processors = -workers  # None -> largest allowed variable pool

        # accept processors >= workers (fixed) or -workers <= processors <= -1 (variable)
        if -1 < processors < workers or processors < -workers:
            msg = f"processors is out of range, got {processors} (workers: {workers})"
            raise ValueError(msg)

        self._factory = processor_factory
        self._workers = workers
        self._processors = processors
        # a negative `processors` yields an empty initial pool (range over a negative
        # number), so variable pools grow lazily in __getitem__
        self._pool: set[P] = {processor_factory() for _ in range(processors)}
        self._free: dict[str | None, set[P]] = {None: self._pool.copy()}  # unused processors
        self._lock = RLock()

    def release(self, processor: P) -> None:
        """Release a processor, marking it as available for other workers.

        Args:
            processor: the processor to release

        Notes:
            - no check is done to ensure the processor is not in use
            - releasing an already released processor has no effect
        """
        with self._lock:
            # set semantics make a double release harmless
            self._free.setdefault(processor.dict_name, set()).add(processor)

    def __getitem__(self, dict_name: str | None) -> P | None:
        """Get a free processor for the given dictionary name, or None if not available.

        Args:
            dict_name: the processor dictionary name, or None for non-assigned processors

        Returns:
            a free processor, or None if not available

        Notes:
            - the pool will create new processors if needed, depending on dict_name and if the pool is variable
        """
        with self._lock:
            procs = self._free.setdefault(dict_name, set())

            # check if the pool is variable (<0) and needs to be updated (hint)
            if not procs and self._processors < 0:
                # grow up to max(workers, -processors * buckets); new processors start unassigned
                for _ in range(len(self._pool), max(self._workers, -self._processors * len(self._free))):  # len: loaded + None
                    self._pool.add(processor := self._factory())
                    self._free[None].add(processor)

            return procs.pop() if procs else None

    def get_any(self) -> P | None:
        """Get any free processor.

        Returns:
            a free processor, or None if none available

        Notes:
            - tries to return a non-assigned processor first
        """
        with self._lock:
            if procs := self._free[None]:  # opti: try with None first
                return procs.pop()

            # fall back to processors already assigned to some dictionary
            for procs in self._free.values():
                if procs:
                    return procs.pop()

            return None

__getitem__(dict_name)

Get a free processor for the given dictionary name, or None if not available.

Parameters:

Name Type Description Default
dict_name str | None

the processor dictionary name, or None for non-assigned processors

required

Returns:

Type Description
P | None

a free processor, or None if not available

Notes
  • the pool will create new processors if needed, depending on dict_name and if the pool is variable
Source code in src/lhcbdirac_log/zstd/pool/processor.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __getitem__(self, dict_name: str | None) -> P | None:
    """Get a free processor for the given dictionary name, or None if not available.

    Args:
        dict_name: the processor dictionary name, or None for non-assigned processors

    Returns:
        a free processor, or None if not available

    Notes:
        - the pool will create new processors if needed, depending on dict_name and if the pool is variable
    """
    with self._lock:
        # ensure a bucket exists for this dictionary name
        procs = self._free.setdefault(dict_name, set())

        # check if the pool is variable (<0) and needs to be updated (hint)
        if not procs and self._processors < 0:
            # grow up to max(workers, -processors * buckets); new processors start unassigned
            for _ in range(len(self._pool), max(self._workers, -self._processors * len(self._free))):  # len: loaded + None
                self._pool.add(processor := self._factory())
                self._free[None].add(processor)

        return procs.pop() if procs else None

__init__(processor_factory, workers=os.cpu_count() or 2, processors=-3)

Initialize the pool.

Parameters:

Name Type Description Default
processor_factory Callable[[], P]

a callable returning a new processor instance (may be a class)

required
workers int

the fixed number of workers (default: os.cpu_count() or 2)

cpu_count() or 2
processors int | None

the fixed number of processors, given as positive (workers <= processors), or the variable number of processors, given as negative (1 <= -processors <= workers), where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors). (default: -3; None selects the maximum, i.e. -workers)

-3

Raises:

Type Description
ValueError

if processors is out of range

Source code in src/lhcbdirac_log/zstd/pool/processor.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(
    self,
    processor_factory: Callable[[], P],
    workers: int = os.cpu_count() or 2,
    processors: int | None = -3,
) -> None:
    """Initialize the pool.

    Args:
        processor_factory: a callable returning a new processor instance (may be a class)
        workers: the fixed number of workers (default: os.cpu_count() or 2)
        processors: the fixed number of processors, given as positive (workers <= processors),
            or the variable number of processors, given as negative (1 <= -processors <= workers),
            where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors).
            None selects the maximum variable size, i.e. -workers. (default: -3)

    Raises:
        ValueError: if processors is out of range
    """
    super().__init__()

    if processors is None:
        processors = -workers  # None -> largest allowed variable pool

    # accept processors >= workers (fixed) or -workers <= processors <= -1 (variable)
    if -1 < processors < workers or processors < -workers:
        msg = f"processors is out of range, got {processors} (workers: {workers})"
        raise ValueError(msg)

    self._factory = processor_factory
    self._workers = workers
    self._processors = processors
    # a negative `processors` yields an empty initial pool (range over a negative number)
    self._pool: set[P] = {processor_factory() for _ in range(processors)}
    self._free: dict[str | None, set[P]] = {None: self._pool.copy()}  # unused processors
    self._lock = RLock()

get_any()

Get any free processor.

Returns:

Type Description
P | None

a free processor, or None if none available

Notes
  • tries to return a non-assigned processor first
Source code in src/lhcbdirac_log/zstd/pool/processor.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def get_any(self) -> P | None:
    """Fetch any idle processor from the pool.

    Returns:
        a free processor, or None if none available

    Notes:
        - tries to return a non-assigned processor first
    """
    with self._lock:
        # prefer a processor that has no dictionary assigned yet
        unassigned = self._free[None]
        if unassigned:
            return unassigned.pop()

        # otherwise take one already bound to some dictionary
        non_empty = (bucket for bucket in self._free.values() if bucket)
        bucket = next(non_empty, None)
        return None if bucket is None else bucket.pop()

release(processor)

Release a processor, marking it as available for other workers.

Parameters:

Name Type Description Default
processor P

the processor to release

required
Notes
  • no check is done to ensure the processor is not in use
  • releasing an already released processor has no effect
Source code in src/lhcbdirac_log/zstd/pool/processor.py
65
66
67
68
69
70
71
72
73
74
75
76
def release(self, processor: P) -> None:
    """Return a processor to the pool, making it available to other workers.

    Args:
        processor: the processor to release

    Notes:
        - no check is done to ensure the processor is not in use
        - releasing an already released processor has no effect
    """
    with self._lock:
        # bucket keyed by the processor's current dictionary name;
        # set semantics make a double release harmless
        bucket = self._free.setdefault(processor.dict_name, set())
        bucket.add(processor)

Task

Bases: ABC

[Internal] Abstract base class for tasks to be executed by a worker.

Task execution is done using a processor, obtained when the task is running.

Source code in src/lhcbdirac_log/zstd/pool/task.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Task[P](ABC):
    """[Internal] Abstract base class for tasks to be executed by a worker.

    Task execution is done using a processor, obtained when the task is running.
    """

    # only the pool is stored; a processor is obtained when the task runs
    __slots__ = ("_procpool",)

    def __init__(self, procpool: P) -> None:
        """Initialize the task.

        Args:
            procpool: the processor pool, used to get a processor
        """
        self._procpool = procpool

    @abstractmethod
    def run(self) -> None:
        """Run the task.

        Notes:
            - the task should release the processor when done
            - the task may not be run again
            - may raise exceptions
        """

__init__(procpool)

Initialize the task.

Parameters:

Name Type Description Default
procpool P

the processor pool, used to get a processor

required
Source code in src/lhcbdirac_log/zstd/pool/task.py
38
39
40
41
42
43
44
def __init__(self, procpool: P) -> None:
    """Initialize the task.

    Args:
        procpool: the processor pool, used to get a processor
    """
    # only the pool is stored; a processor is obtained when the task runs
    self._procpool = procpool

run() abstractmethod

Run the task.

Notes
  • the task should release the processor when done
  • the task may not be run again
  • may raise exceptions
Source code in src/lhcbdirac_log/zstd/pool/task.py
46
47
48
49
50
51
52
53
54
@abstractmethod
def run(self) -> None:
    """Run the task.

    Implementations obtain a processor from the pool and perform their work.

    Notes:
        - the task should release the processor when done
        - the task may not be run again
        - may raise exceptions
    """

TaskQueue

Bases: Queue[Task | None]

[Internal] A proxy queue managing tasks and their dispatching to training queues, if necessary.

Simply put tasks into it: training tasks are enqueued automatically, if necessary, when enough tasks are available to form a training sample. The trainer then requeues those tasks to the real task queue, where they are finally processed by the workers.

Source code in src/lhcbdirac_log/zstd/pool/queue.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class TaskQueue(Queue[Task | None]):
    """[Internal] A proxy queue managing tasks and their dispatching to training queues, if necessary.

    Simply put tasks into it: training tasks are enqueued automatically, if necessary,
    when enough tasks are available to form a training sample. Then, the trainer will
    requeue the tasks to the real task queue to be finally processed by the workers.
    """

    __slots__ = (
        "_lock",  # guards the pending/pushed/available bookkeeping
        "_dict_provider",  # provider used to check/get dictionaries
        "_available",  # names of dicts known to exist (trained or invalid)
        "_pending_tasks",  # dict_name -> tasks buffered until training completes
        "_trainpool",  # pool of trainers handed to TrainingTasks
        "_train_queue_size",  # sample size that triggers a training task
        "_pushed",  # dict names already pushed for training
        "_rejected",  # tasks that failed processing, with their exceptions
        "_training",  # whether the training system is enabled
        "_config",  # training configuration
    )

    def __init__(
        self,
        workers: int,
        dict_provider: DictProvider,
        config: Config = DEFAULT_CONFIG,
    ) -> None:
        """Initialize the task queue.

        Args:
            workers: the number of workers available or 0 to disable the training system
            dict_provider: the dict provider
            config: the configuration to use for training (default: DEFAULT_CONFIG)
        """
        super().__init__()

        self._trainpool = Queue[ZstandardTrainer]()
        self._pending_tasks: dict[str, list[ProcessingTask]] = {}
        self._dict_provider = dict_provider
        self._train_queue_size = config.train_queue_size
        self._pushed = set[str]()  # names of dict queues pushed for training
        self._available = set[str]()  # names of trained dicts (including failure)
        self._lock = RLock()
        self._rejected = list[tuple[Task, Exception]]()  # rejected tasks
        self._training = workers > 0
        self._config = config

        # pre-fill the trainer pool, one trainer per worker
        for _ in range(workers):
            self._trainpool.put(ZstandardTrainer(config))

    @property
    def rejected(self) -> list[tuple[Task, Exception]]:
        """Get the rejected tasks.

        Rejected tasks are tasks that failed to be processed.

        Returns:
            the rejected tasks
        """
        return self._rejected

    def _mark_pushed(self, dict_name: str) -> None:
        """[Internal] Mark a dict as pushed for training.

        Args:
            dict_name: the dict name
        """
        self._pushed.add(dict_name)

    def _mark_available(self, dict_name: str) -> None:
        """[Internal] Mark a dict as available (trained or even invalid).

        Args:
            dict_name: the dict name
        """
        self._available.add(dict_name)

    def reject(self, task: Task, exc: Exception) -> None:
        """Mark a task as rejected.

        Args:
            task: the rejected task
            exc: the exception raised
        """
        self._rejected.append((task, exc))

    def _requeue(self, name: str) -> None:
        """[Internal] Requeue ready processing tasks to the main queue, from a finished training task.

        Args:
            name: the training task name

        Raises:
            KeyError: if there is no pending task with the given name (should not happen)
        """
        with self._lock:
            tasks = self._pending_tasks.pop(name)

            # the dict is now available (even if training failed), so future
            # tasks for it bypass the training path
            self._mark_available(name)

            while tasks:
                super().put(tasks.pop())

    @override
    def put(self, item: Task | None, block: bool = True, timeout: float | None = None) -> None:
        """Put a task in the task queue.

        Args:
            item: the task
            block: if True, blocks until the task is added (default: True)
            timeout: the timeout in seconds (default: None)

        Notes:
            - if the task is a processing task, and the training system is enabled,
              it will be added to the training queue if the necessary dict is not available
        """
        if self._training and isinstance(item, ProcessingTask):
            with self._lock:
                dict_name = item.dict_name

                if dict_name not in self._available:
                    try:  # Check if the dict exists
                        self._dict_provider.get(dict_name, invalid_ok=True, missing_ok=False)
                        self._mark_available(dict_name)

                    except DictNotExistsError:  # Dict not found: add the task to the trainer
                        # buffer the task until a full training sample is gathered
                        tasks = self._pending_tasks.setdefault(dict_name, [])
                        tasks.append(item)

                        if len(tasks) == self._train_queue_size:  # == to not restart a running training
                            self._mark_pushed(dict_name)
                            d = TrainingTask(self._trainpool, self._requeue, self._dict_provider, dict_name, tasks)
                            super().put(d, block, timeout)
                        return  # task stays pending; _requeue pushes it after training

                super().put(item, block, timeout)
        else:
            super().put(item, block, timeout)

    def flush(self) -> None:
        """Flush all remaining incomplete training queues to training tasks.

        Must be called after all tasks are added to the queue.

        Notes:
            - do nothing if the training system is disabled
        """
        with self._lock:
            # push a training task for every dict still waiting for a full sample
            for name in set(self._pending_tasks) - self._pushed:
                self._mark_pushed(name)
                super().put(TrainingTask(self._trainpool, self._requeue, self._dict_provider, name, self._pending_tasks[name]))

rejected: list[tuple[Task, Exception]] property

Get the rejected tasks.

Rejected tasks are tasks that failed to be processed.

Returns:

Type Description
list[tuple[Task, Exception]]

the rejected tasks

__init__(workers, dict_provider, config=DEFAULT_CONFIG)

Initialize the task queue.

Parameters:

Name Type Description Default
workers int

the number of workers available or 0 to disable the training system

required
dict_provider DictProvider

the dict provider

required
config Config

the configuration to use for training (default: DEFAULT_CONFIG)

DEFAULT_CONFIG
Source code in src/lhcbdirac_log/zstd/pool/queue.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(
    self,
    workers: int,
    dict_provider: DictProvider,
    config: Config = DEFAULT_CONFIG,
) -> None:
    """Set up the task queue and its training bookkeeping.

    Args:
        workers: the number of workers available or 0 to disable the training system
        dict_provider: the dict provider
        config: the configuration to use for training (default: DEFAULT_CONFIG)
    """
    super().__init__()

    self._config = config
    self._dict_provider = dict_provider
    self._lock = RLock()
    self._train_queue_size = config.train_queue_size
    self._training = workers > 0

    self._pending_tasks: dict[str, list[ProcessingTask]] = {}
    self._pushed = set[str]()  # names of dict queues pushed for training
    self._available = set[str]()  # names of trained dicts (including failure)
    self._rejected = list[tuple[Task, Exception]]()  # rejected tasks

    # pre-fill the trainer pool, one trainer per worker
    self._trainpool = Queue[ZstandardTrainer]()
    for _ in range(workers):
        self._trainpool.put(ZstandardTrainer(config))

flush()

Flush all remaining incomplete training queues to training tasks.

Must be called after all tasks are added to the queue.

Notes
  • do nothing if the training system is disabled
Source code in src/lhcbdirac_log/zstd/pool/queue.py
162
163
164
165
166
167
168
169
170
171
172
173
def flush(self) -> None:
    """Flush all remaining incomplete training queues to training tasks.

    Must be called after all tasks are added to the queue.

    Notes:
        - do nothing if the training system is disabled
    """
    with self._lock:
        # every dict still pending and not yet pushed gets a training task now
        unpushed = set(self._pending_tasks) - self._pushed
        for name in unpushed:
            self._mark_pushed(name)
            task = TrainingTask(self._trainpool, self._requeue, self._dict_provider, name, self._pending_tasks[name])
            super().put(task)

put(item, block=True, timeout=None)

Put a task in the task queue.

Parameters:

Name Type Description Default
item Task | None

the task

required
block bool

if True, blocks until the task is added (default: True)

True
timeout float | None

the timeout in seconds (default: None)

None
Notes
  • if the task is a processing task, and the training system is enabled, it will be added to the training queue if the necessary dict is not available
Source code in src/lhcbdirac_log/zstd/pool/queue.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@override
def put(self, item: Task | None, block: bool = True, timeout: float | None = None) -> None:
    """Put a task in the task queue.

    Args:
        item: the task
        block: if True, blocks until the task is added (default: True)
        timeout: the timeout in seconds (default: None)

    Notes:
        - if the task is a processing task, and the training system is enabled,
          it will be added to the training queue if the necessary dict is not available
    """
    if self._training and isinstance(item, ProcessingTask):
        with self._lock:
            dict_name = item.dict_name

            if dict_name not in self._available:
                try:  # Check if the dict exists
                    self._dict_provider.get(dict_name, invalid_ok=True, missing_ok=False)
                    self._mark_available(dict_name)

                except DictNotExistsError:  # Dict not found: add the task to the trainer
                    # buffer the task until a full training sample is gathered
                    tasks = self._pending_tasks.setdefault(dict_name, [])
                    tasks.append(item)

                    if len(tasks) == self._train_queue_size:  # == to not restart a running training
                        self._mark_pushed(dict_name)
                        d = TrainingTask(self._trainpool, self._requeue, self._dict_provider, dict_name, tasks)
                        super().put(d, block, timeout)
                    return  # task stays pending; it is requeued once training completes

            super().put(item, block, timeout)
    else:
        super().put(item, block, timeout)

reject(task, exc)

Mark a task as rejected.

Parameters:

Name Type Description Default
task Task

the rejected task

required
exc Exception

the exception raised

required
Source code in src/lhcbdirac_log/zstd/pool/queue.py
100
101
102
103
104
105
106
107
def reject(self, task: Task, exc: Exception) -> None:
    """Record a task as rejected.

    Args:
        task: the rejected task
        exc: the exception raised
    """
    entry = (task, exc)
    self._rejected.append(entry)

ThreadPool

Bases: ABC

A thread pool managing workers and the task queue.

Notes
  • Must be subclassed with defined processor type
Source code in src/lhcbdirac_log/zstd/pool/pool.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class ThreadPool[P: ZstandardProcessor](ABC):
    """A thread pool managing workers and the task queue.

    Notes:
        - Must be subclassed with defined processor type
    """

    # Both class attributes are assigned by __init_subclass__ from the
    # 'processor' / 'training' subclass keyword arguments.
    PROCESSOR_TYPE: ClassVar[type[ZstandardProcessor]]
    TRAINING_ENABLED: ClassVar[bool]

    __slots__ = (
        "_stopping",
        "_queue",
        "_dict_provider",
        "_processor_type",
        "_procpool",
        "_workpool",
        "_config",
    )

    @override
    def __init_subclass__(cls, *, processor: type[P] | None = None, training: bool | None = None) -> None:
        """Initialize the subclass.

        Args:
            processor: the processor type to use
            training: indicate if training is enabled or not

        Raises:
            TypeError: if any is None or if processor is not a ZstandardProcessor subclass
        """
        if processor is None or not issubclass(processor, ZstandardProcessor):
            msg = f"'processor' subclass argument is not or badly defined: {None if processor is None else processor.__qualname__} is not a ZstandardProcessor subclass"
            raise TypeError(msg)

        if training is None:
            msg = f"'training' subclass argument is not defined for {cls.__qualname__}"
            raise TypeError(msg)

        cls.PROCESSOR_TYPE = processor
        cls.TRAINING_ENABLED = training

    def __init__(
        self,
        dict_provider: DictProvider,
        config: Config = DEFAULT_CONFIG,
        workers: int = os.cpu_count() or 2,
        processors: int | None = None,
    ) -> None:
        """Initialize the pool.

        Args:
            dict_provider: the dictionary provider to use
            config: the configuration to use (default: DEFAULT_CONFIG)
            workers: the fixed number of workers (default: os.cpu_count() or 2)
            processors: the fixed number of processors, given as positive (workers <= processors),
                or the variable number of processors, given as negative (1 <= -processors <= workers),
                where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors). (default None, maximum)

        Notes:
            The number of processors and workers can be controlled independently, because of the following reasons:
            - a worker can process one task at a time, and a task can be processed by one worker only at a time
            - a task uses a processor to process data, and a processor can process data for one task only at a time
            - processors use a zstd dictionary for processing data, but changing the dictionary is expensive
              so it is better to keep the same dictionary for a while, and so to have many processors with different dictionaries
        """
        super().__init__()

        self._config = config
        # bool multiplies to 0 when training is disabled, which keeps the
        # TaskQueue from ever accumulating a full training batch
        self._queue = TaskQueue(workers * self.TRAINING_ENABLED, dict_provider, config)
        self._dict_provider = dict_provider

        self._processor_type: type[P] = self.PROCESSOR_TYPE  # type: ignore  # mypy failure

        self._workpool = tuple(Worker(self._queue) for _ in range(workers))
        self._procpool = ProcessorPool[P](lambda: self._processor_type(config), workers, processors)
        self._stopping = False

    @property
    def running(self) -> bool:
        """Check if the pool is running.

        Returns:
            True if the pool is running, False otherwise
        """
        # the pool counts as running while at least one worker thread is alive
        return any(w.running for w in self._workpool)

    @property
    def stopping(self) -> bool:
        """Check if the pool is stopping.

        Returns:
            True if the pool is stopping, False otherwise
        """
        return self._stopping

    @property
    def stopped(self) -> bool:
        """Check if the pool is stopped.

        Returns:
            True if the pool is stopped, False otherwise
        """
        return not self.running and self._stopping

    @property
    def rejected(self) -> list[tuple[Task, Exception]]:
        """Get the rejected tasks.

        Returns:
            the rejected tasks
        """
        return self._queue.rejected

    def add(self, data_in: DataEntry, data_out: DataEntry) -> None:
        """Add a task to the queue.

        Args:
            data_in: the input data accessor
            data_out: the output data accessor

        Raises:
            RuntimeError: if the pool is stopping

        Notes:
            - the pool will start if not running
        """
        if not self.running:
            self.start()

        if self._stopping:
            msg = f"{self.__class__.__qualname__} is stopping or stopped"
            raise RuntimeError(msg)

        self._queue.put(ProcessingTask(self._procpool, data_in, data_out, self._dict_provider, self._config))

    def start(self) -> None:
        """Start the pool.

        Notes:
            - do nothing if the pool is already running
        """
        if not self.running and not self._stopping:
            for w in self._workpool:
                w.start()

    def wait(self) -> None:
        """Wait until the task queue is empty."""
        if self.running:
            self._queue.join()

    def flush(self) -> None:
        """Flush all remaining incomplete training queues to training tasks.

        Must be called after all tasks are added to the queue.
        """
        if self.running:
            self._queue.flush()

    def stop(self) -> None:
        """Request the pool to stop.

        Notes:
            - this call wait() first, blocking
            - the pool will effectively stop after all workers are done
            - workers will effectively stop after all tasks are done (wait())
            - to stop the pool instantly, clear the queue first
        """
        self.wait()
        if not self._stopping and self.running:
            for _ in self._workpool:
                self._queue.put(None)  # one None for each worker

            self._stopping = True

    def join(self, timeout: float | None = None) -> None:
        """Join the pool.

        Args:
            timeout: the timeout in seconds, used to join the pool (default: None)

        Notes:
            - the timeout is a total budget shared across all workers, not per worker
        """
        t = time()

        for w in self._workpool:
            w.join(timeout)

            if timeout is not None:
                # subtract the time spent joining this worker from the remaining budget
                timeout += t - (s := time())
                t = s
                if timeout <= 0:
                    break

rejected: list[tuple[Task, Exception]] property

Get the rejected tasks.

Returns:

Type Description
list[tuple[Task, Exception]]

the rejected tasks

running: bool property

Check if the pool is running.

Returns:

Type Description
bool

True if the pool is running, False otherwise

stopped: bool property

Check if the pool is stopped.

Returns:

Type Description
bool

True if the pool is stopped, False otherwise

stopping: bool property

Check if the pool is stopping.

Returns:

Type Description
bool

True if the pool is stopping, False otherwise

__init__(dict_provider, config=DEFAULT_CONFIG, workers=os.cpu_count() or 2, processors=None)

Initialize the pool.

Parameters:

Name Type Description Default
dict_provider DictProvider

the dictionary provider to use

required
config Config

the configuration to use (default: DEFAULT_CONFIG)

DEFAULT_CONFIG
workers int

the fixed number of workers (default: os.cpu_count() or 2)

cpu_count() or 2
processors int | None

the fixed number of processors, given as positive (workers <= processors), or the variable number of processors, given as negative (1 <= -processors <= workers), where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors). (default None, maximum)

None
Notes

The number of processors and workers can be controlled independently, for the following reasons: a worker can process one task at a time, and a task can be processed by only one worker at a time; a task uses a processor to process data, and a processor can process data for only one task at a time; processors use a zstd dictionary for processing data, but changing the dictionary is expensive, so it is better to keep the same dictionary for a while, and hence to have many processors with different dictionaries.

Source code in src/lhcbdirac_log/zstd/pool/pool.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def __init__(
    self,
    dict_provider: DictProvider,
    config: Config = DEFAULT_CONFIG,
    workers: int = os.cpu_count() or 2,
    processors: int | None = None,
) -> None:
    """Initialize the pool.

    Args:
        dict_provider: the dictionary provider to use
        config: the configuration to use (default: DEFAULT_CONFIG)
        workers: the fixed number of workers (default: os.cpu_count() or 2)
        processors: the fixed number of processors, given as positive (workers <= processors),
            or the variable number of processors, given as negative (1 <= -processors <= workers),
            where the real number of processors is: N = max(workers, (loaded dict + 1) * -processors). (default None, maximum)

    Notes:
        The number of processors and workers can be controlled independently, because of the following reasons:
        - a worker can process one task at a time, and a task can be processed by one worker only at a time
        - a task uses a processor to process data, and a processor can process data for one task only at a time
        - processors use a zstd dictionary for processing data, but changing the dictionary is expensive
          so it is better to keep the same dictionary for a while, and so to have many processors with different dictionaries
    """
    super().__init__()

    self._config = config
    # bool multiplies to 0 when training is disabled, which keeps the
    # TaskQueue from ever accumulating a full training batch
    self._queue = TaskQueue(workers * self.TRAINING_ENABLED, dict_provider, config)
    self._dict_provider = dict_provider

    self._processor_type: type[P] = self.PROCESSOR_TYPE  # type: ignore  # mypy failure

    self._workpool = tuple(Worker(self._queue) for _ in range(workers))
    self._procpool = ProcessorPool[P](lambda: self._processor_type(config), workers, processors)
    self._stopping = False

__init_subclass__(*, processor=None, training=None)

Initialize the subclass.

Parameters:

Name Type Description Default
processor type[P] | None

the processor type to use

None
training bool | None

indicate if training is enabled or not

None

Raises:

Type Description
TypeError

if any is None or if processor is not a ZstandardProcessor subclass

Source code in src/lhcbdirac_log/zstd/pool/pool.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@override
def __init_subclass__(cls, *, processor: type[P] | None = None, training: bool | None = None) -> None:
    """Initialize the subclass.

    Args:
        processor: the processor type to use
        training: indicate if training is enabled or not

    Raises:
        TypeError: if any is None or if processor is not a ZstandardProcessor subclass
    """
    valid_processor = processor is not None and issubclass(processor, ZstandardProcessor)
    if not valid_processor:
        shown = None if processor is None else processor.__qualname__
        msg = f"'processor' subclass argument is not or badly defined: {shown} is not a ZstandardProcessor subclass"
        raise TypeError(msg)

    if training is None:
        msg = f"'training' subclass argument is not defined for {cls.__qualname__}"
        raise TypeError(msg)

    cls.PROCESSOR_TYPE = processor
    cls.TRAINING_ENABLED = training

add(data_in, data_out)

Add a task to the queue.

Parameters:

Name Type Description Default
data_in DataEntry

the input data accessor

required
data_out DataEntry

the output data accessor

required

Raises:

Type Description
RuntimeError

if the pool is stopping

Notes
  • the pool will start if not running
Source code in src/lhcbdirac_log/zstd/pool/pool.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def add(self, data_in: DataEntry, data_out: DataEntry) -> None:
    """Queue a new processing task for the given input/output entries.

    Args:
        data_in: the input data accessor
        data_out: the output data accessor

    Raises:
        RuntimeError: if the pool is stopping or already stopped

    Notes:
        - starts the pool automatically when it is not running yet
    """
    if not self.running:
        self.start()

    if self._stopping:
        msg = f"{self.__class__.__qualname__} is stopping or stopped"
        raise RuntimeError(msg)

    task = ProcessingTask(self._procpool, data_in, data_out, self._dict_provider, self._config)
    self._queue.put(task)

flush()

Flush all remaining incomplete training queues to training tasks.

Must be called after all tasks are added to the queue.

Source code in src/lhcbdirac_log/zstd/pool/pool.py
184
185
186
187
188
189
190
def flush(self) -> None:
    """Convert every incomplete training queue into a training task.

    Must be called after all tasks are added to the queue.
    """
    if not self.running:
        return
    self._queue.flush()

join(timeout=None)

Join the pool.

Parameters:

Name Type Description Default
timeout float | None

the timeout in seconds, used to join the pool (default: None)

None
Source code in src/lhcbdirac_log/zstd/pool/pool.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def join(self, timeout: float | None = None) -> None:
    """Join the pool.

    Args:
        timeout: the timeout in seconds, used to join the pool (default: None)

    Notes:
        - the timeout is a total budget shared across all workers, not per worker
    """
    t = time()

    for w in self._workpool:
        w.join(timeout)

        if timeout is not None:
            # subtract the time spent joining this worker from the remaining budget
            timeout += t - (s := time())
            t = s
            if timeout <= 0:
                break

start()

Start the pool.

Notes
  • do nothing if the pool is already running
Source code in src/lhcbdirac_log/zstd/pool/pool.py
169
170
171
172
173
174
175
176
177
def start(self) -> None:
    """Start all worker threads.

    Notes:
        - no-op when the pool is already running or is stopping
    """
    if self.running or self._stopping:
        return
    for worker in self._workpool:
        worker.start()

stop()

Request the pool to stop.

Notes
  • this call wait() first, blocking
  • the pool will effectively stop after all workers are done
  • workers will effectively stop after all tasks are done (wait())
  • to stop the pool instantly, clear the queue first
Source code in src/lhcbdirac_log/zstd/pool/pool.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def stop(self) -> None:
    """Request the pool to stop.

    Notes:
        - this call wait() first, blocking
        - the pool will effectively stop after all workers are done
        - workers will effectively stop after all tasks are done (wait())
        - to stop the pool instantly, clear the queue first
    """
    self.wait()
    if self._stopping or not self.running:
        return

    for _ in self._workpool:
        self._queue.put(None)  # one None for each worker

    self._stopping = True

wait()

Wait until the task queue is empty.

Source code in src/lhcbdirac_log/zstd/pool/pool.py
179
180
181
182
def wait(self) -> None:
    """Block until every queued task has been processed."""
    if not self.running:
        return
    self._queue.join()

TrainingTask

Bases: Task[Queue[ZstandardTrainer]]

[Internal] Class for training tasks.

Such tasks are training a dictionary from a list of processing tasks. When done, these are reinjected in the task queue.

Source code in src/lhcbdirac_log/zstd/pool/task.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class TrainingTask(Task[Queue[ZstandardTrainer]]):
    """[Internal] Class for training tasks.

    Such tasks are training a dictionary from a list of processing tasks.
    When done, these are reinjected in the task queue.
    """

    __slots__ = (
        "_tasks",
        "_name",
        "_callback",
        "_dict_provider",
    )

    def __init__(
        self, trainpool: Queue[ZstandardTrainer], callback: Callable[[str], None], dict_provider: DictProvider, name: str, tasks: list[ProcessingTask]
    ) -> None:
        """Initialize the task.

        Args:
            trainpool: the training pool, to get a trainer
            callback: the callback to call when training is done, called with the dictionary name
            dict_provider: the dictionary provider to save the trained dictionary to
            name: the name of the dictionary to train
            tasks: the tasks to take samples from
        """
        super().__init__(trainpool)
        self._name = name
        self._tasks = tasks
        self._callback = callback
        self._dict_provider = dict_provider

    @override
    def run(self) -> None:
        """Train the dictionary from the pending tasks' input data, then notify the callback."""
        # _procpool is the trainpool passed to Task.__init__; Queue.get blocks until a trainer is free
        trainer = self._procpool.get()

        try:
            trainer.train(self._dict_provider, *(i.data_in for i in self._tasks))
        except (ValueError, ZstdError):
            # training failed: flag the dictionary so it is not requested again
            # NOTE(review): assumed semantics of mark_invalid — confirm against DictProvider
            self._dict_provider.mark_invalid(self._name)
        finally:
            trainer.dict = None  # detach the dictionary before returning the trainer to the pool
            self._procpool.put(trainer)
            self._callback(self._name)  # always notify, even on failure, so pending tasks are re-queued

__init__(trainpool, callback, dict_provider, name, tasks)

Initialize the task.

Parameters:

Name Type Description Default
trainpool Queue[ZstandardTrainer]

the training pool, to get a trainer

required
callback Callable[[str], None]

the callback to call when training is done, called with the dictionary name

required
dict_provider DictProvider

the dictionary provider to save the trained dictionary to

required
name str

the name of the dictionary to train

required
tasks list[ProcessingTask]

the tasks to take samples from

required
Source code in src/lhcbdirac_log/zstd/pool/task.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def __init__(
    self, trainpool: Queue[ZstandardTrainer], callback: Callable[[str], None], dict_provider: DictProvider, name: str, tasks: list[ProcessingTask]
) -> None:
    """Initialize the training task.

    Args:
        trainpool: the training pool, to get a trainer
        callback: the callback to call when training is done, called with the dictionary name
        dict_provider: the dictionary provider to save the trained dictionary to
        name: the name of the dictionary to train
        tasks: the tasks to take samples from
    """
    super().__init__(trainpool)
    self._dict_provider = dict_provider
    self._callback = callback
    self._tasks = tasks
    self._name = name

Worker

Bases: Thread

[Internal] A thread worker class that processes tasks from the specified task queue.

Notes
  • the worker will stop if None is fetched from the queue
  • the worker signals the queue that tasks are done
  • stopped workers cannot be restarted (python threads limitation)
Source code in src/lhcbdirac_log/zstd/pool/worker.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class Worker(Thread):
    """[Internal] A thread worker class that processes tasks from the specified task queue.

    Notes:
        - the worker will stop if None is fetched from the queue
        - the worker signals the queue that tasks are done
        - stopped workers cannot be restarted (python threads limitation)
    """

    # class-level instance counter, used to build unique thread names
    _COUNT = 0

    # NOTE(review): Thread already defines __dict__, so __slots__ here does not
    # actually prevent per-instance dicts — presumably kept for package consistency
    __slots__ = ("_queue",)

    def __init__(self, queue: TaskQueue) -> None:
        """Initialize the worker.

        Args:
            queue: the task queue to get tasks from
        """
        self._inc_count()  # bump the counter first so the thread name below is unique
        super().__init__(name=f"{self.__class__.__qualname__}-{self._COUNT}")
        self._queue = queue

    @property
    def running(self) -> bool:
        """Check if the worker is running.

        Returns:
            True if the worker is running, False otherwise
        """
        return self.is_alive()

    @override
    def run(self) -> None:
        """The worker's main loop.

        Notes:
            - the worker will stop if None is fetched from the queue
            - the worker signals the queue that tasks are done
        """
        while (task := self._queue.get()) is not None:
            try:
                task.run()
            except Exception as e:
                # any failure is recorded as a rejection instead of killing the worker thread
                self._queue.reject(task, e)
            finally:
                self._queue.task_done()

        self._queue.task_done()  # acknowledge the terminating None sentinel

    @classmethod
    def _inc_count(cls) -> None:
        """Increment the instance counter."""
        cls._COUNT += 1

running: bool property

Check if the worker is running.

Returns:

Type Description
bool

True if the worker is running, False otherwise

__init__(queue)

Initialize the worker.

Parameters:

Name Type Description Default
queue TaskQueue

the task queue to get tasks from

required
Source code in src/lhcbdirac_log/zstd/pool/worker.py
30
31
32
33
34
35
36
37
38
def __init__(self, queue: TaskQueue) -> None:
    """Initialize the worker thread.

    Args:
        queue: the task queue to get tasks from
    """
    self._inc_count()  # bump the shared counter before reading it for the name
    thread_name = f"{self.__class__.__qualname__}-{self._COUNT}"
    super().__init__(name=thread_name)
    self._queue = queue

run()

The worker's main loop.

Notes
  • the worker will stop if None is fetched from the queue
  • the worker signals the queue that tasks are done
Source code in src/lhcbdirac_log/zstd/pool/worker.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@override
def run(self) -> None:
    """The worker's main loop.

    Notes:
        - the worker will stop if None is fetched from the queue
        - the worker signals the queue that tasks are done
    """
    while (task := self._queue.get()) is not None:
        try:
            task.run()
        except Exception as e:
            self._queue.reject(task, e)
        finally:
            self._queue.task_done()

    self._queue.task_done()