multiml.task_scheduler module

TaskScheduler module.

In the multiml framework, a task describes each step of the pipeline, and a subtask describes a component of a task that takes a different approach, e.g. a different type of ML model. The following scheme shows a case where the pipeline consists of two steps, with three subtasks defined for each step:

>>> task0 (subtask0, subtask1, subtask2) -> task1 (subtask3, subtask4, subtask5)

The TaskScheduler class manages the dependencies between tasks, and stores subtask class instances and their hyperparameters.

multiml.task_scheduler.tasktuple

namedtuple of a task, consisting of task_id and subtasks. task_id is the unique identifier of the task, and subtasks is a list of subtasktuple instances, described below.

Type:

namedtuple

multiml.task_scheduler.subtasktuple

namedtuple of a subtask, consisting of task_id, subtask_id, env and hps. subtask_id is the unique identifier of the subtask, env is the subtask class instance, and hps is a Hyperparameters class instance.

Type:

namedtuple

class multiml.task_scheduler.tasktuple(task_id, subtasks)

Bases: tuple

subtasks

Alias for field number 1

task_id

Alias for field number 0

class multiml.task_scheduler.subtasktuple(task_id, subtask_id, env, hps)

Bases: tuple

env

Alias for field number 2

hps

Alias for field number 3

subtask_id

Alias for field number 1

task_id

Alias for field number 0
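
The field layout of the two namedtuples can be illustrated with a minimal sketch. The definitions below are stand-ins that mirror the documented field order and are not imported from multiml; the env and hps values are placeholder objects chosen only for illustration.

```python
from collections import namedtuple

# Stand-in definitions mirroring the documented field order (illustrative,
# not the definitions shipped in multiml).
tasktuple = namedtuple('tasktuple', ('task_id', 'subtasks'))
subtasktuple = namedtuple('subtasktuple',
                          ('task_id', 'subtask_id', 'env', 'hps'))

# Placeholders standing in for a BaseTask instance and its hyperparameters.
env = object()
hps = {'learning_rate': [0.1, 0.01]}

subtask = subtasktuple('task0', 'subtask0', env, hps)
task = tasktuple('task0', [subtask])

# Fields are reachable both by name and by position.
first_id = task.subtasks[0].subtask_id
same_env = subtask[2] is subtask.env
```

Access by name (subtask.env) is usually easier to read than access by position (subtask[2]), although both refer to the same field.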

class multiml.task_scheduler.TaskScheduler(ordered_tasks=None)

Bases: object

Task management class for multiml execution.

Manages tasks and subtasks. The ordering of tasks is controlled by a DAG, which is built from the parent and child dependencies provided for each task.

Examples

>>> subtask = MyTask()
>>> task_scheduler = TaskScheduler()
>>> task_scheduler.add_task('task_id')
>>> task_scheduler.add_subtask('task_id', 'subtask_id', subtask)
>>> task_scheduler.get_sorted_task_ids()

__init__(ordered_tasks=None)

Initialize the TaskScheduler and reset DAG.

The ordered_tasks option provides a shortcut for registering ordered tasks and subtasks. See the add_ordered_tasks() and add_ordered_subtasks() methods for details. If task dependencies are complex, add tasks and subtasks using the add_task() and add_subtask() methods instead.

Parameters:

ordered_tasks (list) – list of ordered task_ids, or list of ordered subtasks. If the given value is a list of str, add_ordered_tasks() is called to register the task_ids. If it is a list of any other type, add_ordered_subtasks() is called to register the subtasks.

Examples

>>> # ordered task_ids
>>> task_scheduler = TaskScheduler(['task0', 'task1'])
>>> task_scheduler.add_subtask('task0', 'subtask0', env)

__len__()

Returns the number of all grid combinations.

Returns:

the number of all grid combinations.

Return type:

int

__getitem__(item)

Returns subtasktuples by index.

Parameters:

item (int) – index between 0 and len(task_scheduler).

Examples

>>> task_scheduler[0]

add_task(task_id, parents=None, children=None, subtasks=None, add_to_dag=True)

Register task and add the relation between tasks.

If subtasks is provided as a list of dict, the subtasks are also registered to the given task_id. To specify task dependencies, parents and/or children must be set, and add_to_dag must be True.

Parameters:
  • task_id (str) – unique task identifier

  • parents (list or str) – list of parent task_ids, or str of parent task_id.

  • children (list or str) – list of child task_ids, or str of child task_id.

  • subtasks (list) – list of dict of subtasks with the format {'subtask_id': subtask_id, 'env': env, 'hps': hps}

  • add_to_dag (bool) – whether to add the task to the DAG. To obtain task dependencies, e.g. ordered tasks, the task needs to be added to the DAG.

add_ordered_tasks(ordered_tasks)

Register ordered tasks.

For example, if ordered_tasks is ['task0', 'task1'], 'task0' and 'task1' are registered with the dependency 'task0 (parent)' -> 'task1 (child)'.

Parameters:

ordered_tasks (list) – list of task_ids
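
The parent-to-child chaining described above can be sketched in a few lines. The helper name ordered_to_edges is hypothetical and this is not multiml's implementation; it only shows how an ordered list of task_ids maps to (parent, child) pairs.

```python
def ordered_to_edges(ordered_tasks):
    """Return (parent, child) pairs for consecutive task_ids."""
    # Pair each task_id with the one that follows it in the list.
    return list(zip(ordered_tasks[:-1], ordered_tasks[1:]))


edges = ordered_to_edges(['task0', 'task1', 'task2'])
```

A list with a single task_id yields no pairs, since there is nothing to order it against.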

add_ordered_subtasks(ordered_tasks)

Register ordered subtasks.

ordered_tasks needs to have the format [task0, task1…], where each element, e.g. task0, is a list of tuples [('subtask0', env0, hps0), ('subtask1', env1, hps1)…]. task_id is set automatically to 'step0', 'step1'… For the examples below, the pipeline scheme is:

>>> step0 (subtask0, subtask1) -> step1 (subtask2, subtask3)

Parameters:

ordered_tasks (list) – list of subtasks. Please see examples below.

Examples

>>> # ordered tasks with subtask_id and hyperparameters
>>> step0 = [('subtask0', env0, hps0), ('subtask1', env1, hps1)]
>>> step1 = [('subtask2', env2, hps2), ('subtask3', env3, hps3)]
>>> steps = [step0, step1]
>>> task_scheduler.add_ordered_subtasks(steps)
>>>
>>> # ordered tasks with hyperparameters (subtask_id will be class name)
>>> step0 = [(env0, hps0), (env1, hps1)]
>>> step1 = [(env2, hps2), (env3, hps3)]
>>> steps = [step0, step1]
>>> task_scheduler.add_ordered_subtasks(steps)
>>>
>>> # ordered tasks without hyperparameters
>>> steps = [env0, env1]
>>> task_scheduler.add_ordered_subtasks(steps)

add_subtask(task_id, subtask_id=None, env=None, hps=None)

Register a subtask to the given task.

The corresponding task must be registered before calling this method.

Parameters:
  • task_id (str) – unique task identifier.

  • subtask_id (str) – unique subtask identifier.

  • env (BaseTask) – user-defined subtask class instance. The subtask class needs to inherit from the BaseTask class.

  • hps (dict or Hyperparameters) – user-defined Hyperparameters class instance or dict. If hps is a dict, it is automatically converted to a Hyperparameters class instance.

get_subtasks(task_id)

Returns subtasks of tasktuple for given task_id.

Parameters:

task_id (str) – unique task identifier.

Returns:

list of subtasktuples for given task_id.

Return type:

list

get_subtask_ids(task_id)

Returns subtask_ids by task_id.

Parameters:

task_id (str) – unique task identifier.

Returns:

list of subtask_ids for given task_id.

Return type:

list

get_subtask(task_id, subtask_id)

Returns subtasktuple for given task_id and subtask_id.

Parameters:
  • task_id (str) – unique task identifier.

  • subtask_id (str) – unique subtask identifier.

Returns:

subtasktuple for given task_id and subtask_id.

Return type:

subtasktuple

get_parents_task_ids(task_id)

Returns parent task_ids for given task_id.

Parameters:

task_id (str) – unique task identifier.

Returns:

list of parent task_ids for given task_id.

Return type:

list

get_children_task_ids(task_id)

Returns child task_ids for given task_id.

Parameters:

task_id (str) – unique task identifier.

Returns:

list of child task_ids for given task_id.

Return type:

list

get_sorted_task_ids()

Returns topologically sorted task_ids.

Returns:

a list of topologically sorted task_ids.

Return type:

list
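
As an illustration of what a topological ordering guarantees, the sketch below uses the standard-library graphlib module (Python 3.9+) rather than multiml itself. The dependency map is hypothetical, and nothing here is meant to describe how TaskScheduler stores or sorts its DAG internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical dependency map: each task_id maps to the set of its parents.
parents = {
    'task0': set(),
    'task1': {'task0'},
    'task2': {'task0'},
    'task3': {'task1', 'task2'},
}

# static_order() yields every task_id after all of its parents.
sorted_task_ids = list(TopologicalSorter(parents).static_order())
position = {task_id: i for i, task_id in enumerate(sorted_task_ids)}
```

The relative order of task1 and task2 is not fixed by the dependencies, so either may come first; only the parent-before-child property is guaranteed.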

get_subtasks_with_hps(task_id)

Returns all combinations of subtask_ids and hps for the given task_id.

Parameters:

task_id (str) – unique task identifier.

Returns:

list of modified subtasktuples. Modified subtasktuple format is .task_id: task_id, .subtask_id: subtask_id, .env: subtask class instance, .hps: dictionary of hps.

Return type:

list
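
The expansion into one entry per hyperparameter combination can be sketched as follows. This is a standalone illustration, not multiml code: it assumes each subtask's hyperparameters form a grid given as a dict mapping a name to a list of candidate values, and the helper expand_hps, the stand-in subtasktuple and the string env values are all hypothetical.

```python
from collections import namedtuple
from itertools import product

# Stand-in for the documented subtasktuple (illustrative only).
subtasktuple = namedtuple('subtasktuple',
                          ('task_id', 'subtask_id', 'env', 'hps'))


def expand_hps(subtask, grid):
    """Yield one subtasktuple per hyperparameter combination in grid."""
    names = list(grid)
    for values in product(*(grid[name] for name in names)):
        # Each yielded tuple carries a plain dict with one value per name.
        yield subtask._replace(hps=dict(zip(names, values)))


# Assumed grid format: hyperparameter name -> list of candidate values.
subtasks = [
    (subtasktuple('task0', 'subtask0', 'env0', None),
     {'lr': [0.1, 0.01], 'depth': [2, 3]}),
    (subtasktuple('task0', 'subtask1', 'env1', None), {}),
]
expanded = [s for subtask, grid in subtasks for s in expand_hps(subtask, grid)]
```

In this sketch a subtask with an empty grid still contributes a single entry, with an empty hps dict.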

get_all_subtasks_with_hps()

Returns all combinations of subtask_ids and hps for all task_ids.

Returns:

list of get_subtasks_with_hps() for each task_id.

Return type:

list

get_subtasks_pipeline(index)

Returns modified subtasktuples for given index.

Returns:

list of modified subtasktuples.

Return type:

list
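
One way to picture the relation between an index and a pipeline is a Cartesian product over the per-task lists. The sketch below assumes that interpretation and uses plain strings in place of modified subtasktuples; it is not taken from multiml's source, and the actual enumeration order may differ.

```python
from itertools import product

# Per-task choices, standing in for get_all_subtasks_with_hps() output
# (assumption: one inner list per task, in sorted task order).
all_subtasks = [
    ['subtask0', 'subtask1', 'subtask2'],
    ['subtask3', 'subtask4', 'subtask5'],
]

# Every pipeline picks exactly one entry from each task.
pipelines = list(product(*all_subtasks))
n_combinations = len(pipelines)

first = pipelines[0]
last = pipelines[n_combinations - 1]
```

Under this assumption the number of grid combinations is the product of the per-task list lengths, here 3 x 3.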

show_info()

Show information of registered tasks and subtasks.