]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/osd_support: new module for osd utility calls
authorJoshua Schmid <jschmid@suse.de>
Tue, 14 Jan 2020 17:54:10 +0000 (18:54 +0100)
committerJoshua Schmid <jschmid@suse.de>
Wed, 19 Feb 2020 07:58:19 +0000 (08:58 +0100)
it lets you start/stop and monitor osd draining processes

Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/osd_support/__init__.py [new file with mode: 0644]
src/pybind/mgr/osd_support/module.py [new file with mode: 0644]
src/pybind/mgr/tox.ini

diff --git a/src/pybind/mgr/osd_support/__init__.py b/src/pybind/mgr/osd_support/__init__.py
new file mode 100644 (file)
index 0000000..88ed2b9
--- /dev/null
@@ -0,0 +1 @@
+from .module import OSDSupport
diff --git a/src/pybind/mgr/osd_support/module.py b/src/pybind/mgr/osd_support/module.py
new file mode 100644 (file)
index 0000000..bbdebc1
--- /dev/null
@@ -0,0 +1,326 @@
+from typing import List, Set, Optional
+from mgr_module import MgrModule, HandleCommandResult
+from threading import Event
+import json
+import errno
+
+"""
+A module that holds a variety of useful utility to deal with OSDs
+
+Draining OSDs:
+
+This module can be used to drain osds from the commandline:
+
+$ ceph osd drain $osd_ids ([$osd_ids])
+
+You can monitor the progress with
+
+$ ceph osd drain status
+
+Gives you the status of scheduled and running osd drain operations
+
+[{'osd_id': 0, 'pgs': 1234}, ..]
+
+$ ceph osd drain stop ([$osd_ids])
+
+Stops all !SCHEDULED! osd drain operations (not the operations that have been started already)
+if no $osd_ids are give. If $osd_ids are present it only operates on them.
+To stop and reset the weight of already started operations we need to save the initial weight 
+(see 'Ideas for improvement')
+
+
+Ideas for improvement:
+- add health checks set_health_checks
+- use objects to represent OSDs
+  - allows timestamps, trending information etc
+- save osd drain state (at least the osd_ids in the mon store)
+  - resume after a mgr crash
+  - save the initial weight of a osd i.e. (set to initial weight on abort)
+"""
+
+
+class OSDSupport(MgrModule):
+    # these are CLI commands we implement
+    COMMANDS = [
+        {
+            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
+            "desc": "drain osd ids",
+            "perm": "r"
+        },
+        {
+            "cmd": "osd drain status",
+            "desc": "show status",
+            "perm": "r"
+        },
+        {
+            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
+            "desc": "show status for osds. Stopping all if osd_ids are omitted",
+            "perm": "r"
+        },
+    ]
+
+    # These are module options we understand.  These can be set with
+    #
+    #   ceph config set global mgr/hello/<name> <value>
+    #
+    # e.g.,
+    #
+    #   ceph config set global mgr/hello/place Earth
+    #
+    MODULE_OPTIONS: List[dict] = []
+
+    # These are "native" Ceph options that this module cares about.
+    NATIVE_OPTIONS: List[str] = []
+
+    osd_ids: Set[int] = set()
+    emptying_osds: Set[int] = set()
+    check_osds: Set[int] = set()
+
+    def __init__(self, *args, **kwargs):
+        super(OSDSupport, self).__init__(*args, **kwargs)
+
+        # set up some members to enable the serve() method and shutdown()
+        self.run = True
+        self.event = Event()
+
+        # ensure config options members are initialized; see config_notify()
+        self.config_notify()
+
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        # This is some boilerplate that stores MODULE_OPTIONS in a class
+        # member, so that, for instance, the 'emphatic' option is always
+        # available as 'self.emphatic'.
+        for opt in self.MODULE_OPTIONS:
+            setattr(self,
+                    opt['name'],
+                    self.get_module_option(opt['name']) or opt['default'])
+            self.log.debug(' mgr option %s = %s',
+                           opt['name'], getattr(self, opt['name']))
+        # Do the same for the native options.
+        for _opt in self.NATIVE_OPTIONS:
+            setattr(self,
+                    _opt,
+                    self.get_ceph_option(_opt))
+            self.log.debug('native option %s = %s', _opt, getattr(self, _opt))
+
+    def handle_command(self, inbuf, cmd):
+        ret = 0
+        err = ''
+        _ = inbuf
+        cmd_prefix = cmd.get('prefix', '')
+        osd_ids: List[int] = cmd.get('osd_ids', list())
+        not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
+        if cmd_prefix == 'osd drain':
+            if not_found_osds:
+                return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
+            # add osd_ids to set
+            for osd_id in osd_ids:
+                if osd_id not in self.emptying_osds:
+                    self.osd_ids.add(osd_id)
+            self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
+            out = 'Started draining OSDs. Query progress with <ceph drain status>'
+
+        elif cmd_prefix == 'osd drain status':
+            # re-initialize it with an empty set on invocation (long running processes)
+            self.check_osds = set()
+            # assemble a set of emptying osds and to_be_emptied osds
+            self.check_osds.update(self.emptying_osds)
+            self.check_osds.update(self.osd_ids)
+
+            report = list()
+            for osd_id in self.check_osds:
+                pgs = self.get_pg_count(osd_id)
+                report.append(dict(osd_id=osd_id, pgs=pgs))
+            out = f"{report}"
+
+        elif cmd_prefix == 'osd drain stop':
+            if not osd_ids:
+                self.log.debug("No osd_ids provided, stop all pending drain operations)")
+                self.osd_ids = set()
+                self.emptying_osds = set()
+
+                # this is just a poor-man's solution as it will not really stop draining
+                # the osds. It will just stop the queries and also prevent any scheduled OSDs
+                # from getting drained at a later point in time.
+                out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
+
+            else:
+                if not_found_osds:
+                    return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
+
+                self.osd_ids = self.osd_ids.difference(osd_ids)
+                self.emptying_osds = self.emptying_osds.difference(osd_ids)
+                out = f"Stopped draining operations for OSD(s): {osd_ids}"
+
+        else:
+            return HandleCommandResult(
+                retval=-errno.EINVAL,
+                stdout='',
+                stderr=f"Command not found <{cmd.get('prefix', '')}>")
+        return HandleCommandResult(
+            retval=ret,   # exit code
+            stdout=out,   # stdout
+            stderr=err)
+
+    def serve(self):
+        """
+        This method is called by the mgr when the module starts and can be
+        used for any background activity.
+        """
+        self.log.info("Starting mgr/osd_support")
+        while self.run:
+            # Do some useful background work here.
+
+            self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
+            self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
+            # the state should be saved to the mon store in the actual call and
+            # then retrieved in serve() probably
+
+            # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
+            for x in self.find_osd_stop_threshold(self.osd_ids):
+                self.emptying_osds.add(x)
+
+            # remove the emptying osds from the osd_ids since they don't need to be checked again.
+            self.osd_ids = self.osd_ids.difference(self.emptying_osds)
+
+            # 2) reweight the ok-to-stop osds, ONCE
+            self.reweight_osds(self.emptying_osds)
+
+            # 3) check for osds to be empty
+            empty_osds = self.empty_osds(self.emptying_osds)
+
+            # remove osds that are marked as empty
+            self.emptying_osds = self.emptying_osds.difference(empty_osds)
+
+            # fixed sleep interval of 10 seconds
+            sleep_interval = 10
+            self.log.debug('Sleeping for %d seconds', sleep_interval)
+            self.event.wait(sleep_interval)
+            self.event.clear()
+
+    def shutdown(self):
+        """
+        This method is called by the mgr when the module needs to shut
+        down (i.e., when the serve() function needs to exit).
+        """
+        self.log.info('Stopping')
+        self.run = False
+        self.event.set()
+
+    def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
+        self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
+        osd_map = self.get_osdmap()
+        cluster_osds = [x.get('osd') for x in osd_map.dump().get('osds', [])]
+        not_in_cluster = set()
+        for osd_id in osd_ids:
+            if int(osd_id) not in cluster_osds:
+                self.log.error(f"Could not find {osd_id} in cluster")
+                not_in_cluster.add(osd_id)
+        return not_in_cluster
+
+    def empty_osds(self, osd_ids: Set[int]) -> List[int]:
+        if not osd_ids:
+            return list()
+        osd_df_data = self.osd_df()
+        empty_osds = list()
+        for osd_id in osd_ids:
+            if self.is_empty(osd_id, osd_df=osd_df_data):
+                empty_osds.append(osd_id)
+        return empty_osds
+
+    def osd_df(self) -> dict:
+        # TODO: this should be cached I think
+        base_cmd = 'osd df'
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            'format': 'json'
+        })
+        return json.loads(out)
+
+    def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
+        pgs = self.get_pg_count(osd_id, osd_df=osd_df)
+        if pgs != 0:
+            self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
+            return False
+        self.log.info(f"osd: {osd_id} has no PGs anymore")
+        return True
+
+    def reweight_osds(self, osd_ids: Set[int]) -> bool:
+        results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids]
+        return all(results)
+
+    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+        if not osd_df:
+            osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id', None) == int(osd_id):
+                return osd_node.get('pgs')
+        errmsg = f"Could not find <pgs> field for osd_id: {osd_id} in osd_df data"
+        self.log.error(errmsg)
+        raise RuntimeError(errmsg)
+
+    def get_osd_weight(self, osd_id: int) -> float:
+        osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id', None) == int(osd_id):
+                return float(osd_node.get('crush_weight'))
+        return -1.0
+
+    def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
+        if self.get_osd_weight(osd_id) == weight:
+            self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
+            return True
+        base_cmd = 'osd crush reweight'
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            'name': f'osd.{osd_id}',
+            'weight': weight
+        })
+        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
+        self.log.debug(f"running cmd: {cmd}")
+        if ret != 0:
+            self.log.error(f"command: {cmd} failed with: {err}")
+            return False
+        self.log.info(f"command: {cmd} succeeded with: {out}")
+        return True
+
+    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
+        """
+        Cut osd_id list in half until it's ok-to-stop
+
+        :param osd_ids: list of osd_ids
+        :return: list of ods_ids that can be stopped at once
+        """
+        if not osd_ids:
+            return set()
+        _osds: List[int] = list(osd_ids.copy())
+        while not self.ok_to_stop(_osds):
+            if len(_osds) <= 1:
+                # can't even stop one OSD, aborting
+                self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+                return set()
+            self.event.wait(1)
+            # splitting osd_ids in half until ok_to_stop yields success
+            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
+            # There's a lot of room for micro adjustments here
+            _osds = _osds[len(_osds)//2:]
+        return set(_osds)
+
+    def ok_to_stop(self, osd_ids: List[int]) -> bool:
+        base_cmd = "osd ok-to-stop"
+        self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            # apparently ok-to-stop allows strings only
+            'ids': [str(x) for x in osd_ids]
+        })
+        if ret != 0:
+            self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
+            return False
+        self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
+        return True
index d0c2fdf597a78e4bb92915594900c5a97efddb91..23b0573220b4a4a866b734254d2c1c49139a3707 100644 (file)
@@ -19,4 +19,5 @@ commands = mypy --config-file=../../mypy.ini \
            orchestrator/__init__.py \
            progress/module.py \
            rook/module.py \
+           osd_support/module.py \
            test_orchestrator/module.py