]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/dashboard: Asynchronous tasks implementation
authorRicardo Dias <rdias@suse.com>
Thu, 8 Mar 2018 13:47:47 +0000 (13:47 +0000)
committerRicardo Dias <rdias@suse.com>
Tue, 27 Mar 2018 13:08:47 +0000 (14:08 +0100)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/pybind/mgr/dashboard/module.py
src/pybind/mgr/dashboard/tools.py

index 6877bd86b64d969a6fc4a7cbc5e98d6844365767..e8bbea3fce5fe2a11cf8809194d5e1ff7ea3334d 100644 (file)
@@ -28,7 +28,7 @@ if 'COVERAGE_ENABLED' in os.environ:
 from . import logger, mgr
 from .controllers.auth import Auth
 from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool, \
-                   NotificationQueue, RequestLoggingTool
+                   NotificationQueue, RequestLoggingTool, TaskManager
 from .settings import options_command_list, handle_option_command
 
 
@@ -151,6 +151,7 @@ class Module(MgrModule):
 
         cherrypy.engine.start()
         NotificationQueue.start_queue()
+        TaskManager.init()
         logger.info('Waiting for engine...')
         cherrypy.engine.block()
         if 'COVERAGE_ENABLED' in os.environ:
index 1f0293a82117541d3f4142a87b0b23bf1bd08d79..6d273f721cb57e7e8c3c68b3b282a8be17da136d 100644 (file)
@@ -3,7 +3,8 @@
 from __future__ import absolute_import
 
 import collections
-import datetime
+from datetime import datetime, timedelta
+import fnmatch
 import importlib
 import inspect
 import json
@@ -327,7 +328,7 @@ class ViewCache(object):
                 with self._view.lock:
                     self._view.latency = t1 - t0
                     self._view.value = val
-                    self._view.value_when = datetime.datetime.now()
+                    self._view.value_when = datetime.now()
                     self._view.getter_thread = None
                     self._view.exception = None
 
@@ -363,8 +364,8 @@ class ViewCache(object):
             :return: 2-tuple of value status code, value
             """
             with self.lock:
-                now = datetime.datetime.now()
-                if self.value_when and now - self.value_when < datetime.timedelta(
+                now = datetime.now()
+                if self.value_when and now - self.value_when < timedelta(
                         seconds=self.STALE_PERIOD):
                     return ViewCache.VALUE_OK, self.value
 
@@ -710,3 +711,245 @@ class NotificationQueue(threading.Thread):
         self._notify_listeners(self._queue)
         self._queue.clear()
         logger.debug("notification queue finished")
+
+
+# pylint: disable=too-many-arguments
+class TaskManager(object):
+    FINISHED_TASK_SIZE = 10
+    FINISHED_TASK_TTL = 60.0
+
+    VALUE_DONE = "done"
+    VALUE_EXECUTING = "executing"
+
+    _executing_tasks = set()
+    _finished_tasks = set()
+    _lock = threading.Lock()
+
+    _task_local_data = threading.local()
+
+    @classmethod
+    def init(cls):
+        NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished')
+
+    @classmethod
+    def _handle_finished_task(cls, task):
+        logger.info("TM: finished %s", task)
+        with cls._lock:
+            cls._executing_tasks.remove(task)
+            cls._finished_tasks.add(task)
+
+    @classmethod
+    def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None):
+        if not args:
+            args = []
+        if not kwargs:
+            kwargs = {}
+        if not executor:
+            executor = ThreadedExecutor()
+        task = Task(name, metadata, fn, args, kwargs, executor)
+        with cls._lock:
+            if task in cls._executing_tasks:
+                logger.debug("TM: task already executing: %s", task)
+                for t in cls._executing_tasks:
+                    if t == task:
+                        return t
+            logger.debug("TM: created %s", task)
+            cls._executing_tasks.add(task)
+        logger.info("TM: running %s", task)
+        task._run()
+        return task
+
+    @classmethod
+    def current_task(cls):
+        """
+        Returns the current task object.
+        This method should only be called from a threaded task operation code.
+        """
+        return cls._task_local_data.task
+
+    @classmethod
+    def _cleanup_old_tasks(cls, task_list):
+        """
+        The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent
+        finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL
+        value.
+        """
+        now = datetime.now()
+        # list of finished tasks that are older than TTL
+        to_remove = [t for t in task_list
+                     if now - datetime.fromtimestamp(t.end_time) >
+                     timedelta(seconds=cls.FINISHED_TASK_TTL)]
+        to_remove.sort(key=lambda t: t.end_time, reverse=True)
+        for task in to_remove[cls.FINISHED_TASK_SIZE:]:
+            cls._finished_tasks.remove(task)
+
+    @classmethod
+    def list(cls, name_glob=None):
+        executing_tasks = []
+        finished_tasks = []
+        with cls._lock:
+            for task in cls._executing_tasks:
+                if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+                    executing_tasks.append(task)
+            for task in cls._finished_tasks:
+                if not name_glob or fnmatch.fnmatch(task.name, name_glob):
+                    finished_tasks.append(task)
+            cls._cleanup_old_tasks(finished_tasks)
+        executing_tasks.sort(key=lambda t: t.begin_time, reverse=True)
+        finished_tasks.sort(key=lambda t: t.end_time, reverse=True)
+        return executing_tasks, finished_tasks
+
+    @classmethod
+    def list_serializable(cls, ns_glob=None):
+        ex_t, fn_t = cls.list(ns_glob)
+        return [{
+            'name': t.name,
+            'metadata': t.metadata,
+            'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+            'progress': t.progress
+        } for t in ex_t if t.begin_time], [{
+            'name': t.name,
+            'metadata': t.metadata,
+            'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()),
+            'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()),
+            'duration': t.duration,
+            'progress': t.progress,
+            'success': not t.exception,
+            'ret_value': t.ret_value,
+            'exception': t.exception
+        } for t in fn_t]
+
+
+class TaskExecutor(object):
+    def __init__(self):
+        self.task = None
+
+    def init(self, task):
+        self.task = task
+
+    # pylint: disable=broad-except
+    def start(self):
+        logger.debug("EX: executing task %s", self.task)
+        try:
+            self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+        except Exception as ex:
+            logger.exception("Error while calling %s", self.task)
+            self.finish(None, ex)
+
+    def finish(self, ret_value, exception):
+        if not exception:
+            logger.debug("EX: successfully finished task: %s", self.task)
+        else:
+            logger.debug("EX: task finished with exception: %s", self.task)
+        self.task._complete(ret_value, exception)
+
+
+class ThreadedExecutor(TaskExecutor):
+    def __init__(self):
+        super(ThreadedExecutor, self).__init__()
+        self._thread = threading.Thread(target=self._run)
+
+    def start(self):
+        self._thread.start()
+
+    # pylint: disable=broad-except
+    def _run(self):
+        TaskManager._task_local_data.task = self.task
+        try:
+            logger.debug("TEX: executing task %s", self.task)
+            val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs)
+        except Exception as ex:
+            logger.exception("Error while calling %s", self.task)
+            self.finish(None, ex)
+        else:
+            self.finish(val, None)
+
+
+class Task(object):
+    def __init__(self, name, metadata, fn, args, kwargs, executor):
+        self.name = name
+        self.metadata = metadata
+        self.fn = fn
+        self.fn_args = args
+        self.fn_kwargs = kwargs
+        self.executor = executor
+        self.running = False
+        self.event = threading.Event()
+        self.progress = None
+        self.ret_value = None
+        self.begin_time = None
+        self.end_time = None
+        self.duration = 0
+        self.exception = None
+        self.lock = threading.Lock()
+
+    def __hash__(self):
+        return hash((self.name, tuple(sorted(self.metadata.items()))))
+
+    def __eq__(self, other):
+        return self.name == self.name and self.metadata == self.metadata
+
+    def __str__(self):
+        return "Task(ns={}, md={})" \
+               .format(self.name, self.metadata)
+
+    def _run(self):
+        with self.lock:
+            assert not self.running
+            self.executor.init(self)
+            self.set_progress(0, in_lock=True)
+            self.begin_time = time.time()
+            self.running = True
+        self.executor.start()
+
+    def _complete(self, ret_value, exception=None):
+        now = time.time()
+        with self.lock:
+            assert self.running, "_complete cannot be called before _run"
+            self.end_time = now
+            self.ret_value = ret_value
+            self.exception = exception
+            self.duration = now - self.begin_time
+            if not self.exception:
+                self.set_progress(100, True)
+        NotificationQueue.new_notification('cd_task_finished', self)
+        self.event.set()
+        logger.debug("TK: execution of %s finished in: %s s", self,
+                     self.duration)
+
+    def wait(self, timeout=None):
+        with self.lock:
+            assert self.running, "wait cannot be called before _run"
+            ev = self.event
+
+        success = ev.wait(timeout=timeout)
+        with self.lock:
+            if success:
+                # the action executed within the timeout
+                if self.exception:
+                    # pylint: disable=raising-bad-type
+                    # execution raised an exception
+                    raise self.exception
+                return TaskManager.VALUE_DONE, self.ret_value
+            # the action is still executing
+            return TaskManager.VALUE_EXECUTING, None
+
+    def inc_progress(self, delta, in_lock=False):
+        if not isinstance(delta, int) or delta < 0:
+            raise Exception("Progress delta value must be a positive integer")
+        if not in_lock:
+            self.lock.acquire()
+        prog = self.progress + delta
+        self.progress = prog if prog <= 100 else 100
+        if not in_lock:
+            self.lock.release()
+
+    def set_progress(self, percentage, in_lock=False):
+        if not isinstance(percentage, int) or percentage < 0 or percentage > 100:
+            raise Exception("Progress value must be in percentage "
+                            "(0 <= percentage <= 100)")
+        if not in_lock:
+            self.lock.acquire()
+        self.progress = percentage
+        if not in_lock:
+            self.lock.release()