--- /dev/null
+from typing import List, Set, Optional
+from mgr_module import MgrModule, HandleCommandResult
+from threading import Event
+import json
+import errno
+
+"""
+A module that holds a variety of useful utilities to deal with OSDs
+
+Draining OSDs:
+
+This module can be used to drain OSDs from the command line:
+
+$ ceph osd drain $osd_id [$osd_id..]
+
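+For example, to drain OSDs 1 and 2 (hypothetical ids):
+
+$ ceph osd drain 1 2
+Started draining OSDs. Query progress with <ceph osd drain status>
+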
+You can monitor the progress with
+
+$ ceph osd drain status
+
+This gives you the status of scheduled and running osd drain operations:
+
+[{'osd_id': 0, 'pgs': 1234}, ..]
+
+$ ceph osd drain stop [$osd_id..]
+
+Stops all !SCHEDULED! osd drain operations (not the operations that have already
+been started) if no $osd_ids are given. If $osd_ids are present, it only operates
+on those.
+To stop and reset the weight of operations that have already started, we would need
+to save the initial weight (see 'Ideas for improvement').
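+
+For example, to remove OSD 1 from the drain queue again (hypothetical id):
+
+$ ceph osd drain stop 1
+Stopped draining operations for OSD(s): [1]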
+
+
+Ideas for improvement:
+- add health checks set_health_checks
+- use objects to represent OSDs
+ - allows timestamps, trending information, etc.
+- save osd drain state (at least the osd_ids in the mon store)
+ - resume after a mgr crash
+ - save the initial weight of an osd, i.e. restore the initial weight on abort
+"""
+
+
+class OSDSupport(MgrModule):
+ # these are CLI commands we implement
+ COMMANDS = [
+ {
+ "cmd": "osd drain name=osd_ids,type=CephInt,req=true,n=N",
+ "desc": "drain osd ids",
+ "perm": "r"
+ },
+ {
+ "cmd": "osd drain status",
+ "desc": "show status",
+ "perm": "r"
+ },
+ {
+ "cmd": "osd drain stop name=osd_ids,type=CephInt,req=false,n=N",
+ "desc": "show status for osds. Stopping all if osd_ids are omitted",
+ "perm": "r"
+ },
+ ]
+
+ # These are module options we understand. These can be set with
+ #
+    # ceph config set global mgr/osd_support/<name> <value>
+ #
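+    # For illustration only (this module defines no options yet), an entry in
+    # MODULE_OPTIONS would be a dict along the following lines; config_notify()
+    # below only relies on the 'name' and 'default' keys, everything else here is
+    # a hypothetical example:
+    #
+    #   {
+    #       'name': 'sleep_interval',
+    #       'type': 'int',
+    #       'default': 10,
+    #       'desc': 'seconds to sleep between background iterations',
+    #   }
+    #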
+ MODULE_OPTIONS: List[dict] = []
+
+ # These are "native" Ceph options that this module cares about.
+ NATIVE_OPTIONS: List[str] = []
+
+ osd_ids: Set[int] = set()
+ emptying_osds: Set[int] = set()
+ check_osds: Set[int] = set()
+
+ def __init__(self, *args, **kwargs):
+ super(OSDSupport, self).__init__(*args, **kwargs)
+
+ # set up some members to enable the serve() method and shutdown()
+ self.run = True
+ self.event = Event()
+
+ # ensure config options members are initialized; see config_notify()
+ self.config_notify()
+
+ def config_notify(self):
+ """
+ This method is called whenever one of our config options is changed.
+ """
+ # This is some boilerplate that stores MODULE_OPTIONS in a class
+        # member, so that, for instance, an option named 'foo' is always
+        # available as 'self.foo'.
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'],
+ self.get_module_option(opt['name']) or opt['default'])
+ self.log.debug(' mgr option %s = %s',
+ opt['name'], getattr(self, opt['name']))
+ # Do the same for the native options.
+ for _opt in self.NATIVE_OPTIONS:
+ setattr(self,
+ _opt,
+ self.get_ceph_option(_opt))
+ self.log.debug('native option %s = %s', _opt, getattr(self, _opt))
+
+ def handle_command(self, inbuf, cmd):
+ ret = 0
+ err = ''
+ _ = inbuf
+ cmd_prefix = cmd.get('prefix', '')
+ osd_ids: List[int] = cmd.get('osd_ids', list())
+ not_found_osds: Set[int] = self.osds_not_in_cluster(osd_ids)
+ if cmd_prefix == 'osd drain':
+ if not_found_osds:
+                return HandleCommandResult(
+                    retval=-errno.EINVAL,
+                    stdout='',
+                    stderr=f"OSDs <{not_found_osds}> not found in cluster")
+ # add osd_ids to set
+ for osd_id in osd_ids:
+ if osd_id not in self.emptying_osds:
+ self.osd_ids.add(osd_id)
+ self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
+            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'
+
+ elif cmd_prefix == 'osd drain status':
+            # re-initialize with an empty set on every invocation (the module is long-running)
+ self.check_osds = set()
+ # assemble a set of emptying osds and to_be_emptied osds
+ self.check_osds.update(self.emptying_osds)
+ self.check_osds.update(self.osd_ids)
+
+ report = list()
+ for osd_id in self.check_osds:
+ pgs = self.get_pg_count(osd_id)
+ report.append(dict(osd_id=osd_id, pgs=pgs))
+ out = f"{report}"
+
+ elif cmd_prefix == 'osd drain stop':
+ if not osd_ids:
+ self.log.debug("No osd_ids provided, stop all pending drain operations)")
+ self.osd_ids = set()
+ self.emptying_osds = set()
+
+ # this is just a poor-man's solution as it will not really stop draining
+ # the osds. It will just stop the queries and also prevent any scheduled OSDs
+ # from getting drained at a later point in time.
+ out = "Stopped all future draining operations (not resetting the weight for already reweighted OSDs)"
+
+ else:
+ if not_found_osds:
+                    return HandleCommandResult(
+                        retval=-errno.EINVAL,
+                        stdout='',
+                        stderr=f"OSDs <{not_found_osds}> not found in cluster")
+
+ self.osd_ids = self.osd_ids.difference(osd_ids)
+ self.emptying_osds = self.emptying_osds.difference(osd_ids)
+ out = f"Stopped draining operations for OSD(s): {osd_ids}"
+
+ else:
+ return HandleCommandResult(
+ retval=-errno.EINVAL,
+ stdout='',
+ stderr=f"Command not found <{cmd.get('prefix', '')}>")
+ return HandleCommandResult(
+ retval=ret, # exit code
+ stdout=out, # stdout
+ stderr=err)
+
+ def serve(self):
+ """
+ This method is called by the mgr when the module starts and can be
+ used for any background activity.
+ """
+ self.log.info("Starting mgr/osd_support")
+ while self.run:
+ # Do some useful background work here.
+
+ self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
+ self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
+            # The state should probably be saved to the mon store when the command
+            # is handled and then retrieved here in serve().
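+            # A possible sketch for that (not implemented here): serialize both
+            # sets into the mgr key/value store and restore them on module start.
+            # The key name 'osd_support/state' is an arbitrary choice.
+            #
+            #   self.set_store('osd_support/state', json.dumps({
+            #       'osd_ids': list(self.osd_ids),
+            #       'emptying_osds': list(self.emptying_osds)}))
+            #
+            #   state = json.loads(self.get_store('osd_support/state') or '{}')
+            #   self.osd_ids = set(state.get('osd_ids', []))
+            #   self.emptying_osds = set(state.get('emptying_osds', []))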
+
+ # 1) check if all provided osds can be stopped, if not, shrink list until ok-to-stop
+ for x in self.find_osd_stop_threshold(self.osd_ids):
+ self.emptying_osds.add(x)
+
+ # remove the emptying osds from the osd_ids since they don't need to be checked again.
+ self.osd_ids = self.osd_ids.difference(self.emptying_osds)
+
+ # 2) reweight the ok-to-stop osds, ONCE
+ self.reweight_osds(self.emptying_osds)
+
+ # 3) check for osds to be empty
+ empty_osds = self.empty_osds(self.emptying_osds)
+
+ # remove osds that are marked as empty
+ self.emptying_osds = self.emptying_osds.difference(empty_osds)
+
+ # fixed sleep interval of 10 seconds
+ sleep_interval = 10
+ self.log.debug('Sleeping for %d seconds', sleep_interval)
+ self.event.wait(sleep_interval)
+ self.event.clear()
+
+ def shutdown(self):
+ """
+ This method is called by the mgr when the module needs to shut
+ down (i.e., when the serve() function needs to exit).
+ """
+ self.log.info('Stopping')
+ self.run = False
+ self.event.set()
+
+ def osds_not_in_cluster(self, osd_ids: List[int]) -> Set[int]:
+ self.log.info(f"Checking if provided osds <{osd_ids}> exist in the cluster")
+ osd_map = self.get_osdmap()
+ cluster_osds = [x.get('osd') for x in osd_map.dump().get('osds', [])]
+ not_in_cluster = set()
+ for osd_id in osd_ids:
+ if int(osd_id) not in cluster_osds:
+ self.log.error(f"Could not find {osd_id} in cluster")
+ not_in_cluster.add(osd_id)
+ return not_in_cluster
+
+ def empty_osds(self, osd_ids: Set[int]) -> List[int]:
+ if not osd_ids:
+ return list()
+ osd_df_data = self.osd_df()
+ empty_osds = list()
+ for osd_id in osd_ids:
+ if self.is_empty(osd_id, osd_df=osd_df_data):
+ empty_osds.append(osd_id)
+ return empty_osds
+
+ def osd_df(self) -> dict:
+ # TODO: this should be cached I think
+ base_cmd = 'osd df'
+ ret, out, err = self.mon_command({
+ 'prefix': base_cmd,
+ 'format': 'json'
+ })
+ return json.loads(out)
+
+ def is_empty(self, osd_id: int, osd_df: Optional[dict] = None) -> bool:
+ pgs = self.get_pg_count(osd_id, osd_df=osd_df)
+ if pgs != 0:
+ self.log.info(f"osd: {osd_id} still has {pgs} PGs.")
+ return False
+ self.log.info(f"osd: {osd_id} has no PGs anymore")
+ return True
+
+ def reweight_osds(self, osd_ids: Set[int]) -> bool:
+ results = [(self.reweight_osd(osd_id)) for osd_id in osd_ids]
+ return all(results)
+
+ def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+ if not osd_df:
+ osd_df = self.osd_df()
+ osd_nodes = osd_df.get('nodes', [])
+ for osd_node in osd_nodes:
+ if osd_node.get('id', None) == int(osd_id):
+ return osd_node.get('pgs')
+ errmsg = f"Could not find <pgs> field for osd_id: {osd_id} in osd_df data"
+ self.log.error(errmsg)
+ raise RuntimeError(errmsg)
+
+ def get_osd_weight(self, osd_id: int) -> float:
+ osd_df = self.osd_df()
+ osd_nodes = osd_df.get('nodes', [])
+ for osd_node in osd_nodes:
+ if osd_node.get('id', None) == int(osd_id):
+ return float(osd_node.get('crush_weight'))
+ return -1.0
+
+ def reweight_osd(self, osd_id: int, weight: float = 0.0) -> bool:
+ if self.get_osd_weight(osd_id) == weight:
+ self.log.debug(f"OSD <{osd_id}> is already weighted with: {weight}")
+ return True
+        base_cmd = 'osd crush reweight'
+        cmd = f"{base_cmd} on osd.{osd_id} to weight: {weight}"
+        self.log.debug(f"running cmd: {cmd}")
+        ret, out, err = self.mon_command({
+            'prefix': base_cmd,
+            'name': f'osd.{osd_id}',
+            'weight': weight
+        })
+ if ret != 0:
+ self.log.error(f"command: {cmd} failed with: {err}")
+ return False
+ self.log.info(f"command: {cmd} succeeded with: {out}")
+ return True
+
+ def find_osd_stop_threshold(self, osd_ids: Set[int]) -> Set[int]:
+ """
+        Halve the list of osd_ids until the remaining OSDs are ok-to-stop
+
+        :param osd_ids: set of osd_ids
+        :return: subset of osd_ids that can be stopped at once
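+
+        For example, starting from four osd_ids this retries with the last two
+        and then with the last one before giving up.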
+ """
+ if not osd_ids:
+ return set()
+        _osds: List[int] = list(osd_ids)
+ while not self.ok_to_stop(_osds):
+ if len(_osds) <= 1:
+ # can't even stop one OSD, aborting
+ self.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+ return set()
+ self.event.wait(1)
+ # splitting osd_ids in half until ok_to_stop yields success
+ # maybe popping ids off one by one is better here..depends on the cluster size I guess..
+ # There's a lot of room for micro adjustments here
+ _osds = _osds[len(_osds)//2:]
+ return set(_osds)
+
+ def ok_to_stop(self, osd_ids: List[int]) -> bool:
+ base_cmd = "osd ok-to-stop"
+ self.log.debug(f"running cmd: {base_cmd} on ids {osd_ids}")
+ ret, out, err = self.mon_command({
+ 'prefix': base_cmd,
+ # apparently ok-to-stop allows strings only
+ 'ids': [str(x) for x in osd_ids]
+ })
+ if ret != 0:
+ self.log.error(f"{osd_ids} are not ok-to-stop. {err}")
+ return False
+ self.log.info(f"OSDs <{osd_ids}> are ok-to-stop")
+ return True