Threaded API
This package provides zstd parallel processing capability.
Classes:
Name | Description |
---|---|
CompressorPool | a thread pool managing data compressors. |
DecompressorPool | a thread pool managing data decompressors. |
ThreadPool | an abstract thread pool managing workers and their tasks. |
ProcessorPool | a pool of zstd processors, managing their assignments. |
TaskQueue | a proxy queue managing tasks and their dispatching to training queues, if necessary. |
Task | a task to process data. |
TrainingTask | a task to train a dictionary. |
ProcessingTask | a task to process data. |
Worker | a worker processing tasks. |
CompressorPool
Bases: ThreadPool[ZstandardCompressor]
Thread pool implementation for data compression.
Source code in src/lhcbdirac_log/zstd/pool/pool.py
DecompressorPool
Bases: ThreadPool[ZstandardDecompressor]
Thread pool implementation for data decompression.
Source code in src/lhcbdirac_log/zstd/pool/pool.py
ProcessingTask
Bases: Task[ProcessorPool]
[Internal] Class for processing tasks.
Such a task processes data from an input entry to an output entry, using a processor from the processor pool and a dictionary from the dictionary provider.
Source code in src/lhcbdirac_log/zstd/pool/task.py
data_in: DataEntry
property
data_out: DataEntry
property
dict_name: str
property
Get the dictionary name from the entries.
Returns:
Type | Description |
---|---|
str | the dictionary name |
__init__(procpool, data_in, data_out, dict_provider, config)
Initialize the task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
procpool | ProcessorPool | the processor pool | required |
data_in | DataEntry | the input data entry | required |
data_out | DataEntry | the output data entry | required |
dict_provider | DictProvider | the dictionary provider to get dictionaries from | required |
config | Config | the configuration to use | required |
Source code in src/lhcbdirac_log/zstd/pool/task.py
ProcessorPool
[Internal] A pool of data processors, managing their assignments.
Used by tasks to get a free processor, and to release it when done.
Source code in src/lhcbdirac_log/zstd/pool/processor.py
__getitem__(dict_name)
Get a free processor for the given dictionary name, or None if not available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dict_name | str \| None | the processor dictionary name, or None for non-assigned processors | required |
Returns:
Type | Description |
---|---|
P \| None | a free processor, or None if not available |
Notes
- the pool creates new processors if needed, depending on dict_name and on whether the pool is variable
Source code in src/lhcbdirac_log/zstd/pool/processor.py
__init__(processor_factory, workers=os.cpu_count() or 2, processors=-3)
Initialize the pool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
processor_factory | Callable[[], P] | a callable returning a new processor instance (may be a class) | required |
workers | int | the fixed number of workers (default: os.cpu_count() or 2) | os.cpu_count() or 2 |
processors | int \| None | the fixed number of processors, given as a positive value (workers <= processors), or the variable number of processors, given as a negative value (1 <= -processors <= workers), in which case the real number of processors is N = max(workers, (loaded dict + 1) * -processors). (default None, maximum) | -3 |
Raises:
Type | Description |
---|---|
ValueError | if processors is out of range |
Source code in src/lhcbdirac_log/zstd/pool/processor.py
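The sizing rule for the processors parameter can be worked through in a few lines. This is only a sketch of the rule as stated in the parameter description, not the package's code: the function name effective_processors and its loaded_dicts argument are hypothetical, and the None case ("maximum") is left out because the description does not define it.

```python
def effective_processors(workers: int, processors: int, loaded_dicts: int) -> int:
    """Return the processor count N implied by the documented rule.

    Hypothetical helper: mirrors the parameter description only.
    """
    if processors > 0:
        # Fixed mode: the documented constraint is workers <= processors.
        if processors < workers:
            raise ValueError("fixed processors must be >= workers")
        return processors
    # Variable mode: the documented constraint is 1 <= -processors <= workers.
    per_dict = -processors
    if not 1 <= per_dict <= workers:
        raise ValueError("variable processors must satisfy 1 <= -processors <= workers")
    # N = max(workers, (loaded dict + 1) * -processors)
    return max(workers, (loaded_dicts + 1) * per_dict)


# With 4 workers and processors=-3, the pool grows as dictionaries are loaded.
sizes = [effective_processors(4, -3, n) for n in range(4)]
```

Under this reading, a negative value acts as a per-dictionary budget (plus one share for non-assigned processors), with the number of workers as the floor.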
get_any()
Get any free processor.
Returns:
Type | Description |
---|---|
P \| None | a free processor, or None if none available |
Notes
- tries to return a non-assigned processor first
Source code in src/lhcbdirac_log/zstd/pool/processor.py
release(processor)
Release a processor, marking it as available for other workers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
processor | P | the processor to release | required |
Notes
- no check is done to ensure the processor is not in use
- releasing an already released processor has no effect
Source code in src/lhcbdirac_log/zstd/pool/processor.py
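The acquire/release contract described for ProcessorPool (a lookup returns a free processor or None, and releasing twice is harmless) can be illustrated with a small stand-in. ToyProcessorPool and all of its attributes are hypothetical; the real pool also handles variable sizing and reassignment between dictionaries, which this sketch ignores.

```python
import threading
from typing import Callable, Optional


class ToyProcessorPool:
    """Hypothetical stand-in: tracks free processors per dictionary name."""

    def __init__(self, factory: Callable[[], object], size: int) -> None:
        self._lock = threading.Lock()
        self._factory = factory
        self._capacity = size
        self._created = 0
        self._free = {}   # dict name -> list of free processors
        self._owner = {}  # id(processor) -> dict name it was created for

    def __getitem__(self, dict_name: Optional[str]):
        with self._lock:
            bucket = self._free.get(dict_name)
            if bucket:
                return bucket.pop()
            if self._created < self._capacity:
                # Room left: create a processor bound to this dictionary name.
                proc = self._factory()
                self._created += 1
                self._owner[id(proc)] = dict_name
                return proc
            return None  # nothing free and the pool is full

    def release(self, processor) -> None:
        with self._lock:
            bucket = self._free.setdefault(self._owner[id(processor)], [])
            # Releasing an already released processor has no effect.
            if not any(p is processor for p in bucket):
                bucket.append(processor)


pool = ToyProcessorPool(object, size=1)
first = pool["logs"]    # created on demand
second = pool["logs"]   # pool exhausted, so None
pool.release(first)
pool.release(first)     # second release is a no-op
again = pool["logs"]    # the same processor comes back
```

A task would typically wrap its work in try/finally so that release always runs, since the pool does not check whether a released processor is still in use.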
Task
Bases: ABC
[Internal] Abstract base class for tasks to be executed by a worker.
Task execution is done using a processor, obtained when the task is running.
Source code in src/lhcbdirac_log/zstd/pool/task.py
__init__(procpool)
Initialize the task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
procpool | P | the processor pool, used to get a processor | required |
Source code in src/lhcbdirac_log/zstd/pool/task.py
run()
abstractmethod
Run the task.
Notes
- the task should release the processor when done
- the task may not be run again
- may raise exceptions
Source code in src/lhcbdirac_log/zstd/pool/task.py
TaskQueue
Bases: Queue[Task | None]
[Internal] A proxy queue managing tasks and their dispatching to training queues, if necessary.
Simply put tasks in the queue: when a dictionary must be trained and enough tasks are available to form a training sample, the queue automatically puts a training task. The trainer then requeues those tasks to the real task queue, where the workers finally process them.
Source code in src/lhcbdirac_log/zstd/pool/queue.py
rejected: list[tuple[Task, Exception]]
property
Get the rejected tasks.
Rejected tasks are tasks that failed to be processed.
Returns:
Type | Description |
---|---|
list[tuple[Task, Exception]] | the rejected tasks |
__init__(workers, dict_provider, config=DEFAULT_CONFIG)
Initialize the task queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workers | int | the number of workers available, or 0 to disable the training system | required |
dict_provider | DictProvider | the dictionary provider | required |
config | Config | the configuration to use for training (default: DEFAULT_CONFIG) | DEFAULT_CONFIG |
Source code in src/lhcbdirac_log/zstd/pool/queue.py
flush()
Flush all remaining incomplete training queues to training tasks.
Must be called after all tasks are added to the queue.
Notes
- does nothing if the training system is disabled
Source code in src/lhcbdirac_log/zstd/pool/queue.py
put(item, block=True, timeout=None)
Put a task in the task queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | Task \| None | the task | required |
block | bool | if True, blocks until the task is added (default: True) | True |
timeout | float \| None | the timeout in seconds (default: None) | None |
Notes
- if the task is a processing task and the training system is enabled, it is added to the training queue when the necessary dictionary is not available
Source code in src/lhcbdirac_log/zstd/pool/queue.py
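The interplay between put() and flush() can be modelled with a toy buffer: tasks whose dictionary is missing are held back until a sample is complete, then trained on and requeued. This is a sketch under assumptions, not the queue's implementation; ToyTrainingBuffer, sample_size and the string "tasks" are all hypothetical, and real training is replaced by recording the sample.

```python
from collections import defaultdict


class ToyTrainingBuffer:
    """Hypothetical model of the proxy behaviour described for TaskQueue."""

    def __init__(self, sample_size: int) -> None:
        self.sample_size = sample_size
        self.known_dicts = set()          # dictionaries already available
        self.pending = defaultdict(list)  # dict name -> tasks awaiting training
        self.ready = []                   # tasks handed to the "real" queue
        self.trainings = []               # (dict name, sample) pairs, in order

    def put(self, dict_name: str, task) -> None:
        if dict_name in self.known_dicts:
            self.ready.append(task)       # dictionary available: process directly
            return
        batch = self.pending[dict_name]
        batch.append(task)
        if len(batch) >= self.sample_size:
            self._train(dict_name)        # enough tasks for a training sample

    def flush(self) -> None:
        # Turn every incomplete sample into a training run.
        for name in list(self.pending):
            self._train(name)

    def _train(self, name: str) -> None:
        sample = self.pending.pop(name)
        self.trainings.append((name, list(sample)))
        self.known_dicts.add(name)
        self.ready.extend(sample)         # requeue the sampled tasks


buf = ToyTrainingBuffer(sample_size=2)
buf.put("jobA", "t1")
buf.put("jobA", "t2")   # completes the sample for jobA
buf.put("jobB", "t3")   # held back: sample incomplete
held_before_flush = list(buf.ready)
buf.flush()             # jobB is trained on its single task
```

The model shows why flush() must be called once all tasks have been added: without it, tasks belonging to an incomplete sample would never reach the workers.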
reject(task, exc)
Mark a task as rejected.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task | Task | the rejected task | required |
exc | Exception | the exception raised | required |
Source code in src/lhcbdirac_log/zstd/pool/queue.py
ThreadPool
Bases: ABC
A thread pool managing workers and the task queue.
Notes
- Must be subclassed with defined processor type
Source code in src/lhcbdirac_log/zstd/pool/pool.py
rejected: list[tuple[Task, Exception]]
property
running: bool
property
Check if the pool is running.
Returns:
Type | Description |
---|---|
bool | True if the pool is running, False otherwise |
stopped: bool
property
Check if the pool is stopped.
Returns:
Type | Description |
---|---|
bool | True if the pool is stopped, False otherwise |
stopping: bool
property
Check if the pool is stopping.
Returns:
Type | Description |
---|---|
bool | True if the pool is stopping, False otherwise |
__init__(dict_provider, config=DEFAULT_CONFIG, workers=os.cpu_count() or 2, processors=None)
Initialize the pool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dict_provider | DictProvider | the dictionary provider to use | required |
config | Config | the configuration to use (default: DEFAULT_CONFIG) | DEFAULT_CONFIG |
workers | int | the fixed number of workers (default: os.cpu_count() or 2) | os.cpu_count() or 2 |
processors | int \| None | the fixed number of processors, given as a positive value (workers <= processors), or the variable number of processors, given as a negative value (1 <= -processors <= workers), in which case the real number of processors is N = max(workers, (loaded dict + 1) * -processors). (default None, maximum) | None |
Notes
The number of processors and the number of workers can be controlled independently, for the following reasons:
- a worker processes one task at a time, and a task is processed by only one worker at a time
- a task uses a processor to process data, and a processor processes data for only one task at a time
- a processor uses a zstd dictionary to process data, but changing the dictionary is expensive, so it is better to keep the same dictionary for a while and therefore to have many processors with different dictionaries
Source code in src/lhcbdirac_log/zstd/pool/pool.py
__init_subclass__(*, processor=None, training=None)
Initialize the subclass.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
processor | type[P] \| None | the processor type to use | None |
training | bool \| None | indicate if training is enabled or not | None |
Raises:
Type | Description |
---|---|
TypeError | if any argument is None, or if processor is not a ZstandardProcessor subclass |
Source code in src/lhcbdirac_log/zstd/pool/pool.py
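Class keyword arguments such as processor and training are received by `__init_subclass__` when a subclass is declared. The following sketch shows the general Python mechanism with the validation described above; Processor, BasePool, Compressor and CompressPool are hypothetical names standing in for the package's classes, and the stored attribute names are assumptions.

```python
class Processor:
    """Hypothetical base type standing in for ZstandardProcessor."""


class BasePool:
    """Hypothetical base pool that requires subclass keyword arguments."""

    processor_type = None
    training = None

    def __init_subclass__(cls, *, processor=None, training=None, **kwargs):
        super().__init_subclass__(**kwargs)
        # Both keywords are mandatory for concrete subclasses.
        if processor is None or training is None:
            raise TypeError("processor and training must be provided")
        if not (isinstance(processor, type) and issubclass(processor, Processor)):
            raise TypeError("processor must be a Processor subclass")
        cls.processor_type = processor
        cls.training = training


class Compressor(Processor):
    pass


# The keywords go in the class statement, next to the base class.
class CompressPool(BasePool, processor=Compressor, training=True):
    pass
```

Declaring a subclass without the keywords fails at class-definition time, so a misconfigured pool type is caught on import rather than at first use.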
add(data_in, data_out)
Add a task to the queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_in | DataEntry | the input data accessor | required |
data_out | DataEntry | the output data accessor | required |
Raises:
Type | Description |
---|---|
RuntimeError | if the pool is stopping |
Notes
- the pool will start if not running
Source code in src/lhcbdirac_log/zstd/pool/pool.py
flush()
Flush all remaining incomplete training queues to training tasks.
Must be called after all tasks are added to the queue.
Source code in src/lhcbdirac_log/zstd/pool/pool.py
join(timeout=None)
Join the pool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | float \| None | the timeout in seconds, used to join the pool (default: None) | None |
Source code in src/lhcbdirac_log/zstd/pool/pool.py
start()
Start the pool.
Notes
- does nothing if the pool is already running
Source code in src/lhcbdirac_log/zstd/pool/pool.py
stop()
Request the pool to stop.
Notes
- this calls wait() first, which blocks until the task queue is empty
- the pool will effectively stop after all workers are done
- workers will effectively stop after all tasks are done (wait())
- to stop the pool instantly, clear the queue first
Source code in src/lhcbdirac_log/zstd/pool/pool.py
wait()
Wait until the task queue is empty.
Source code in src/lhcbdirac_log/zstd/pool/pool.py
TrainingTask
Bases: Task[Queue[ZstandardTrainer]]
[Internal] Class for training tasks.
Such tasks are training a dictionary from a list of processing tasks. When done, these are reinjected in the task queue.
Source code in src/lhcbdirac_log/zstd/pool/task.py
__init__(trainpool, callback, dict_provider, name, tasks)
Initialize the task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trainpool | Queue[ZstandardTrainer] | the training pool, to get a trainer | required |
callback | Callable[[str], None] | the callback to call when training is done, called with the dictionary name | required |
dict_provider | DictProvider | the dictionary provider to save the trained dictionary to | required |
name | str | the name of the dictionary to train | required |
tasks | list[ProcessingTask] | the tasks to take samples from | required |
Source code in src/lhcbdirac_log/zstd/pool/task.py
Worker
Bases: Thread
[Internal] A thread worker class that processes tasks from the specified task queue.
Notes
- the worker will stop if None is fetched from the queue
- the worker signals the queue that tasks are done
- stopped workers cannot be restarted (python threads limitation)
Source code in src/lhcbdirac_log/zstd/pool/worker.py
running: bool
property
Check if the worker is running.
Returns:
Type | Description |
---|---|
bool | True if the worker is running, False otherwise |
__init__(queue)
Initialize the worker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queue | TaskQueue | the task queue to get tasks from | required |
Source code in src/lhcbdirac_log/zstd/pool/worker.py
run()
The worker's main loop.
Notes
- the worker will stop if None is fetched from the queue
- the worker signals the queue that tasks are done
Source code in src/lhcbdirac_log/zstd/pool/worker.py
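The loop described in the notes (stop on None, signal the queue for every item) is the usual sentinel pattern built on the standard library's queue.Queue and threading.Thread. The sketch below is an illustration of that pattern only: ToyWorker, its attributes, and the use of plain callables as tasks are assumptions, and failed tasks are recorded in a list in the spirit of the rejected property.

```python
import queue
import threading


class ToyWorker(threading.Thread):
    """Hypothetical worker: runs callables from a queue until it fetches None."""

    def __init__(self, tasks: queue.Queue, rejected: list) -> None:
        super().__init__(daemon=True)
        self.tasks = tasks
        self.rejected = rejected

    def run(self) -> None:
        while True:
            task = self.tasks.get()
            try:
                if task is None:
                    return  # sentinel: stop the worker
                try:
                    task()
                except Exception as exc:
                    # Keep the failure instead of killing the thread.
                    self.rejected.append((task, exc))
            finally:
                # Signal the queue for every fetched item, sentinel included.
                self.tasks.task_done()


results = []
rejected = []
tasks = queue.Queue()
worker = ToyWorker(tasks, rejected)
worker.start()

tasks.put(lambda: results.append("ok"))
tasks.put(lambda: 1 / 0)   # fails and is recorded as rejected
tasks.put(None)            # ask the worker to stop
tasks.join()               # returns once every item has been marked done
worker.join(timeout=5)
```

Because a finished thread cannot be started again, restarting such a pool means creating new worker objects rather than reusing stopped ones.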