From: Joshua Schmid
Date: Tue, 14 Jan 2020 17:54:10 +0000 (+0100)
Subject: mgr/osd_support: new module for osd utility calls
X-Git-Tag: v15.1.1~360^2~2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b5c5ef185d17ae5e443a849649da0fc926940307;p=ceph.git

mgr/osd_support: new module for osd utility calls

it lets you start/stop and monitor osd draining processes

Signed-off-by: Joshua Schmid
---
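
A quick way to exercise the new commands on a test cluster, assuming the
module has been enabled with "ceph mgr module enable osd_support":

$ ceph osd drain 0 1
$ ceph osd drain status
$ ceph osd drain stop 0

"osd drain status" prints a list of the form [{'osd_id': 0, 'pgs': 1234}, ..]
as documented in the module docstring below.
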
diff --git a/src/pybind/mgr/osd_support/__init__.py b/src/pybind/mgr/osd_support/__init__.py
new file mode 100644
index 000000000000..88ed2b9f57c1
--- /dev/null
+++ b/src/pybind/mgr/osd_support/__init__.py
@@ -0,0 +1 @@
+from .module import OSDSupport
diff --git a/src/pybind/mgr/osd_support/module.py b/src/pybind/mgr/osd_support/module.py
new file mode 100644
index 000000000000..bbdebc16f3d8
--- /dev/null
+++ b/src/pybind/mgr/osd_support/module.py
@@ -0,0 +1,326 @@
+from typing import List, Set, Optional
+from mgr_module import MgrModule, HandleCommandResult
+from threading import Event
+import json
+import errno
+
+"""
+A module that holds a variety of useful utilities to deal with OSDs
+
+Draining OSDs:
+
+This module can be used to drain osds from the commandline:
+
+$ ceph osd drain $osd_ids ([$osd_ids])
+
+You can monitor the progress with
+
+$ ceph osd drain status
+
+which gives you the status of scheduled and running osd drain operations:
+
+[{'osd_id': 0, 'pgs': 1234}, ..]
+
+$ ceph osd drain stop ([$osd_ids])
+
+stops all !SCHEDULED! osd drain operations (not the operations that have been
+started already) if no $osd_ids are given. If $osd_ids are present it only
+operates on them.
+To stop and reset the weight of already started operations we need to save
+the initial weight (see 'Ideas for improvement')
+
+
+Ideas for improvement:
+- add health checks (set_health_checks)
+- use objects to represent OSDs
+  - allows timestamps, trending information etc
+- save osd drain state (at least the osd_ids) in the mon store
+  - resume after a mgr crash
+  - save the initial weight of an osd (i.e. reset it to the initial weight on abort)
+"""
+
+
+class OSDSupport(MgrModule):
+    # these are CLI commands we implement
+    COMMANDS = [
+        {
+            "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
+            "desc": "drain osd ids",
+            "perm": "r"
+        },
+        {
+            "cmd": "osd drain status",
+            "desc": "show status",
+            "perm": "r"
+        },
+        {
+            "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
+            "desc": "stop drain operations for osds. Stops all scheduled operations if osd_ids are omitted",
+            "perm": "r"
+        },
+    ]
+
+    # These are module options we understand. These can be set with
+    #
+    #   ceph config set global mgr/osd_support/<option_name>
+    #
+    # e.g.,
+    #
+    #   ceph config set global mgr/osd_support/some_option some_value
+    #
+    MODULE_OPTIONS: List[dict] = []
+
+    # These are "native" Ceph options that this module cares about.
+    NATIVE_OPTIONS: List[str] = []
+
+    osd_ids: Set[int] = set()
+    emptying_osds: Set[int] = set()
+    check_osds: Set[int] = set()
+
+    def __init__(self, *args, **kwargs):
+        super(OSDSupport, self).__init__(*args, **kwargs)
+
+        # set up some members to enable the serve() method and shutdown()
+        self.run = True
+        self.event = Event()
+
+        # ensure config options members are initialized; see config_notify()
+        self.config_notify()
+
+    def config_notify(self):
+        """
+        This method is called whenever one of our config options is changed.
+        """
+        # This is some boilerplate that stores MODULE_OPTIONS in a class
+        # member, so that, for instance, the 'emphatic' option is always
+        # available as 'self.emphatic'.
+        for opt in self.MODULE_OPTIONS:
+            setattr(self,
+                    opt['name'],
+                    self.get_module_option(opt['name']) or opt['default'])
+            self.log.debug(' mgr option %s = %s',
+                           opt['name'], getattr(self, opt['name']))
+        # Do the same for the native options.
+        for _opt in self.NATIVE_OPTIONS:
+            setattr(self,
+                    _opt,
+                    self.get_ceph_option(_opt))
+            self.log.debug('native option %s = %s', _opt, getattr(self, _opt))
+
+    def handle_command(self, inbuf, cmd):
+        ret = 0
+        err = ''
+        _ = inbuf
+        cmd_prefix = cmd.get('prefix', '')
+        osd_ids: List[int] = cmd.get('osd_ids', list())
+        not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
+        if cmd_prefix == 'osd drain':
+            if not_found_osds:
+                return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
+            # add osd_ids to set
+            for osd_id in osd_ids:
+                if osd_id not in self.emptying_osds:
+                    self.osd_ids.add(osd_id)
+            self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
+            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'
+
+        elif cmd_prefix == 'osd drain status':
+            # re-initialize it with an empty set on invocation (long running processes)
+            self.check_osds = set()
+            # assemble a set of emptying osds and to_be_emptied osds
+            self.check_osds.update(self.emptying_osds)
+            self.check_osds.update(self.osd_ids)
+
+            report = list()
+            for osd_id in self.check_osds:
+                pgs = self.get_pg_count(osd_id)
+                report.append(dict(osd_id=osd_id, pgs=pgs))
+            out = f"{report}"
+
+        elif cmd_prefix == 'osd drain stop':
+            if not osd_ids:
+                self.log.debug("No osd_ids provided, stopping all pending drain operations")
+                self.osd_ids = set()
+                self.emptying_osds = set()
+
+                # this is just a poor-man's solution as it will not really stop draining
+                # the osds. It will just stop the queries and also prevent any scheduled OSDs
+                # from getting drained at a later point in time.
+                out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
+
+            else:
+                if not_found_osds:
+                    return -errno.EINVAL, '', f"OSDs <{not_found_osds}> not found in cluster"
+
+                self.osd_ids = self.osd_ids.difference(osd_ids)
+                self.emptying_osds = self.emptying_osds.difference(osd_ids)
+                out = f"Stopped draining operations for OSD(s): {osd_ids}"
+
+        else:
+            return HandleCommandResult(
+                retval=-errno.EINVAL,
+                stdout='',
+                stderr=f"Command not found <{cmd.get('prefix', '')}>")
+        return HandleCommandResult(
+            retval=ret,  # exit code
+            stdout=out,  # stdout
+            stderr=err)
+
+    def serve(self):
+        """
+        This method is called by the mgr when the module starts and can be
+        used for any background activity.
+        """
+        self.log.info("Starting mgr/osd_support")
+        while self.run:
+            # Do some useful background work here.
+
+            self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
+            self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
+            # the state should be saved to the mon store in the actual call and
+            # then retrieved in serve() probably
+
+            # 1) check if all provided osds can be stopped, if not, shrink the list until ok-to-stop
+            for x in self.find_osd_stop_threshold(self.osd_ids):
+                self.emptying_osds.add(x)
+
+            # remove the emptying osds from osd_ids since they don't need to be checked again
+            self.osd_ids = self.osd_ids.difference(self.emptying_osds)
+
+            # 2) reweight the ok-to-stop osds, ONCE
+            self.reweight_osds(self.emptying_osds)
+
+            # 3) check for osds to be empty
+            empty_osds = self.empty_osds(self.emptying_osds)
+
+            # remove osds that are marked as empty
+            self.emptying_osds = self.emptying_osds.difference(empty_osds)
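+            # note: an empty osd simply drops out of the tracking sets here;
+            # marking it 'out' or removing it from the cluster is left to the
+            # operator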
+
+            # fixed sleep interval of 10 seconds
+            sleep_interval = 10
+            self.log.debug('Sleeping for %d seconds', sleep_interval)
+            self.event.wait(sleep_interval)
+            self.event.clear()
+
+    def shutdown(self):
+        """
+        This method is called by the mgr when the module needs to shut
+        down (i.e., when the serve() function needs to exit).
+        """
+        self.log.info('Stopping')
+        self.run = False
+        self.event.set()
+
+    def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
+        self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
+        osd_map = self.get_osdmap()
+        cluster_osds = [x.get('osd') for x in osd_map.dump().get('osds', [])]
+        not_in_cluster = set()
+        for osd_id in osd_ids:
+            if int(osd_id) not in cluster_osds:
+                self.log.error(f"Could not find {osd_id} in cluster")
+                not_in_cluster.add(osd_id)
+        return not_in_cluster
+
+    def empty_osds(self, osd_ids: Set[int]) -> List[int]:
+        if not osd_ids:
+            return list()
+        osd_df_data = self.osd_df()
+        empty_osds = list()
+        for osd_id in osd_ids:
+            if self.is_empty(osd_id, osd_df=osd_df_data):
+                empty_osds.append(osd_id)
+        return empty_osds
+
+    def osd_df(self) -> dict:
+        # TODO: this should be cached I think
+        base_cmd = 'osd df'
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            'format': 'json'
+        })
+        return json.loads(out)
+
+    def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
+        pgs = self.get_pg_count(osd_id, osd_df=osd_df)
+        if pgs != 0:
+            self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
+            return False
+        self.log.info(f"osd: {osd_id} has no PGs anymore")
+        return True
+
+    def reweight_osds(self, osd_ids: Set[int]) -> bool:
+        results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids]
+        return all(results)
+
+    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+        if not osd_df:
+            osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id', None) == int(osd_id):
+                return osd_node.get('pgs')
+        errmsg = f"Could not find field for osd_id: {osd_id} in osd_df data"
+        self.log.error(errmsg)
+        raise RuntimeError(errmsg)
+
+    def get_osd_weight(self, osd_id: int) -> float:
+        osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id', None) == int(osd_id):
+                return float(osd_node.get('crush_weight'))
+        return -1.0
+
+    def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
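+        # a crush weight of 0.0 stops CRUSH from mapping PGs to this OSD,
+        # so the cluster migrates its data elsewhere; this is what actually
+        # drains the OSD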
+        if self.get_osd_weight(osd_id) == weight:
+            self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
+            return True
+        base_cmd = 'osd crush reweight'
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            'name': f'osd.{osd_id}',
+            'weight': weight
+        })
+        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
+        self.log.debug(f"running cmd: {cmd}")
+        if ret != 0:
+            self.log.error(f"command: {cmd} failed with: {err}")
+            return False
+        self.log.info(f"command: {cmd} succeeded with: {out}")
+        return True
+
+    def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
+        """
+        Cut the osd_id list in half until it's ok-to-stop
+
+        :param osd_ids: set of osd_ids
+        :return: set of osd_ids that can be stopped at once
+        """
+        if not osd_ids:
+            return set()
+        _osds: List[int] = list(osd_ids.copy())
+        while not self.ok_to_stop(_osds):
+            if len(_osds) <= 1:
+                # can't even stop one OSD, aborting
+                self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+                return set()
+            self.event.wait(1)
+            # splitting osd_ids in half until ok_to_stop yields success
+            # maybe popping ids off one by one is better here.. depends on the cluster size I guess..
+            # There's a lot of room for micro adjustments here
+            _osds = _osds[len(_osds)//2:]
+        return set(_osds)
+
+    def ok_to_stop(self, osd_ids: List[int]) -> bool:
+        base_cmd = "osd ok-to-stop"
+        self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            # apparently ok-to-stop allows strings only
+            'ids': [str(x) for x in osd_ids]
+        })
+        if ret != 0:
+            self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
+            return False
+        self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
+        return True
diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini
index d0c2fdf597a7..23b0573220b4 100644
--- a/src/pybind/mgr/tox.ini
+++ b/src/pybind/mgr/tox.ini
@@ -19,4 +19,5 @@ commands = mypy --config-file=../../mypy.ini \
            orchestrator/__init__.py \
            progress/module.py \
            rook/module.py \
+           osd_support/module.py \
            test_orchestrator/module.py
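
A standalone sketch of the shrink-until-ok-to-stop strategy used by
find_osd_stop_threshold above, for anyone who wants to poke at the halving
behaviour outside a cluster. fake_ok_to_stop is a hypothetical stand-in for
the 'osd ok-to-stop' mon command that pretends at most 'limit' OSDs can be
stopped at once:

from typing import List, Set

def fake_ok_to_stop(osd_ids: List[int], limit: int = 2) -> bool:
    # hypothetical stand-in for 'ceph osd ok-to-stop': pretend that
    # stopping more than 'limit' OSDs at once would hurt availability
    return len(osd_ids) <= limit

def find_osd_stop_threshold(osd_ids: Set[int]) -> Set[int]:
    # same halving strategy as the module: keep the back half of the
    # candidate list until the remainder is ok to stop
    _osds: List[int] = list(osd_ids)
    while not fake_ok_to_stop(_osds):
        if len(_osds) <= 1:
            # can't even stop one OSD
            return set()
        _osds = _osds[len(_osds) // 2:]
    return set(_osds)

print(find_osd_stop_threshold({0, 1, 2, 3, 4}))
# -> a 2-element subset such as {3, 4} (exact ids depend on set iteration order)

Halving converges in O(log n) ok-to-stop queries, at the cost of sometimes
draining fewer OSDs per pass than strictly possible; popping ids off one by
one (the alternative the code comment mentions) would need O(n) queries on a
busy cluster.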