]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: schedule for running trash purge operations
author Mykola Golub <mgolub@suse.com>
Tue, 18 Feb 2020 13:27:22 +0000 (13:27 +0000)
committer Mykola Golub <mgolub@suse.com>
Mon, 24 Feb 2020 13:40:28 +0000 (13:40 +0000)
Signed-off-by: Mykola Golub <mgolub@suse.com>
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/trash_purge_schedule.py [new file with mode: 0644]

index 5cf1d54d4049d723582a012019e6196a8965c373..f7ee2c8ae8a923250e753d50d0d843011d5fc630 100644 (file)
@@ -13,6 +13,7 @@ from .common import NotAuthorizedError
 from .mirror_snapshot_schedule import MirrorSnapshotScheduleHandler
 from .perf import PerfHandler
 from .task import TaskHandler
+from .trash_purge_schedule import TrashPurgeScheduleHandler
 
 
 class Module(MgrModule):
@@ -112,21 +113,52 @@ class Module(MgrModule):
                    "name=task_id,type=CephString,req=false ",
             "desc": "List pending or running asynchronous tasks",
             "perm": "r"
+        },
+        {
+            "cmd": "rbd trash purge schedule add "
+                   "name=level_spec,type=CephString "
+                   "name=interval,type=CephString "
+                   "name=start_time,type=CephString,req=false ",
+            "desc": "Add rbd trash purge schedule",
+            "perm": "w"
+        },
+        {
+            "cmd": "rbd trash purge schedule remove "
+                   "name=level_spec,type=CephString "
+                   "name=interval,type=CephString,req=false "
+                   "name=start_time,type=CephString,req=false ",
+            "desc": "Remove rbd trash purge schedule",
+            "perm": "w"
+        },
+        {
+            "cmd": "rbd trash purge schedule list "
+                   "name=level_spec,type=CephString,req=false ",
+            "desc": "List rbd trash purge schedule",
+            "perm": "r"
+        },
+        {
+            "cmd": "rbd trash purge schedule status "
+                   "name=level_spec,type=CephString,req=false ",
+            "desc": "Show rbd trash purge schedule status",
+            "perm": "r"
         }
     ]
     MODULE_OPTIONS = [
         {'name': MirrorSnapshotScheduleHandler.MODULE_OPTION_NAME},
+        {'name': TrashPurgeScheduleHandler.MODULE_OPTION_NAME},
     ]
 
     mirror_snapshot_schedule = None
     perf = None
     task = None
+    trash_purge_schedule = None
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
         self.perf = PerfHandler(self)
         self.task = TaskHandler(self)
+        self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
 
     def handle_command(self, inbuf, cmd):
         # ensure we have latest pools available
@@ -142,6 +174,9 @@ class Module(MgrModule):
                     return self.perf.handle_command(inbuf, prefix[9:], cmd)
                 elif prefix.startswith('rbd task '):
                     return self.task.handle_command(inbuf, prefix[9:], cmd)
+                elif prefix.startswith('rbd trash purge schedule '):
+                    return self.trash_purge_schedule.handle_command(
+                        inbuf, prefix[25:], cmd)
 
             except NotAuthorizedError:
                 raise
diff --git a/src/pybind/mgr/rbd_support/trash_purge_schedule.py b/src/pybind/mgr/rbd_support/trash_purge_schedule.py
new file mode 100644 (file)
index 0000000..de7a09b
--- /dev/null
@@ -0,0 +1,272 @@
+import errno
+import json
+import rados
+import rbd
+import re
+import traceback
+
+from datetime import datetime
+from threading import Condition, Lock, Thread
+
+from .common import get_rbd_pools
+from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+
+
+class TrashPurgeScheduleHandler:
+    MODULE_OPTION_NAME = "trash_purge_schedule"
+    SCHEDULE_OID = "rbd_trash_purge_schedule"
+
+    lock = Lock()
+    condition = Condition(lock)
+    thread = None
+
+    def __init__(self, module):
+        self.module = module
+        self.log = module.log
+        self.last_refresh_pools = datetime(1970, 1, 1)
+
+        self.init_schedule_queue()
+
+        self.thread = Thread(target=self.run)
+        self.thread.start()
+
+    def run(self):
+        try:
+            self.log.info("TrashPurgeScheduleHandler: starting")
+            while True:
+                self.refresh_pools()
+                with self.lock:
+                    (ns_spec, wait_time) = self.dequeue()
+                    if not ns_spec:
+                        self.condition.wait(min(wait_time, 60))
+                        continue
+                pool_id, namespace = ns_spec
+                self.trash_purge(pool_id, namespace)
+                with self.lock:
+                    self.enqueue(datetime.now(), pool_id, namespace)
+
+        except Exception as ex:
+            self.log.fatal("Fatal runtime error: {}\n{}".format(
+                ex, traceback.format_exc()))
+
+    def trash_purge(self, pool_id, namespace):
+        try:
+            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                ioctx.set_namespace(namespace)
+                rbd.RBD().trash_purge(ioctx, datetime.now())
+        except Exception as e:
+            self.log.error("exception when purging {}/{}: {}".format(
+                pool_id, namespace, e))
+
+
+    def init_schedule_queue(self):
+        self.queue = {}
+        self.pools = {}
+        self.refresh_pools()
+        self.log.debug("scheduler queue is initialized")
+
+    def load_schedules(self):
+        self.log.info("TrashPurgeScheduleHandler: load_schedules")
+
+        schedules = Schedules(self)
+        schedules.load()
+        with self.lock:
+            self.schedules = schedules
+
+    def refresh_pools(self):
+        if (datetime.now() - self.last_refresh_pools).seconds < 60:
+            return
+
+        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")
+
+        self.load_schedules()
+
+        with self.lock:
+            if not self.schedules:
+                self.pools = {}
+                self.queue = {}
+                self.last_refresh_pools = datetime.now()
+                return
+
+        pools = {}
+
+        for pool_id, pool_name in get_rbd_pools(self.module).items():
+            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
+                self.load_pool(ioctx, pools)
+
+        with self.lock:
+            self.refresh_queue(pools)
+            self.pools = pools
+
+        self.last_refresh_pools = datetime.now()
+
+    def load_pool(self, ioctx, pools):
+        pool_id = str(ioctx.get_pool_id())
+        pool_name = ioctx.get_pool_name()
+        pools[pool_id] = {}
+        pool_namespaces = ['']
+
+        try:
+            pool_namespaces += rbd.RBD().namespace_list(ioctx)
+        except rbd.OperationNotSupported:
+            self.log.debug("namespaces not supported")
+        except Exception as e:
+            self.log.error("exception when scanning pool {}: {}".format(
+                pool_name, e))
+
+        for namespace in pool_namespaces:
+            pools[pool_id][namespace] = pool_name
+
+    def rebuild_queue(self):
+        with self.lock:
+            now = datetime.now()
+
+            # don't remove from queue "due" namespaces
+            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+
+            for schedule_time in list(self.queue):
+                if schedule_time > now_string:
+                    del self.queue[schedule_time]
+
+            if not self.schedules:
+                return
+
+            for pool_id, namespaces in self.pools.items():
+                for namespace in namespaces:
+                    self.enqueue(now, pool_id, namespace)
+
+            self.condition.notify()
+
+    def refresh_queue(self, current_pools):
+        now = datetime.now()
+
+        for pool_id, namespaces in self.pools.items():
+            for namespace in namespaces:
+                if pool_id not in current_pools or \
+                   namespace not in current_pools[pool_id]:
+                    self.remove_from_queue(pool_id, namespace)
+
+        for pool_id, namespaces in current_pools.items():
+            for namespace in namespaces:
+                if pool_id not in self.pools or \
+                   namespace not in self.pools[pool_id]:
+                    self.enqueue(now, pool_id, namespace)
+
+        self.condition.notify()
+
+    def enqueue(self, now, pool_id, namespace):
+
+        schedule = self.schedules.find(pool_id, namespace)
+        if not schedule:
+            return
+
+        schedule_time = schedule.next_run(now)
+        if schedule_time not in self.queue:
+            self.queue[schedule_time] = []
+        self.log.debug("schedule namespace {}/{} at {}".format(
+            pool_id, namespace, schedule_time))
+        ns_spec = (pool_id, namespace)
+        if ns_spec not in self.queue[schedule_time]:
+            self.queue[schedule_time].append((pool_id, namespace))
+
+    def dequeue(self):
+        if not self.queue:
+            return None, 1000
+
+        now = datetime.now()
+        schedule_time = sorted(self.queue)[0]
+
+        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
+            wait_time = (datetime.strptime(schedule_time,
+                                           "%Y-%m-%d %H:%M:%S") - now)
+            return None, wait_time.total_seconds()
+
+        namespaces = self.queue[schedule_time]
+        namespace = namespaces.pop(0)
+        if not namespaces:
+            del self.queue[schedule_time]
+        return namespace, 0
+
+    def remove_from_queue(self, pool_id, namespace):
+        empty_slots = []
+        for schedule_time, namespaces in self.queue.items():
+            if (pool_id, namespace) in namespaces:
+                namespaces.remove((pool_id, namespace))
+                if not namespaces:
+                    empty_slots.append(schedule_time)
+        for schedule_time in empty_slots:
+            del self.queue[schedule_time]
+
+    def add_schedule(self, level_spec, interval, start_time):
+        self.log.debug(
+            "add_schedule: level_spec={}, interval={}, start_time={}".format(
+                level_spec.name, interval, start_time))
+
+        with self.lock:
+            self.schedules.add(level_spec, interval, start_time)
+
+        # TODO: optimize to rebuild only affected part of the queue
+        self.rebuild_queue()
+        return 0, "", ""
+
+    def remove_schedule(self, level_spec, interval, start_time):
+        self.log.debug(
+            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
+                level_spec.name, interval, start_time))
+
+        with self.lock:
+            self.schedules.remove(level_spec, interval, start_time)
+
+        # TODO: optimize to rebuild only affected part of the queue
+        self.rebuild_queue()
+        return 0, "", ""
+
+    def list(self, level_spec):
+        self.log.debug("list: level_spec={}".format(level_spec.name))
+
+        with self.lock:
+            result = self.schedules.to_list(level_spec)
+
+        return 0, json.dumps(result, indent=4, sort_keys=True), ""
+
+    def status(self, level_spec):
+        self.log.debug("status: level_spec={}".format(level_spec.name))
+
+        scheduled = []
+        with self.lock:
+            for schedule_time in sorted(self.queue):
+                for pool_id, namespace in self.queue[schedule_time]:
+                    if not level_spec.matches(pool_id, namespace):
+                        continue
+                    pool_name = self.pools[pool_id][namespace]
+                    scheduled.append({
+                        'schedule_time' : schedule_time,
+                        'pool_id' : pool_id,
+                        'pool_name' : pool_name,
+                        'namespace' : namespace
+                    })
+        return 0, json.dumps({'scheduled' : scheduled}, indent=4,
+                             sort_keys=True), ""
+
+    def handle_command(self, inbuf, prefix, cmd):
+        level_spec_name = cmd.get('level_spec', "")
+
+        try:
+            level_spec = LevelSpec.from_name(self, level_spec_name,
+                                             allow_image_level=False)
+        except ValueError as e:
+            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
+                level_spec_name, e)
+
+        if prefix == 'add':
+            return self.add_schedule(level_spec, cmd['interval'],
+                                     cmd.get('start_time'))
+        elif prefix == 'remove':
+            return self.remove_schedule(level_spec, cmd.get('interval'),
+                                        cmd.get('start_time'))
+        elif prefix == 'list':
+            return self.list(level_spec)
+        elif prefix == 'status':
+            return self.status(level_spec)
+
+        raise NotImplementedError(cmd['prefix'])