Source code for cloudify.workflows.tasks_graph

########
# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import json
import time

import networkx as nx

from cloudify.workflows import api
from cloudify.workflows import tasks


class TaskDependencyGraph(object):
    """
    A task graph builder

    :param workflow_context: A WorkflowContext instance (used for logging)
    """

    def __init__(self, workflow_context, default_subgraph_task_config=None):
        self.ctx = workflow_context
        self.graph = nx.DiGraph()
        default_subgraph_task_config = default_subgraph_task_config or {}
        self._default_subgraph_task_config = default_subgraph_task_config
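    # Illustrative sketch (not part of the original module): constructing a
    # graph. `workflow_ctx` stands for the workflow context passed to
    # workflow functions; the keyword arguments accepted by
    # SubgraphTask.__init__ may be supplied as subgraph defaults, e.g.:
    #
    #     graph = TaskDependencyGraph(
    #         workflow_ctx,
    #         default_subgraph_task_config={'retry_interval': 30})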
    def add_task(self, task):
        """Add a WorkflowTask to this graph

        :param task: The task
        """
        self.ctx.logger.debug('adding task: {0}'.format(task))
        self.graph.add_node(task.id, task=task)
    def get_task(self, task_id):
        """Get a task instance that was inserted to this graph by its id

        :param task_id: the task id
        :return: a WorkflowTask instance for the requested task if found.
                 None, otherwise.
        """
        data = self.graph.node.get(task_id)
        return data['task'] if data is not None else None
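    # Illustrative sketch (not part of the original module): `task.id` is
    # the node key, so lookups round-trip through it:
    #
    #     graph.add_task(task)
    #     assert graph.get_task(task.id) is task
    #     assert graph.get_task('no-such-id') is None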
    def remove_task(self, task):
        """Remove the provided task from the graph

        :param task: The task
        """
        self.graph.remove_node(task.id)

    # src depends on dst
    def add_dependency(self, src_task, dst_task):
        """
        Add a dependency between tasks.

        The source task will only be executed after the target task
        terminates. A task may depend on several tasks, in which case it
        will only be executed after all its 'destination' tasks terminate.

        :param src_task: The source task
        :param dst_task: The target task
        """
        self.ctx.logger.debug('adding dependency: {0} -> {1}'.format(
            src_task, dst_task))
        if not self.graph.has_node(src_task.id):
            raise RuntimeError('source task {0} is not in graph (task id: '
                               '{1})'.format(src_task, src_task.id))
        if not self.graph.has_node(dst_task.id):
            raise RuntimeError('destination task {0} is not in graph (task '
                               'id: {1})'.format(dst_task, dst_task.id))
        self.graph.add_edge(src_task.id, dst_task.id)
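    # Illustrative sketch (not part of the original module): `create` and
    # `configure` are hypothetical WorkflowTask instances. `create` must
    # finish before `configure` starts, so `configure` is the source and
    # `create` is the destination:
    #
    #     graph.add_task(create)
    #     graph.add_task(configure)
    #     graph.add_dependency(configure, create)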
    def sequence(self):
        """
        :return: a new TaskSequence for this graph
        """
        return TaskSequence(self)
    def subgraph(self, name):
        task = SubgraphTask(name, self, **self._default_subgraph_task_config)
        self.add_task(task)
        return task
    def execute(self):
        """
        Start executing the graph based on tasks and dependencies between
        them. Calling this method will block until one of the following
        occurs:

        1. all tasks terminated
        2. a task failed
        3. an unhandled exception is raised
        4. the execution is cancelled

        Note: This method will raise an api.ExecutionCancelled error if the
        execution has been cancelled. When catching errors raised from this
        method, make sure to re-raise the error if it's
        api.ExecutionCancelled, in order to allow the execution to be set
        in cancelled mode properly. Also note that for the time being, if
        such a cancelling event occurs, the method might return while some
        operations are still being executed.
        """
        while True:
            if self._is_execution_cancelled():
                raise api.ExecutionCancelled()

            self._check_dump_request()

            # handle all terminated tasks
            # it is important this happens before handling
            # executable tasks so we get to make tasks executable
            # and then execute them in this iteration (otherwise, it would
            # be the next one)
            for task in self._terminated_tasks():
                self._handle_terminated_task(task)

            # handle all executable tasks
            for task in self._executable_tasks():
                self._handle_executable_task(task)

            # no more tasks to process, time to move on
            if len(self.graph.node) == 0:
                return
            # sleep some and do it all over again
            else:
                time.sleep(0.1)
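    # Illustrative sketch (not part of the original module): callers that
    # wrap execute() in a try/except must let cancellation propagate:
    #
    #     try:
    #         graph.execute()
    #     except api.ExecutionCancelled:
    #         raise  # required so the execution is marked as cancelled
    #     except Exception as e:
    #         workflow_ctx.logger.error('workflow failed: {0}'.format(e))
    #         raise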
    @staticmethod
    def _is_execution_cancelled():
        return api.has_cancel_request()

    def _executable_tasks(self):
        """
        A task is executable if it is in pending state, it has no
        dependencies at the moment (i.e. all of its dependencies already
        terminated) and its execution timestamp is smaller than the current
        timestamp

        :return: An iterator for executable tasks
        """
        now = time.time()
        return (task for task in self.tasks_iter()
                if task.get_state() == tasks.TASK_PENDING and
                task.execute_after <= now and
                not (task.containing_subgraph and
                     task.containing_subgraph.get_state() ==
                     tasks.TASK_FAILED) and
                not self._task_has_dependencies(task))

    def _terminated_tasks(self):
        """
        A task is terminated if it is in 'succeeded' or 'failed' state

        :return: An iterator for terminated tasks
        """
        return (task for task in self.tasks_iter()
                if task.get_state() in tasks.TERMINATED_STATES)

    def _task_has_dependencies(self, task):
        """
        :param task: The task
        :return: Does this task have any dependencies
        """
        return (len(self.graph.succ.get(task.id, {})) > 0 or
                (task.containing_subgraph and
                 self._task_has_dependencies(task.containing_subgraph)))
    def tasks_iter(self):
        """
        An iterator on tasks added to the graph
        """
        return (data['task'] for _, data in self.graph.nodes_iter(data=True))
    def _handle_executable_task(self, task):
        """Handle executable task"""
        task.set_state(tasks.TASK_SENDING)
        task.apply_async()

    def _handle_terminated_task(self, task):
        """Handle terminated task"""
        handler_result = task.handle_task_terminated()
        if handler_result.action == tasks.HandlerResult.HANDLER_FAIL:
            if isinstance(task, SubgraphTask) and task.failed_task:
                task = task.failed_task
            raise RuntimeError(
                "Workflow failed: Task failed '{0}' -> {1}".format(
                    task.name, task.error))

        dependents = self.graph.predecessors(task.id)
        removed_edges = [(dependent, task.id)
                         for dependent in dependents]
        self.graph.remove_edges_from(removed_edges)
        self.graph.remove_node(task.id)
        if handler_result.action == tasks.HandlerResult.HANDLER_RETRY:
            new_task = handler_result.retried_task
            self.add_task(new_task)
            added_edges = [(dependent, new_task.id)
                           for dependent in dependents]
            self.graph.add_edges_from(added_edges)

    def _check_dump_request(self):
        task_dump = os.environ.get('WORKFLOW_TASK_DUMP')
        if not (task_dump and os.path.exists(task_dump)):
            return
        os.remove(task_dump)
        task_dump_path = '{0}.{1}'.format(task_dump, time.time())
        with open(task_dump_path, 'w') as f:
            f.write(json.dumps({
                'tasks': [task.dump() for task in self.tasks_iter()],
                'edges': [[s, t] for s, t in self.graph.edges_iter()]}))
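# Illustrative sketch (not part of the original module): if the process
# running the workflow has WORKFLOW_TASK_DUMP set, e.g. to /tmp/task_dump,
# creating that file while the graph is executing makes the loop above
# delete it and write a timestamped JSON snapshot of the remaining tasks
# and edges:
#
#     $ touch /tmp/task_dump
#     # shortly after, a file such as /tmp/task_dump.<timestamp> appears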
class forkjoin(object):
    """
    A simple wrapper for tasks. Used in conjunction with TaskSequence.
    Defined to make the code easier to read (instead of passing a list).
    See ``TaskSequence.add`` for more details.
    """

    def __init__(self, *tasks):
        self.tasks = tasks
class TaskSequence(object):
    """
    Helper class to add tasks in a sequential manner to a task dependency
    graph

    :param graph: The TaskDependencyGraph instance
    """

    def __init__(self, graph):
        self.graph = graph
        self.last_fork_join_tasks = None
    def add(self, *tasks):
        """
        Add tasks to the sequence.

        :param tasks: Each task may be:

                      * A WorkflowTask instance, in which case it will be
                        added to the graph with a dependency between it and
                        the task previously inserted into the sequence
                      * A forkjoin of tasks, in which case it will be
                        treated as a "fork-join" task in the sequence: all
                        the fork-join tasks will depend on the last task in
                        the sequence (which could itself be a fork-join),
                        and the next added task will depend on all tasks in
                        this fork-join task
        """
        for fork_join_tasks in tasks:
            if isinstance(fork_join_tasks, forkjoin):
                fork_join_tasks = fork_join_tasks.tasks
            else:
                fork_join_tasks = [fork_join_tasks]
            for task in fork_join_tasks:
                self.graph.add_task(task)
                if self.last_fork_join_tasks is not None:
                    for last_fork_join_task in self.last_fork_join_tasks:
                        self.graph.add_dependency(task, last_fork_join_task)
            if fork_join_tasks:
                self.last_fork_join_tasks = fork_join_tasks
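# Illustrative sketch (not part of the original module): t1..t4 are
# hypothetical WorkflowTask instances. t2 and t3 run in parallel after t1
# terminates, and t4 runs only after both t2 and t3 terminate:
#
#     sequence = graph.sequence()
#     sequence.add(t1, forkjoin(t2, t3), t4)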
class SubgraphTask(tasks.WorkflowTask):

    def __init__(self,
                 name,
                 graph,
                 task_id=None,
                 info=None,
                 on_success=None,
                 on_failure=None,
                 total_retries=tasks.DEFAULT_SUBGRAPH_TOTAL_RETRIES,
                 retry_interval=tasks.DEFAULT_RETRY_INTERVAL,
                 send_task_events=tasks.DEFAULT_SEND_TASK_EVENTS):
        super(SubgraphTask, self).__init__(
            graph.ctx,
            task_id,
            info=info,
            on_success=on_success,
            on_failure=on_failure,
            total_retries=total_retries,
            retry_interval=retry_interval,
            send_task_events=send_task_events)
        self.graph = graph
        self._name = name
        self.tasks = {}
        self.failed_task = None
        if not self.on_failure:
            self.on_failure = lambda tsk: tasks.HandlerResult.fail()
        self.async_result = tasks.StubAsyncResult()

    def _duplicate(self):
        raise NotImplementedError('self.retried_task should be set '
                                  'explicitly in self.on_failure handler')

    @property
    def cloudify_context(self):
        return {}
    def is_local(self):
        return True
    @property
    def name(self):
        return self._name
    def sequence(self):
        return TaskSequence(self)
    def subgraph(self, name):
        task = SubgraphTask(name, self.graph,
                            **self.graph._default_subgraph_task_config)
        self.add_task(task)
        return task
    def add_task(self, task):
        self.graph.add_task(task)
        self.tasks[task.id] = task
        if task.containing_subgraph and task.containing_subgraph is not self:
            raise RuntimeError('task {0}[{1}] cannot be contained in more '
                               'than one subgraph. It is currently contained '
                               'in {2} and it is now being added to {3}'
                               .format(task,
                                       task.id,
                                       task.containing_subgraph.name,
                                       self.name))
        task.containing_subgraph = self
    def add_dependency(self, src_task, dst_task):
        self.graph.add_dependency(src_task, dst_task)
    def apply_async(self):
        if not self.tasks:
            self.set_state(tasks.TASK_SUCCEEDED)
        else:
            self.set_state(tasks.TASK_STARTED)
    def task_terminated(self, task, new_task=None):
        del self.tasks[task.id]
        if new_task:
            self.tasks[new_task.id] = new_task
            new_task.containing_subgraph = self
        if not self.tasks and self.get_state() not in tasks.TERMINATED_STATES:
            self.set_state(tasks.TASK_SUCCEEDED)
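# Illustrative sketch (not part of the original module): tasks added to a
# subgraph become part of the main graph, and the subgraph itself succeeds
# once all of its contained tasks have terminated. `create_task`,
# `configure_task` and `start_task` are hypothetical WorkflowTask instances:
#
#     subgraph = graph.subgraph('install_node')
#     seq = subgraph.sequence()
#     seq.add(create_task, configure_task, start_task)
#     graph.execute()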