Added sequential task and parallel task.
Changed _run_one_task to run_one_task (now called by new tasks too).
Fix #4969
Signed-off-by: Warren Usui <warren.usui@inktank.com>
log = logging.getLogger(__name__)
-def _run_one_task(taskname, **kwargs):
+def run_one_task(taskname, **kwargs):
submod = taskname
subtask = 'task'
if '.' in taskname:
except ValueError:
raise RuntimeError('Invalid task definition: %s' % taskdict)
log.info('Running task %s...', taskname)
- manager = _run_one_task(taskname, ctx=ctx, config=config)
+ manager = run_one_task(taskname, ctx=ctx, config=config)
if hasattr(manager, '__enter__'):
manager.__enter__()
stack.append(manager)
--- /dev/null
+import sys
+import logging
+import contextlib
+
+from teuthology import run_tasks
+from teuthology import parallel
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+def task(ctx, config):
+ """
+ Run a group of tasks in parallel.
+
+ example:
+ - parallel:
+ - tasktest:
+ - tasktest:
+
+ Sequential task and Parallel tasks can be nested.
+ """
+
+ log.info('starting parallel...')
+ with parallel.parallel() as p:
+ for entry in config:
+ ((taskname, confg),) = entry.iteritems()
+ p.spawn(_run_spawned, ctx, confg, taskname)
+
+def _run_spawned(ctx,config,taskname):
+ mgr = {}
+ try:
+ log.info('In parallel, running task %s...' % taskname)
+ mgr = run_tasks.run_one_task(taskname, ctx=ctx, config=config)
+ if hasattr(mgr, '__enter__'):
+ mgr.__enter__()
+ finally:
+ exc_info = sys.exc_info()
+ if hasattr(mgr, '__exit__'):
+ mgr.__exit__(*exc_info)
+ del exc_info
--- /dev/null
+import sys
+import logging
+import contextlib
+
+from teuthology import run_tasks
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+def task(ctx, config):
+ """
+ Sequentialize a group of tasks into one executable block
+
+ example:
+ - sequential:
+ - tasktest:
+ - tasktest:
+
+ Sequential task and Parallel tasks can be nested.
+ """
+ stack = []
+ try:
+ for entry in config:
+ ((taskname, confg),) = entry.iteritems()
+ log.info('In sequential, running task %s...' % taskname)
+ mgr = run_tasks.run_one_task(taskname, ctx=ctx, config=confg)
+ if hasattr(mgr, '__enter__'):
+ mgr.__enter__()
+ stack.append(mgr)
+ finally:
+ try:
+ exc_info = sys.exc_info()
+ while stack:
+ mgr = stack.pop()
+ endr = mgr.__exit__(*exc_info)
+ finally:
+ del exc_info
--- /dev/null
+import logging
+import contextlib
+import time
+
+from ..orchestra import run
+
+log = logging.getLogger(__name__)
+
+@contextlib.contextmanager
+def task(ctx, config):
+ """
+ Task that just displays information when it is create and when it is
+ destroyed/cleaned up. This task was used to test parallel and
+ sequential task options.
+
+ example:
+
+ tasks:
+ - sequential:
+ - tasktest:
+ - id: 'foo'
+ - tasktest:
+ - id: 'bar'
+ - delay:5
+ - tasktest:
+
+ The above yaml will sequentially start a test task named foo and a test
+ task named bar. Bar will take 5 seconds to complete. After foo and bar
+ have finished, an unidentified tasktest task will run.
+ """
+ try:
+ delay = config.get('delay', 0)
+ id = config.get('id', 'UNKNOWN')
+ except AttributeError:
+ delay = 0
+ id = 'UNKNOWN'
+ try:
+ log.info('**************************************************')
+ log.info('Started task test -- %s' % id)
+ log.info('**************************************************')
+ time.sleep(delay)
+ yield
+
+ finally:
+ log.info('**************************************************')
+ log.info('Task test is being cleaned up -- %s' % id)
+ log.info('**************************************************')
+