From: Ricardo Dias Date: Thu, 8 Mar 2018 13:47:47 +0000 (+0000) Subject: mgr/dashboard: Asynchronous tasks implementation X-Git-Tag: v13.1.0~469^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=404e2516a8b49f7e05acd696dea9449c8b2907c9;p=ceph.git mgr/dashboard: Asynchronous tasks implementation Signed-off-by: Ricardo Dias --- diff --git a/src/pybind/mgr/dashboard/module.py b/src/pybind/mgr/dashboard/module.py index 6877bd86b64d9..e8bbea3fce5fe 100644 --- a/src/pybind/mgr/dashboard/module.py +++ b/src/pybind/mgr/dashboard/module.py @@ -28,7 +28,7 @@ if 'COVERAGE_ENABLED' in os.environ: from . import logger, mgr from .controllers.auth import Auth from .tools import load_controllers, json_error_page, SessionExpireAtBrowserCloseTool, \ - NotificationQueue, RequestLoggingTool + NotificationQueue, RequestLoggingTool, TaskManager from .settings import options_command_list, handle_option_command @@ -151,6 +151,7 @@ class Module(MgrModule): cherrypy.engine.start() NotificationQueue.start_queue() + TaskManager.init() logger.info('Waiting for engine...') cherrypy.engine.block() if 'COVERAGE_ENABLED' in os.environ: diff --git a/src/pybind/mgr/dashboard/tools.py b/src/pybind/mgr/dashboard/tools.py index 1f0293a821175..6d273f721cb57 100644 --- a/src/pybind/mgr/dashboard/tools.py +++ b/src/pybind/mgr/dashboard/tools.py @@ -3,7 +3,8 @@ from __future__ import absolute_import import collections -import datetime +from datetime import datetime, timedelta +import fnmatch import importlib import inspect import json @@ -327,7 +328,7 @@ class ViewCache(object): with self._view.lock: self._view.latency = t1 - t0 self._view.value = val - self._view.value_when = datetime.datetime.now() + self._view.value_when = datetime.now() self._view.getter_thread = None self._view.exception = None @@ -363,8 +364,8 @@ class ViewCache(object): :return: 2-tuple of value status code, value """ with self.lock: - now = datetime.datetime.now() - if self.value_when and now - self.value_when < datetime.timedelta( + now = datetime.now() + if self.value_when and now - self.value_when < timedelta( seconds=self.STALE_PERIOD): return ViewCache.VALUE_OK, self.value @@ -710,3 +711,245 @@ class NotificationQueue(threading.Thread): self._notify_listeners(self._queue) self._queue.clear() logger.debug("notification queue finished") + + +# pylint: disable=too-many-arguments +class TaskManager(object): + FINISHED_TASK_SIZE = 10 + FINISHED_TASK_TTL = 60.0 + + VALUE_DONE = "done" + VALUE_EXECUTING = "executing" + + _executing_tasks = set() + _finished_tasks = set() + _lock = threading.Lock() + + _task_local_data = threading.local() + + @classmethod + def init(cls): + NotificationQueue.register(cls._handle_finished_task, 'cd_task_finished') + + @classmethod + def _handle_finished_task(cls, task): + logger.info("TM: finished %s", task) + with cls._lock: + cls._executing_tasks.remove(task) + cls._finished_tasks.add(task) + + @classmethod + def run(cls, name, metadata, fn, args=None, kwargs=None, executor=None): + if not args: + args = [] + if not kwargs: + kwargs = {} + if not executor: + executor = ThreadedExecutor() + task = Task(name, metadata, fn, args, kwargs, executor) + with cls._lock: + if task in cls._executing_tasks: + logger.debug("TM: task already executing: %s", task) + for t in cls._executing_tasks: + if t == task: + return t + logger.debug("TM: created %s", task) + cls._executing_tasks.add(task) + logger.info("TM: running %s", task) + task._run() + return task + + @classmethod + def current_task(cls): + """ + Returns the current task object. + This method should only be called from a threaded task operation code. + """ + return cls._task_local_data.task + + @classmethod + def _cleanup_old_tasks(cls, task_list): + """ + The cleanup rule is: maintain the FINISHED_TASK_SIZE more recent + finished tasks, and the rest is maintained up to the FINISHED_TASK_TTL + value. + """ + now = datetime.now() + # list of finished tasks that are older than TTL + to_remove = [t for t in task_list + if now - datetime.fromtimestamp(t.end_time) > + timedelta(seconds=cls.FINISHED_TASK_TTL)] + to_remove.sort(key=lambda t: t.end_time, reverse=True) + for task in to_remove[cls.FINISHED_TASK_SIZE:]: + cls._finished_tasks.remove(task) + + @classmethod + def list(cls, name_glob=None): + executing_tasks = [] + finished_tasks = [] + with cls._lock: + for task in cls._executing_tasks: + if not name_glob or fnmatch.fnmatch(task.name, name_glob): + executing_tasks.append(task) + for task in cls._finished_tasks: + if not name_glob or fnmatch.fnmatch(task.name, name_glob): + finished_tasks.append(task) + cls._cleanup_old_tasks(finished_tasks) + executing_tasks.sort(key=lambda t: t.begin_time, reverse=True) + finished_tasks.sort(key=lambda t: t.end_time, reverse=True) + return executing_tasks, finished_tasks + + @classmethod + def list_serializable(cls, ns_glob=None): + ex_t, fn_t = cls.list(ns_glob) + return [{ + 'name': t.name, + 'metadata': t.metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), + 'progress': t.progress + } for t in ex_t if t.begin_time], [{ + 'name': t.name, + 'metadata': t.metadata, + 'begin_time': "{}Z".format(datetime.fromtimestamp(t.begin_time).isoformat()), + 'end_time': "{}Z".format(datetime.fromtimestamp(t.end_time).isoformat()), + 'duration': t.duration, + 'progress': t.progress, + 'success': not t.exception, + 'ret_value': t.ret_value, + 'exception': t.exception + } for t in fn_t] + + +class TaskExecutor(object): + def __init__(self): + self.task = None + + def init(self, task): + self.task = task + + # pylint: disable=broad-except + def start(self): + logger.debug("EX: executing task %s", self.task) + try: + self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) + except Exception as ex: + logger.exception("Error while calling %s", self.task) + self.finish(None, ex) + + def finish(self, ret_value, exception): + if not exception: + logger.debug("EX: successfully finished task: %s", self.task) + else: + logger.debug("EX: task finished with exception: %s", self.task) + self.task._complete(ret_value, exception) + + +class ThreadedExecutor(TaskExecutor): + def __init__(self): + super(ThreadedExecutor, self).__init__() + self._thread = threading.Thread(target=self._run) + + def start(self): + self._thread.start() + + # pylint: disable=broad-except + def _run(self): + TaskManager._task_local_data.task = self.task + try: + logger.debug("TEX: executing task %s", self.task) + val = self.task.fn(*self.task.fn_args, **self.task.fn_kwargs) + except Exception as ex: + logger.exception("Error while calling %s", self.task) + self.finish(None, ex) + else: + self.finish(val, None) + + +class Task(object): + def __init__(self, name, metadata, fn, args, kwargs, executor): + self.name = name + self.metadata = metadata + self.fn = fn + self.fn_args = args + self.fn_kwargs = kwargs + self.executor = executor + self.running = False + self.event = threading.Event() + self.progress = None + self.ret_value = None + self.begin_time = None + self.end_time = None + self.duration = 0 + self.exception = None + self.lock = threading.Lock() + + def __hash__(self): + return hash((self.name, tuple(sorted(self.metadata.items())))) + + def __eq__(self, other): + return self.name == self.name and self.metadata == self.metadata + + def __str__(self): + return "Task(ns={}, md={})" \ + .format(self.name, self.metadata) + + def _run(self): + with self.lock: + assert not self.running + self.executor.init(self) + self.set_progress(0, in_lock=True) + self.begin_time = time.time() + self.running = True + self.executor.start() + + def _complete(self, ret_value, exception=None): + now = time.time() + with self.lock: + assert self.running, "_complete cannot be called before _run" + self.end_time = now + self.ret_value = ret_value + self.exception = exception + self.duration = now - self.begin_time + if not self.exception: + self.set_progress(100, True) + NotificationQueue.new_notification('cd_task_finished', self) + self.event.set() + logger.debug("TK: execution of %s finished in: %s s", self, + self.duration) + + def wait(self, timeout=None): + with self.lock: + assert self.running, "wait cannot be called before _run" + ev = self.event + + success = ev.wait(timeout=timeout) + with self.lock: + if success: + # the action executed within the timeout + if self.exception: + # pylint: disable=raising-bad-type + # execution raised an exception + raise self.exception + return TaskManager.VALUE_DONE, self.ret_value + # the action is still executing + return TaskManager.VALUE_EXECUTING, None + + def inc_progress(self, delta, in_lock=False): + if not isinstance(delta, int) or delta < 0: + raise Exception("Progress delta value must be a positive integer") + if not in_lock: + self.lock.acquire() + prog = self.progress + delta + self.progress = prog if prog <= 100 else 100 + if not in_lock: + self.lock.release() + + def set_progress(self, percentage, in_lock=False): + if not isinstance(percentage, int) or percentage < 0 or percentage > 100: + raise Exception("Progress value must be in percentage " + "(0 <= percentage <= 100)") + if not in_lock: + self.lock.acquire() + self.progress = percentage + if not in_lock: + self.lock.release()