class TrashPurgeScheduleHandler:
    """Run ``rbd trash purge`` for pools/namespaces according to schedules.

    A worker thread started from ``__init__`` loops forever: it refreshes
    the known pools and schedules (at most once a minute), pops the next
    due ``(pool_id, namespace)`` pair from ``self.queue``, purges its
    trash and re-queues the pair for its next run.

    ``self.queue`` maps a schedule time string formatted as
    ``"%Y-%m-%d %H:%M:%S"`` to the list of ``(pool_id, namespace)`` pairs
    due at that time.  ``self.pools`` maps ``pool_id`` (str) to a dict of
    ``namespace -> pool_name``.
    """

    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_trash_purge_schedule"

    # ``lock`` guards ``schedules``, ``queue`` and ``pools``; ``condition``
    # wakes the worker thread early when the queue is changed.
    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module):
        """Load the initial state and start the worker thread.

        :param module: the owning mgr module; its ``log`` and ``rados``
            attributes are used by this handler.
        """
        self.module = module
        self.log = module.log
        # Far in the past so that the very first refresh_pools() call is
        # never throttled.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self):
        """Worker thread body: purge namespaces as they become due."""
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while True:
                self.refresh_pools()
                with self.lock:
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing is due yet.  Sleep at most a minute so
                        # that pools and schedules keep being refreshed.
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace = ns_spec
                # The purge itself runs without the lock held.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id, namespace):
        """Purge the trash of one pool namespace.

        Failures are logged and swallowed so that one failing namespace
        does not stop the worker thread.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except Exception as e:
            self.log.error("exception when purgin {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self):
        """Create empty queue/pool caches and do the initial refresh."""
        self.queue = {}
        self.pools = {}
        self.refresh_pools()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self):
        """Reload the schedules and publish them under the lock."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load()
        with self.lock:
            self.schedules = schedules

    def refresh_pools(self):
        """Reload schedules and pools, at most once every 60 seconds."""
        # total_seconds() is required here: timedelta.seconds is only the
        # seconds-within-the-day component, so with gaps longer than a day
        # (e.g. the 1970 sentinel set in __init__) it could be below 60
        # and wrongly skip the refresh.
        elapsed = datetime.now() - self.last_refresh_pools
        if elapsed.total_seconds() < 60:
            return

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        self.load_schedules()

        with self.lock:
            if not self.schedules:
                # Nothing is scheduled: drop the cached pools together
                # with the queue so that the two stay consistent.
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return

        pools = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            # refresh_queue() compares against the old self.pools, so it
            # has to run before the new mapping is stored.
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()

    def load_pool(self, ioctx, pools):
        """Record the namespaces of the pool behind ``ioctx`` in ``pools``.

        The default namespace ``''`` is always recorded, even when listing
        the pool's namespaces fails.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self):
        """Re-schedule every known namespace after a schedule change.

        Takes ``self.lock`` itself, so it must be called without the lock
        held.
        """
        with self.lock:
            now = datetime.now()

            # don't remove from queue "due" images
            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id, namespaces in self.pools.items():
                for namespace in namespaces:
                    self.enqueue(now, pool_id, namespace)

            self.condition.notify()

    def refresh_queue(self, current_pools):
        """Sync the queue with a freshly loaded pool mapping.

        Namespaces that disappeared are removed from the queue and new
        ones are enqueued.  The caller must hold ``self.lock``.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                   namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                   namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now, pool_id, namespace):
        """Queue a namespace for its next run after ``now``.

        Does nothing when no schedule is found for the namespace.  The
        caller must hold ``self.lock``.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule image {}/{} at {}".format(
            pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(ns_spec)

    def dequeue(self):
        """Pop the next due namespace from the queue.

        :return: ``((pool_id, namespace), 0)`` when an entry is due,
            otherwise ``(None, seconds_to_wait)``; the wait is 1000 when
            the queue is empty.
        """
        if not self.queue:
            return None, 1000

        now = datetime.now()
        # Keys share one fixed-width timestamp format, so the smallest
        # string is also the earliest time.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0

    def remove_from_queue(self, pool_id, namespace):
        """Remove a namespace from every time slot of the queue."""
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if (pool_id, namespace) in namespaces:
                namespaces.remove((pool_id, namespace))
                if not namespaces:
                    empty_slots.append(schedule_time)
        # Slots are deleted afterwards because the dict must not change
        # size while it is being iterated.
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self, level_spec, interval, start_time):
        """Add a schedule and rebuild the queue.

        :return: ``(0, "", "")``
        """
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self, level_spec, interval, start_time):
        """Remove a schedule and rebuild the queue.

        :return: ``(0, "", "")``
        """
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec):
        """Return the schedules for ``level_spec`` as JSON.

        :return: ``(0, json_text, "")``
        """
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec):
        """Return the queued runs matching ``level_spec`` as JSON.

        :return: ``(0, json_text, "")`` where the JSON document has a
            single ``scheduled`` list ordered by schedule time.
        """
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time' : schedule_time,
                        'pool_id' : pool_id,
                        'pool_name' : pool_name,
                        'namespace' : namespace
                    })
        return 0, json.dumps({'scheduled' : scheduled}, indent=4,
                             sort_keys=True), ""

    def handle_command(self, inbuf, prefix, cmd):
        """Dispatch an ``rbd trash purge schedule`` sub-command.

        :param inbuf: command input buffer (unused here).
        :param prefix: the sub-command: ``add``, ``remove``, ``list`` or
            ``status``.
        :param cmd: dict with the parsed command arguments.
        :return: ``(retcode, stdout, stderr)`` tuple.
        :raises NotImplementedError: for an unknown sub-command.
        """
        level_spec_name = cmd.get('level_spec', "")

        try:
            level_spec = LevelSpec.from_name(self, level_spec_name,
                                             allow_image_level=False)
        except ValueError as e:
            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
                level_spec_name, e)

        if prefix == 'add':
            return self.add_schedule(level_spec, cmd['interval'],
                                     cmd.get('start_time'))
        elif prefix == 'remove':
            return self.remove_schedule(level_spec, cmd.get('interval'),
                                        cmd.get('start_time'))
        elif prefix == 'list':
            return self.list(level_spec)
        elif prefix == 'status':
            return self.status(level_spec)

        raise NotImplementedError(cmd['prefix'])