from __future__ import absolute_import
import collections
-import datetime
+from datetime import datetime, timedelta
+import fnmatch
import importlib
import inspect
import json
with self._view.lock:
self._view.latency = t1 - t0
self._view.value = val
- self._view.value_when = datetime.datetime.now()
+ self._view.value_when = datetime.now()
self._view.getter_thread = None
self._view.exception = None
:return: 2-tuple of value status code, value
"""
with self.lock:
- now = datetime.datetime.now()
- if self.value_when and now - self.value_when < datetime.timedelta(
+ now = datetime.now()
+ if self.value_when and now - self.value_when < timedelta(
seconds=self.STALE_PERIOD):
return ViewCache.VALUE_OK, self.value
self._notify_listeners(self._queue)
self._queue.clear()
logger.debug("notification queue finished")
+
+
+# pylint: disable=too-many-arguments
+class TaskManager(object):
+ FINISHED_TASK_SIZE = 10
+ FINISHED_TASK_TTL = 60.0
+
+ VALUE_DONE = "done"
+ VALUE_EXECUTING = "executing"
+
+ _executing_tasks = set()
+ _finished_tasks = set()
+ _lock = threading.Lock()
+
+ _task_local_data = threading.local()
+
+ @classmethod
+ def init(cls):
+ NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished')
+
+ @classmethod
+ def _handle_finished_task(cls, task):
+ logger.info("TM: finished %s", task)
+ with cls._lock:
+ cls._executing_tasks.remove(task)
+ cls._finished_tasks.add(task)
+
+ @classmethod
+ def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None):
+ if not args:
+ args = []
+ if not kwargs:
+ kwargs = {}
+ if not executor:
+ executor = ThreadedExecutor()
+ task = Task(name, metadata, fn, args, kwargs, executor)
+ with cls._lock:
+ if task in cls._executing_tasks:
+ logger.debug("TM: task already executing: %s", task)
+ for t in cls._executing_tasks:
+ if t == task:
+ return t
+ logger.debug("TM: created %s", task)
+ cls._executing_tasks.add(task)
+ logger.info("TM: running %s", task)
+ task._run()
+ return task
+
+ @classmethod
+ def current_task(cls):
+ """
+ Returns the current task object.
+ This method should only be called from a threaded task operation code.
+ """
+ return cls._task_local_data.task
+
+ @classmethod
+ def _cleanup_old_tasks(cls, task_list):
+ """
+ The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent
+ finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL
+ value.
+ """
+ now = datetime.now()
+ # list of finished tasks that are older than TTL
+ to_remove = [t for t in task_list
+ if now - datetime.fromtimestamp(t.end_time) >
+ timedelta(seconds=cls.FINISHED_TASK_TTL)]
+ to_remove.sort(key=lambda t: t.end_time, reverse=True)
+ for task in to_remove[cls.FINISHED_TASK_SIZE:]:
+ cls._finished_tasks.remove(task)
+
+ @classmethod
+ def list(cls, name_glob=None):
+ executing_tasks = []
+ finished_tasks = []
+ with cls._lock:
+ for task in cls._executing_tasks:
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ executing_tasks.append(task)
+ for task in cls._finished_tasks:
+ if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+ finished_tasks.append(task)
+ cls._cleanup_old_tasks(finished_tasks)
+ executing_tasks.sort(key=lambda t: t.begin_time, reverse=True)
+ finished_tasks.sort(key=lambda t: t.end_time, reverse=True)
+ return executing_tasks, finished_tasks
+
+ @classmethod
+ def list_serializable(cls, ns_glob=None):
+ ex_t, fn_t = cls.list(ns_glob)
+ return [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'progress': t.progress
+ } for t in ex_t if t.begin_time], [{
+ 'name': t.name,
+ 'metadata': t.metadata,
+ 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+ 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()),
+ 'duration': t.duration,
+ 'progress': t.progress,
+ 'success': not t.exception,
+ 'ret_value': t.ret_value,
+ 'exception': t.exception
+ } for t in fn_t]
+
+
+class TaskExecutor(object):
+ def __init__(self):
+ self.task = None
+
+ def init(self, task):
+ self.task = task
+
+ # pylint: disable=broad-except
+ def start(self):
+ logger.debug("EX: executing task %s", self.task)
+ try:
+ self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ except Exception as ex:
+ logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+
+ def finish(self, ret_value, exception):
+ if not exception:
+ logger.debug("EX: successfully finished task: %s", self.task)
+ else:
+ logger.debug("EX: task finished with exception: %s", self.task)
+ self.task._complete(ret_value, exception)
+
+
+class ThreadedExecutor(TaskExecutor):
+ def __init__(self):
+ super(ThreadedExecutor, self).__init__()
+ self._thread = threading.Thread(target=self._run)
+
+ def start(self):
+ self._thread.start()
+
+ # pylint: disable=broad-except
+ def _run(self):
+ TaskManager._task_local_data.task = self.task
+ try:
+ logger.debug("TEX: executing task %s", self.task)
+ val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+ except Exception as ex:
+ logger.exception("Error while calling %s", self.task)
+ self.finish(None, ex)
+ else:
+ self.finish(val, None)
+
+
+class Task(object):
+ def __init__(self, name, metadata, fn, args, kwargs, executor):
+ self.name = name
+ self.metadata = metadata
+ self.fn = fn
+ self.fn_args = args
+ self.fn_kwargs = kwargs
+ self.executor = executor
+ self.running = False
+ self.event = threading.Event()
+ self.progress = None
+ self.ret_value = None
+ self.begin_time = None
+ self.end_time = None
+ self.duration = 0
+ self.exception = None
+ self.lock = threading.Lock()
+
+ def __hash__(self):
+ return hash((self.name, tuple(sorted(self.metadata.items()))))
+
+ def __eq__(self, other):
+ return self.name == self.name and self.metadata == self.metadata
+
+ def __str__(self):
+ return "Task(ns={}, md={})" \
+ .format(self.name, self.metadata)
+
+ def _run(self):
+ with self.lock:
+ assert not self.running
+ self.executor.init(self)
+ self.set_progress(0, in_lock=True)
+ self.begin_time = time.time()
+ self.running = True
+ self.executor.start()
+
+ def _complete(self, ret_value, exception=None):
+ now = time.time()
+ with self.lock:
+ assert self.running, "_complete cannot be called before _run"
+ self.end_time = now
+ self.ret_value = ret_value
+ self.exception = exception
+ self.duration = now - self.begin_time
+ if not self.exception:
+ self.set_progress(100, True)
+ NotificationQueue.new_notification('cd_task_finished', self)
+ self.event.set()
+ logger.debug("TK: execution of %s finished in: %s s", self,
+ self.duration)
+
+ def wait(self, timeout=None):
+ with self.lock:
+ assert self.running, "wait cannot be called before _run"
+ ev = self.event
+
+ success = ev.wait(timeout=timeout)
+ with self.lock:
+ if success:
+ # the action executed within the timeout
+ if self.exception:
+ # pylint: disable=raising-bad-type
+ # execution raised an exception
+ raise self.exception
+ return TaskManager.VALUE_DONE, self.ret_value
+ # the action is still executing
+ return TaskManager.VALUE_EXECUTING, None
+
+ def inc_progress(self, delta, in_lock=False):
+ if not isinstance(delta, int) or delta < 0:
+ raise Exception("Progress delta value must be a positive integer")
+ if not in_lock:
+ self.lock.acquire()
+ prog = self.progress + delta
+ self.progress = prog if prog <= 100 else 100
+ if not in_lock:
+ self.lock.release()
+
+ def set_progress(self, percentage, in_lock=False):
+ if not isinstance(percentage, int) or percentage < 0 or percentage > 100:
+ raise Exception("Progress value must be in percentage "
+ "(0 <= percentage <= 100)")
+ if not in_lock:
+ self.lock.acquire()
+ self.progress = percentage
+ if not in_lock:
+ self.lock.release()