/usr/lib/python3/dist-packages/doit/control.py is in python3-doit 0.28.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 | """Control tasks execution order"""
import fnmatch
import functools
from collections import deque

import six

from .exceptions import InvalidTask, InvalidCommand, InvalidDodoFile
from .task import Task, DelayedLoaded
from .loader import generate_tasks
class TaskControl(object):
    """Manages tasks inter-relationship

    There are 3 phases:
      1) the constructor gets a list of tasks and does initialization
      2) 'process' processes the command line options for tasks
      3) 'task_dispatcher' dispatches tasks to the runner

    Process dependencies and targets to find out the order in which tasks
    should be executed. Also apply a filter to exclude tasks from
    execution, and parse task cmd line options.

    @ivar tasks: (dict) Key: task name ([taskgen.]name)
                        Value: L{Task} instance
    @ivar targets: (dict) Key: fileName
                          Value: task_name
    """

    def __init__(self, task_list):
        """
        @param task_list: (list - Task) all tasks, in dodo-file order
        @raise InvalidTask: an element is not a Task, a task_dep/setup task
            does not exist, or two tasks share a target
        @raise InvalidDodoFile: two tasks have the same name
        """
        self.tasks = {}
        self.targets = {}

        # name of tasks in definition order (as in the dodo file).
        # the real execution order might be different if the
        # dependencies require so.
        self._def_order = []
        # list of tasks selected to be executed (set by `process`)
        self.selected_tasks = None

        # sanity check and create tasks dict
        for task in task_list:
            # task must be a Task
            if not isinstance(task, Task):
                msg = "Task must be an instance of Task class. %s"
                raise InvalidTask(msg % (task.__class__))
            # task name must be unique
            if task.name in self.tasks:
                msg = "Task names must be unique. %s"
                raise InvalidDodoFile(msg % task.name)

            self.tasks[task.name] = task
            self._def_order.append(task.name)

        # expand wild-card task-dependencies
        for task in six.itervalues(self.tasks):
            for pattern in task.wild_dep:
                task.task_dep.extend(self._get_wild_tasks(pattern))

        self._check_dep_names()
        self.set_implicit_deps(self.targets, task_list)

    def _check_dep_names(self):
        """check if user input task_dep or setup_task that doesnt exist

        @raise InvalidTask: on the first unknown task_dep or setup task
        """
        # check task-dependencies exist.
        for task in six.itervalues(self.tasks):
            for dep in task.task_dep:
                if dep not in self.tasks:
                    msg = "%s. Task dependency '%s' does not exist."
                    raise InvalidTask(msg % (task.name, dep))

            for setup_task in task.setup_tasks:
                if setup_task not in self.tasks:
                    msg = "Task '%s': invalid setup task '%s'."
                    raise InvalidTask(msg % (task.name, setup_task))

    @staticmethod
    def set_implicit_deps(targets, task_list):
        """set/add task_dep based on file_dep on a target from another task

        @param targets: (dict) fileName -> task_name (updated in place)
        @param task_list: (list - Task) task with newly added file_dep
        @raise InvalidTask: if a target is already produced by another task
        """
        # 1) create a dictionary associating every target->task. where the
        #    task builds that target.
        for task in task_list:
            for target in task.targets:
                if target in targets:
                    msg = ("Two different tasks can't have a common target. "
                           "'%s' is a target for %s and %s.")
                    raise InvalidTask(msg % (target, task.name,
                                             targets[target]))
                targets[target] = task.name

        # 2) now go through all dependencies and check if they are target
        #    from another task.
        # FIXME - when used with delayed tasks needs to check if
        #         any new target matches any old file_dep.
        for task in task_list:
            TaskControl.add_implicit_task_dep(targets, task, task.file_dep)

    @staticmethod
    def add_implicit_task_dep(targets, task, deps_list):
        """add implicit task_dep for `task` for newly added `file_dep`

        @param targets: (dict) fileName -> task_name
        @param task: (Task) task with newly added file_dep
        @param deps_list: (list - str): list of file_dep for task
        """
        for dep in deps_list:
            # only add the producing task once
            if (dep in targets and targets[dep] not in task.task_dep):
                task.task_dep.append(targets[dep])

    def _get_wild_tasks(self, pattern):
        """get list of tasks that match pattern (in definition order)"""
        wild_list = []
        for t_name in self._def_order:
            if fnmatch.fnmatch(t_name, pattern):
                wild_list.append(t_name)
        return wild_list

    def _process_filter(self, task_selection):
        """process cmd line task options
        [task_name [-task_opt [opt_value]] ...] ...

        @param task_selection: list of strings with task names/params or target
        @return list of task names. Expanding glob and removed params
        """
        filter_list = []

        def add_filtered_task(seq, f_name):
            """add task to list `filter_list` and set task.options from params

            @return list - str: remaining elements of `seq` that were not
                consumed as options/positional args of `f_name`
            """
            filter_list.append(f_name)
            # only tasks specified by name can contain parameters
            if f_name in self.tasks:
                # parse task_selection
                the_task = self.tasks[f_name]

                # remaining items are other tasks not positional options
                the_task.options, seq = the_task.taskcmd.parse(seq)
                # if task takes positional parameters set all as pos_arg_val
                if the_task.pos_arg is not None:
                    the_task.pos_arg_val = seq
                    seq = []
            return seq

        # process...
        seq = task_selection[:]
        # process cmd_opts until nothing left
        while seq:
            f_name = seq.pop(0)  # always start with a task/target name
            # select tasks by task-name pattern
            if '*' in f_name:
                for task_name in self._get_wild_tasks(f_name):
                    add_filtered_task((), task_name)
            else:
                seq = add_filtered_task(seq, f_name)
        return filter_list

    def _filter_tasks(self, task_selection):
        """Select tasks specified by filter.

        @param task_selection: list of strings with task names/params or target
        @return (list) of string. where elements are task name.
        @raise InvalidCommand: if an element is not a task name, a target,
            or a "basename:subname" whose basename task has a loader
        """
        not_found_msg = ('cmd `run` invalid parameter: "%s".' +
                         ' Must be a task, or a target.\n' +
                         'Type "doit list" to see available tasks')
        selected_task = []

        filter_list = self._process_filter(task_selection)
        for filter_ in filter_list:
            # by task name
            if filter_ in self.tasks:
                selected_task.append(filter_)
            # by target
            elif filter_ in self.targets:
                selected_task.append(self.targets[filter_])
            else:
                # if can not find name check if it is a sub-task of a
                # delayed task ("basename:subname")
                basename = filter_.split(':', 1)[0]
                base_task = self.tasks.get(basename)
                loader = base_task.loader if base_task is not None else None
                # A task without a loader can not create new sub-tasks, so
                # report it like any unknown name (instead of failing with
                # AttributeError on `loader.basename`).
                if not loader:
                    raise InvalidCommand(not_found_msg % filter_)
                loader.basename = basename
                # placeholder task: when dispatched it executes the loader,
                # which creates the real task.
                self.tasks[filter_] = Task(filter_, None, loader=loader)
                selected_task.append(filter_)
        return selected_task

    def process(self, task_selection):
        """Select the tasks to be executed and store them in `selected_tasks`.

        @param task_selection: list of strings with task names/params, or
            None to select all tasks in definition order
        @return None. `self.selected_tasks` is set to a (list - string)
            where each element is the name of a task
        """
        # execute only tasks in the filter in the order specified by filter
        if task_selection is not None:
            self.selected_tasks = self._filter_tasks(task_selection)
        else:
            # if no filter is defined execute all tasks
            # in the order they were defined.
            self.selected_tasks = self._def_order

    def task_dispatcher(self):
        """return a TaskDispatcher generator

        `process` must be called first so `selected_tasks` is set.
        """
        assert self.selected_tasks is not None, \
            "must call 'process' before this"

        return TaskDispatcher(self.tasks, self.targets, self.selected_tasks)
class ExecNode(object):
    """Scheduling state attached to one Task.

    Keeps track of what the task is still waiting for, which nodes are
    waiting for it, and the generator that yields its scheduling steps.

    @ivar run_status (str): result of Dependency.get_status, modified by
        the runner. Possible values:
          - None: not processed yet
          - run: task is selected to be executed (it might be running or
                 waiting for setup)
          - ignore: task wont be executed (user forced deselect)
          - up-to-date: task wont be executed (no need)
          - done: task finished its execution
        `parent_status` additionally reacts to the value 'failure'.
    """

    def __init__(self, task, parent):
        self.task = task
        # private copies of the dependencies still to be handed to
        # TaskDispatcher._add_task (the Task's own lists stay intact)
        self.task_dep = list(task.task_dep)
        self.calc_dep = task.calc_dep.copy()

        # Names from the root task down to this one; used only to detect
        # cyclic references. Nodes depending on this one are kept in
        # `waiting_me` instead.
        lineage = list(parent.ancestors) if parent else []
        lineage.append(task.name)
        self.ancestors = lineage

        # True while waiting to be selected (i.e. the up-to-date check)
        self.wait_select = False
        # names of tasks that must finish before this node can continue
        self.wait_run = set()
        self.wait_run_calc = set()
        # ExecNode objects blocked on this node
        self.waiting_me = set()
        self.run_status = None

        # parents whose run_status was 'failure' / 'ignore'
        self.bad_deps = []
        self.ignored_deps = []

        # set by the dispatcher to the generator from _add_task
        self.generator = None

    def reset_task(self, task, generator):
        """Swap in the task created by the placeholder's own `loader`.

        The new task is flagged as already loaded, its pending dependencies
        are copied again and `generator` replaces the previous one.
        """
        task.loader = DelayedLoaded
        self.task = task
        self.task_dep = list(task.task_dep)
        self.calc_dep = task.calc_dep.copy()
        self.generator = generator

    def parent_status(self, parent_node):
        """Remember `parent_node` if it failed or was ignored."""
        buckets = {'failure': self.bad_deps, 'ignore': self.ignored_deps}
        bucket = buckets.get(parent_node.run_status)
        if bucket is not None:
            bucket.append(parent_node)

    def __repr__(self):
        return "%s(%s)" % (type(self).__name__, self.task.name)

    def step(self):
        """Return the generator's next step, or None once it is exhausted."""
        return next(self.generator, None)
def no_none(decorated):
    """decorator for a generator to discard/filter-out None values

    Only `None` is dropped; other falsy values (0, '', False) are yielded.
    `functools.wraps` keeps the decorated generator function's name and
    docstring, so introspection/tracebacks show the real function instead
    of the wrapper.
    """
    @functools.wraps(decorated)
    def _func(*args, **kwargs):
        # wrap generator
        for value in decorated(*args, **kwargs):
            if value is not None:
                yield value
    return _func
class TaskDispatcher(object):
    """Dispatch another task to be selected/executed, mostly handle with MP

    Note that a dispatched task might not be ready to be executed.

    The runner drives `self.generator`: it receives ExecNodes to process
    (or "hold on" when every pending node is waiting) and sends back the
    ExecNode it finished processing (or None).
    """
    def __init__(self, tasks, targets, selected_tasks):
        """
        @param tasks: (dict) task name -> Task (new delayed tasks are added)
        @param targets: (dict) fileName -> task_name
        @param selected_tasks: (list - str) names of tasks to run, in order
        """
        self.tasks = tasks
        self.targets = targets
        self.nodes = {} # key task-name, value: ExecNode
        # queues
        self.waiting = set() # of ExecNode
        self.ready = deque() # of ExecNode

        self.generator = self._dispatcher_generator(selected_tasks)

    def _gen_node(self, parent, task_name):
        """return ExecNode for task_name if not created yet

        @return the new ExecNode, or None if a node for task_name already
            exists (None values are dropped by `no_none` in `_add_task`)
        @raise InvalidDodoFile: if task_name is already an ancestor of parent
        """
        node = self.nodes.get(task_name, None)

        # first time, create node
        if node is None:
            node = ExecNode(self.tasks[task_name], parent)
            node.generator = self._add_task(node)
            self.nodes[task_name] = node
            return node

        # detect cyclic/recursive dependencies
        if parent and task_name in parent.ancestors:
            msg = "Cyclic/recursive dependencies for task %s: [%s]"
            cycle = " -> ".join(parent.ancestors + [task_name])
            raise InvalidDodoFile(msg % (task_name, cycle))

    def _node_add_wait_run(self, node, task_list, calc=False):
        """updates node.wait_run (or node.wait_run_calc when `calc`)

        @param node (ExecNode)
        @param task_list (list - str) tasks that node should wait for
        @param calc (bool) task_list is for calc_dep
        """
        # wait_for: contains tasks that `node` needs to wait for and
        # were not executed yet.
        wait_for = set()
        for name in task_list:
            dep_node = self.nodes[name]
            # NOTE(review): `not dep_node` looks unreachable: a missing name
            # raises KeyError above and ExecNode defines no __bool__/__len__.
            if (not dep_node) or dep_node.run_status in (None, 'run'):
                wait_for.add(name)
            else:
                # if dep task was already executed:
                # a) set parent status
                node.parent_status(dep_node)
                # b) update dependencies from calc_dep results
                if calc:
                    self._process_calc_dep_results(dep_node, node)

        # update ExecNode setting parent/dependent relationship
        for name in wait_for:
            self.nodes[name].waiting_me.add(node)
        if calc:
            node.wait_run_calc.update(wait_for)
        else:
            node.wait_run.update(wait_for)

    @no_none
    def _add_task(self, node):
        """@return a generator that produces:
             - ExecNode for task dependencies
             - 'wait' to wait for an event (i.e. a dep task run)
             - Task when ready to be dispatched to runner (run or be selected)
             - "reset generator" after a delayed loader created the real task
             - None values are of no interest and are filtered out
               by the decorator no_none

        note that after a 'wait' is sent it is the responsibility of the
        caller to ensure the current ExecNode cleared all its waiting
        before calling `next()` again on this generator
        """
        this_task = node.task

        # add calc_dep & task_dep until all processed
        # calc_dep may add more deps so need to loop until nothing left
        while True:
            calc_dep_list = list(node.calc_dep)
            node.calc_dep.clear()
            task_dep_list = node.task_dep[:]
            node.task_dep = []

            for calc_dep in calc_dep_list:
                yield self._gen_node(node, calc_dep)
            self._node_add_wait_run(node, calc_dep_list, calc=True)

            # add task_dep
            for task_dep in task_dep_list:
                yield self._gen_node(node, task_dep)
            self._node_add_wait_run(node, task_dep_list)

            # do not wait until all possible task_dep are created
            if (node.calc_dep or node.task_dep):
                continue # pragma: no cover # coverage cant catch this #198
            elif (node.wait_run or node.wait_run_calc):
                yield 'wait'
            else:
                break

        # generate tasks from a delayed loader: this_task is a placeholder
        # whose loader creates the real task(s)
        if this_task.loader:
            if this_task.loader.created:
                # This loader was already executed but its task was
                # not replaced.
                # This happens when a sub-task is specified from the
                # command line but actually not created.
                # TODO: needs a new error class ?
                return
            ref = this_task.loader.creator
            basename = this_task.loader.basename or this_task.name
            new_tasks = generate_tasks(basename, ref(), ref.__doc__)
            TaskControl.set_implicit_deps(self.targets, new_tasks)
            for nt in new_tasks:
                # mark tasks created here so they are not loaded again
                if not nt.loader:
                    nt.loader = DelayedLoaded
                self.tasks[nt.name] = nt
            this_task.loader.created = True
            # this task was placeholder to execute the loader
            # now it needs to be re-processed with the real task
            yield "reset generator"
            # the dispatcher replaces this generator (ExecNode.reset_task)
            assert False, "This generator can not be used again"

        # add itself
        yield this_task

        # tasks that contain setup-tasks need to be yielded twice
        if this_task.setup_tasks:
            # run_status None means task is waiting for other tasks
            # in order to check if up-to-date. so it needs to wait
            # before scheduling its setup-tasks.
            if node.run_status is None:
                node.wait_select = True
                yield "wait"

            # if this task should run, so schedule setup-tasks before itself
            if node.run_status == 'run':
                for setup_task in this_task.setup_tasks:
                    yield self._gen_node(node, setup_task)
                self._node_add_wait_run(node, this_task.setup_tasks)
                if node.wait_run:
                    yield 'wait'

                # re-send this task after setup_tasks are sent
                yield this_task

    def _get_next_node(self, ready, tasks_to_run):
        """get ExecNode from (in order):
            .1 ready
            .2 tasks_to_run (list in reverse order)

        Selected tasks whose node already exists are skipped.
        @return ExecNode, or None when both sources are exhausted
        """
        if ready:
            return ready.popleft()
        # get task group from tasks_to_run
        while tasks_to_run:
            task_name = tasks_to_run.pop()
            node = self._gen_node(None, task_name)
            if node:
                return node

    def _update_waiting(self, processed):
        """updates 'ready' and 'waiting' queues after processed

        @param processed (ExecNode) or None
        """
        # no task processed, just ignore
        if processed is None:
            return

        node = processed

        # if node was waiting select must only receive select event
        if node.wait_select:
            self.ready.append(node)
            self.waiting.remove(node)
            node.wait_select = False

        # status == run means this was not just select completed
        if node.run_status == 'run':
            return

        # node is finished (or will not run): release nodes waiting on it
        for waiting_node in node.waiting_me:
            waiting_node.parent_status(node)

            # is_ready indicates if node.generator can be invoked again
            task_name = node.task.name

            # node wait_run will be ready if there are nothing left to wait
            if task_name in waiting_node.wait_run:
                waiting_node.wait_run.remove(task_name)
                is_ready = not (waiting_node.wait_run or
                                waiting_node.wait_run_calc)
            # node wait_run_calc
            else:
                assert task_name in waiting_node.wait_run_calc
                waiting_node.wait_run_calc.remove(task_name)
                # calc_dep might add new deps that can be run without
                # waiting for the completion of the remaining deps
                is_ready = True
                self._process_calc_dep_results(node, waiting_node)

            # this node can be further processed
            if is_ready and (waiting_node in self.waiting):
                self.ready.append(waiting_node)
                self.waiting.remove(waiting_node)

    def _process_calc_dep_results(self, node, waiting_node):
        """Apply the values of the calc_dep task `node` to `waiting_node`.

        Newly added task_dep/calc_dep entries are also queued on
        `waiting_node` so `_add_task` processes them.
        """
        # refresh this task dependencies with values got from calc_dep
        values = node.task.values
        len_task_deps = len(waiting_node.task.task_dep)
        old_calc_dep = waiting_node.task.calc_dep.copy()
        waiting_node.task.update_deps(values)
        TaskControl.add_implicit_task_dep(
            self.targets, waiting_node.task,
            values.get('file_dep', []))

        # update node's list of non-processed dependencies
        new_task_dep = waiting_node.task.task_dep[len_task_deps:]
        waiting_node.task_dep.extend(new_task_dep)
        new_calc_dep = waiting_node.task.calc_dep - old_calc_dep
        waiting_node.calc_dep.update(new_calc_dep)

    def _dispatcher_generator(self, selected_tasks):
        """return generator dispatching tasks

        Yields ExecNodes (to be selected/executed) or "hold on"; the value
        sent back is the processed ExecNode, or None.
        """
        # each selected task will create a tree (from dependencies) of
        # tasks to be processed
        tasks_to_run = list(reversed(selected_tasks))
        node = None # current active ExecNode

        while True:
            # get current node
            if not node:
                node = self._get_next_node(self.ready, tasks_to_run)
                if not node:
                    if self.waiting:
                        # all tasks are waiting, hold on
                        processed = (yield "hold on")
                        self._update_waiting(processed)
                        continue
                    # we are done!
                    return

            # get next step from current node
            next_step = node.step()

            # got None, nothing left for this generator
            if next_step is None:
                node = None
                continue

            # got a task, send ExecNode to runner
            if isinstance(next_step, Task):
                processed = (yield self.nodes[next_step.name])
                self._update_waiting(processed)

            # got new ExecNode, add to ready_queue
            elif isinstance(next_step, ExecNode):
                self.ready.append(next_step)

            # node just performed a delayed creation of tasks, restart
            elif next_step == "reset generator":
                node.reset_task(self.tasks[node.task.name],
                                self._add_task(node))

            # got 'wait', add ExecNode to waiting queue
            else:
                assert next_step == "wait"
                self.waiting.add(node)
                node = None
|