Dask manager
Distributed computation management using Dask.
This module handles distributed parallel execution of benchmark evaluations across multiple worker nodes. It provides abstractions for mapping functions across data in a distributed manner with resource management.
DaskManager()
DaskManager class for distributed parallel execution.
Source code in mlir_rl_artifact/utils/dask_manager.py
workers_names
property
List of available worker names.
num_workers
property
Number of available workers.
map_objs(func, objs, benchs, main_exec_data, training, obj_str=lambda o: str(o))
Map a function across objects in a distributed manner.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[obj_T, str, Benchmarks, Optional[dict[str, dict[str, int]]]], T]` | The function to apply to each object. | *required* |
| `objs` | `Iterable[obj_T]` | The objects to apply the function to. | *required* |
| `benchs` | `Benchmarks` | The benchmark suite to use. | *required* |
| `main_exec_data` | `dict[str, dict[str, int]] \| None` | The main execution data (if available). | *required* |
| `training` | `bool` | Whether the mapping is for training. If `True`, the function is executed with a timeout and the training benchmarks are used instead of the evaluation ones. | *required* |
| `obj_str` | `Callable[[obj_T], str]` | A function to convert each object to a string for logging. | `lambda o: str(o)` |
Returns:

| Type | Description |
|---|---|
| `list[T \| None]` | A list of the results of the function applied to each object. |
Source code in mlir_rl_artifact/utils/dask_manager.py
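A hedged usage sketch of the `map_objs` contract. The `evaluate` function and the serial stand-in below are hypothetical; with a real `DaskManager` the mapping runs on the workers, but the shape is the same: each object is evaluated, and a failed (or timed-out) evaluation contributes `None`, matching the documented `list[T | None]` return type.

```python
# Serial stand-in for map_objs so the example runs without a cluster.
# The second argument of func is assumed to be a string label for the
# object (via obj_str); benchs and main_exec_data are passed through.
def map_objs_serial(func, objs, benchs, main_exec_data, training, obj_str=lambda o: str(o)):
    results = []
    for obj in objs:
        try:
            results.append(func(obj, obj_str(obj), benchs, main_exec_data))
        except Exception:
            results.append(None)  # failed evaluation yields None
    return results

# Hypothetical evaluation function matching the documented signature.
def evaluate(obj, name, benchs, exec_data):
    if obj < 0:
        raise ValueError(f"bad object {name}")
    return obj * 2

results = map_objs_serial(evaluate, [1, -1, 3],
                          benchs="suite", main_exec_data=None, training=True)
print(results)  # → [2, None, 6]
```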
run_and_register_to_workers(func)
Run a function both locally and on the workers. The result is registered to all workers and returned by this function.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[], T]` | The function to run. | *required* |
Returns:

| Type | Description |
|---|---|
| `T` | The result of the function. |
Source code in mlir_rl_artifact/utils/dask_manager.py
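A sketch of the run-everywhere pattern behind `run_and_register_to_workers`. The dict of per-worker registries is a stand-in for real worker state; with a real Dask client this would combine a local call with something like `client.run(func)` to execute on every worker.

```python
# Stand-in sketch: run func locally, run it on every "worker" registry,
# and return the local result, mirroring the documented behaviour.
def run_and_register(func, worker_registries):
    result = func()                       # run locally
    for registry in worker_registries.values():
        registry["shared"] = func()       # run and register on each worker
    return result                         # the local result is returned

workers = {"w0": {}, "w1": {}}
value = run_and_register(lambda: 42, workers)
print(value)  # → 42
```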
__submit_persistent(key, worker)
Submit a persistent function to a worker, and keep track of its result (Future) for reuse.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | The key of the function. | *required* |
| `worker` | `str` | The worker to submit the function to. | *required* |
Returns:

| Type | Description |
|---|---|
| `Future` | The future of the function. |
Source code in mlir_rl_artifact/utils/dask_manager.py
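A sketch of the persistent-future cache that `__submit_persistent` and `__get_persistent` appear to implement, using `concurrent.futures` as a local stand-in for Dask futures. The class name and `functions` registry are hypothetical; the point is that results are keyed by `(key, worker)`, so a repeated request returns the cached Future instead of recomputing.

```python
from concurrent.futures import ThreadPoolExecutor

class PersistentCache:
    """Hypothetical stand-in for the (key, worker) -> Future cache."""

    def __init__(self, functions):
        self._functions = functions        # key -> zero-argument callable
        self._pool = ThreadPoolExecutor()  # local stand-in for Dask workers
        self._persistent = {}              # (key, worker) -> Future

    def submit_persistent(self, key, worker):
        # Submit the function and remember its Future for reuse.
        fut = self._pool.submit(self._functions[key])
        self._persistent[(key, worker)] = fut
        return fut

    def get_persistent(self, key, worker):
        # Reuse the cached Future when present; submit otherwise.
        if (key, worker) not in self._persistent:
            return self.submit_persistent(key, worker)
        return self._persistent[(key, worker)]

calls = []
cache = PersistentCache({"init": lambda: calls.append(1) or len(calls)})
a = cache.get_persistent("init", "w0")
b = cache.get_persistent("init", "w0")
print(a is b)  # → True: the same Future is reused, computed only once
```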
__get_persistent(key, worker)
Get the result of a persistent function from a worker.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | The key of the function. | *required* |
| `worker` | `str` | The worker to get the result from. | *required* |
Returns:

| Type | Description |
|---|---|
| `Future` | The future that points to the result of the function. |
Source code in mlir_rl_artifact/utils/dask_manager.py
__renew_persistent(key, worker)
Recompute the result of a persistent function on a worker. This should be called when a persistent result (Future) has become invalid (most often due to a worker failure).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | The key of the function. | *required* |
| `worker` | `str` | The worker to renew the result on. | *required* |
Returns:

| Type | Description |
|---|---|
| `Future` | The future that points to the result of the function. |
Source code in mlir_rl_artifact/utils/dask_manager.py
__renew_worker_persistents(worker)
Recompute all persistent functions on a worker. This should be called when a worker has failed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `worker` | `str` | The worker to renew the results on. | *required* |
Source code in mlir_rl_artifact/utils/dask_manager.py
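A minimal sketch of the worker-failure recovery step: every persistent entry cached for the failed worker is recomputed and replaced. The dict cache and zero-argument callables are stand-ins for the real Dask futures and submissions.

```python
# Stand-in for __renew_worker_persistents: recompute every cached
# entry that belonged to the failed worker, leaving other workers'
# entries untouched.
def renew_worker_persistents(cache, functions, worker):
    for (key, w) in list(cache):          # iterate over a snapshot of keys
        if w == worker:                   # only entries on the failed worker
            cache[(key, w)] = functions[key]()  # recompute and replace

cache = {("a", "w0"): "stale", ("b", "w0"): "stale", ("a", "w1"): "ok"}
funcs = {"a": lambda: "fresh-a", "b": lambda: "fresh-b"}
renew_worker_persistents(cache, funcs, "w0")
print(cache[("a", "w0")])  # → fresh-a
```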
__submit_obj(func, idx, obj, worker, training)
Submit the execution of a function on an object to a worker.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[obj_T, str, Benchmarks, Optional[dict[str, dict[str, int]]]], T]` | The function to execute. | *required* |
| `idx` | `int` | The index of the object (for tracking purposes). | *required* |
| `obj` | `obj_T` | The object to execute the function on. | *required* |
| `worker` | `str` | The worker to submit the result to. | *required* |
| `training` | `bool` | Whether the object is for training. If `True`, the function is executed with a timeout and the training benchmarks are used instead of the evaluation ones. | *required* |
Returns:

| Type | Description |
|---|---|
| `Future` | The future that points to the result of the function. |
Source code in mlir_rl_artifact/utils/dask_manager.py
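A sketch of the training-time timeout described above: when `training` is `True`, the submitted call is waited on with a bounded timeout, and a slow evaluation yields `None` instead of blocking the run. `concurrent.futures` stands in for a Dask submission; the `timeout` value and helper name are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def submit_obj(func, idx, obj, pool, training, timeout=0.1):
    # idx is kept for tracking parity with the documented signature;
    # it is unused in this local sketch.
    fut = pool.submit(func, obj)
    if not training:
        return fut.result()               # evaluation: wait unconditionally
    try:
        return fut.result(timeout=timeout)  # training: bounded wait
    except TimeoutError:
        return None                       # a timed-out object yields None

with ThreadPoolExecutor() as pool:
    fast = submit_obj(lambda o: o * 2, 0, 21, pool, training=True)
    slow = submit_obj(lambda o: time.sleep(0.5) or o, 1, 5, pool, training=True)
print(fast, slow)  # → 42 None
```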
__keep_only_running()
Keep only the workers that currently have running jobs.
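A sketch of the filtering step behind `__keep_only_running`. The `running_tasks` dict is a stand-in; with a real Dask client, per-worker task lists could come from something like `Client.processing()`.

```python
# Stand-in sketch: keep only workers whose task list is non-empty.
def keep_only_running(workers, running_tasks):
    return [w for w in workers if running_tasks.get(w)]

running = keep_only_running(["w0", "w1", "w2"], {"w0": ["task-1"], "w1": []})
print(running)  # → ['w0']
```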