From 789990e16eb73c62414415902d53b7eaa7dff62d Mon Sep 17 00:00:00 2001 From: Joshua Schmid Date: Tue, 18 Feb 2020 16:14:28 +0100 Subject: [PATCH] mgr/cephadm: remove/replace osds Signed-off-by: Joshua Schmid --- src/cephadm/cephadm | 12 +++ src/mon/MgrMonitor.cc | 1 + src/pybind/mgr/cephadm/_utils.py | 91 ++++++++++++++++ src/pybind/mgr/cephadm/module.py | 126 +++++++++++++++++++++- src/pybind/mgr/mgr_module.py | 1 - src/pybind/mgr/orchestrator/_interface.py | 19 ++++ src/pybind/mgr/orchestrator/module.py | 36 +++++++ src/pybind/mgr/osd_support/module.py | 32 ++++-- src/pybind/mgr/tox.ini | 1 + 9 files changed, 307 insertions(+), 12 deletions(-) create mode 100644 src/pybind/mgr/cephadm/_utils.py diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index e2320f106fe21..5d746793c21e9 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -2548,6 +2548,18 @@ def command_rm_daemon(): verbose_on_failure=False) call(['systemctl', 'disable', unit_name], verbose_on_failure=False) + if daemon_type == 'osd': + CephContainer( + image=args.image, + entrypoint='/usr/sbin/ceph-volume', + args=[ + 'lvm', 'zap', '--osd-id', + str(daemon_id) + # not sure if --destroy is useful here + ], + container_args=['--privileged'], + volume_mounts=get_container_mounts(args.fsid, daemon_type, daemon_id) + ).run() data_dir = get_data_dir(args.fsid, daemon_type, daemon_id) call_throws(['rm', '-rf', data_dir]) diff --git a/src/mon/MgrMonitor.cc b/src/mon/MgrMonitor.cc index 48ce95dfd600f..5e4bcd98f819b 100644 --- a/src/mon/MgrMonitor.cc +++ b/src/mon/MgrMonitor.cc @@ -62,6 +62,7 @@ const static std::map> always_on_modules = { "devicehealth", "orchestrator", "rbd_support", + "osd_support", "volumes", "pg_autoscaler", "telemetry", diff --git a/src/pybind/mgr/cephadm/_utils.py b/src/pybind/mgr/cephadm/_utils.py new file mode 100644 index 0000000000000..22c8ed4e93c24 --- /dev/null +++ b/src/pybind/mgr/cephadm/_utils.py @@ -0,0 +1,91 @@ +import json +try: + from typing import List +except ImportError: + pass + +from orchestrator import OrchestratorError + + +class RemoveUtil(object): + def __init__(self, mgr): + self.mgr = mgr + + def drain_osd(self, osd_id: str) -> bool: + """ + Uses `osd_support` module to schedule a drain operation of an OSD + """ + cmd_args = { + 'prefix': 'osd drain', + 'osd_ids': [int(osd_id)] + } + return self._run_mon_cmd(cmd_args) + + def get_pg_count(self, osd_id: str) -> int: + """ Queries for PG count of an OSD """ + self.mgr.log.debug("Querying for drain status") + ret, out, err = self.mgr.mon_command({ + 'prefix': 'osd drain status', + }) + if ret != 0: + self.mgr.log.error(f"Calling osd drain status failed with {err}") + raise OrchestratorError("Could not query `osd drain status`") + out = json.loads(out) + for o in out: + if str(o.get('osd_id', '')) == str(osd_id): + return int(o.get('pgs', -1)) + return -1 + + def is_empty(self, osd_id: str) -> bool: + """ Checks if an OSD is empty """ + return self.get_pg_count(osd_id) == 0 + + def ok_to_destroy(self, osd_ids: List[int]) -> bool: + """ Queries the safe-to-destroy flag for OSDs """ + cmd_args = {'prefix': 'osd safe-to-destroy', + 'ids': osd_ids} + return self._run_mon_cmd(cmd_args) + + def destroy_osd(self, osd_id: int) -> bool: + """ Destroys an OSD (forcefully) """ + cmd_args = {'prefix': 'osd destroy-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True} + return self._run_mon_cmd(cmd_args) + + def down_osd(self, osd_ids: List[int]) -> bool: + """ Sets `out` flag to OSDs """ + cmd_args = { + 'prefix': 'osd down', + 'ids': 
osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def purge_osd(self, osd_id: int) -> bool: + """ Purges an OSD from the cluster (forcefully) """ + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True + } + return self._run_mon_cmd(cmd_args) + + def out_osd(self, osd_ids: List[int]) -> bool: + """ Sets `down` flag to OSDs """ + cmd_args = { + 'prefix': 'osd out', + 'ids': osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def _run_mon_cmd(self, cmd_args: dict) -> bool: + """ + Generic command to run mon_command and evaluate/log the results + """ + ret, out, err = self.mgr.mon_command(cmd_args) + if ret != 0: + self.mgr.log.debug(f"ran {cmd_args} with mon_command") + self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + return False + self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + return True diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 61725d7ec3868..3c83e72f1b218 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -10,7 +10,7 @@ from OpenSSL import crypto import string try: - from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any + from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple from typing import TYPE_CHECKING except ImportError: TYPE_CHECKING = False # just for type checking @@ -38,6 +38,8 @@ from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValid CLICommandMeta from . import remotes +from ._utils import RemoveUtil + try: import remoto @@ -83,6 +85,20 @@ except ImportError: def __exit__(self, exc_type, exc_value, traceback): self.cleanup() +class OSDRemoval(NamedTuple): + osd_id: int + replace: bool + force: bool + nodename: str + fullname: str + started_at: datetime.datetime + + # needed due to changing 'started_at' attr + def __eq__(self, other): + return self.osd_id == other.osd_id + + def __hash__(self): + return hash(self.osd_id) # high-level TODO: # - bring over some of the protections from ceph-deploy that guard against @@ -402,7 +418,6 @@ def ssh_completion(cls=AsyncCompletion, **c_kwargs): else: return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs) - return wrapper return decorator @@ -561,6 +576,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.cache = HostCache(self) self.cache.load() + self.to_remove_osds: set = set() + self.osd_removal_report: dict = dict() + self.rm_util = RemoveUtil(self) # ensure the host lists are in sync for h in self.inventory.keys(): @@ -932,6 +950,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.log.info("serve starting") while self.run: self._check_hosts() + self._remove_osds_bg() # refresh daemons self.log.debug('refreshing hosts') @@ -1664,13 +1683,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): raise OrchestratorError('Unable to find daemon(s) %s' % (names)) return self._remove_daemon(args) - def remove_service(self, service_name): + def remove_service(self, service_name, force=False): args = [] for host, dm in self.cache.daemons.items(): for name, d in dm.items(): if d.matches_service(service_name): args.append( - ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname) + ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname, force) ) if not args: raise OrchestratorError('Unable to find daemons in %s service' % ( @@ -2503,6 +2522,105 @@ datasources: self.event.set() return 
trivial_result('Stopped upgrade to %s' % target_name) + def remove_osds(self, osd_ids: List[str], + replace: bool = False, + force: bool = False) -> orchestrator.Completion: + """ + Takes a list of OSDs and schedules them for removal. + The function that takes care of the actual removal is + _remove_osds_bg(). + """ + + daemons = self.cache.get_daemons_by_service('osd') + found = set() + for daemon in daemons: + if daemon.daemon_id not in osd_ids: + continue + found.add(OSDRemoval(daemon.daemon_id, replace, force, + daemon.hostname, daemon.name(), + datetime.datetime.utcnow())) + + not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} + if not_found: + raise OrchestratorError('Unable to find OSD: %s' % not_found) + + for osd in found: + self.to_remove_osds.add(osd) + # trigger the serve loop to initiate the removal + self._kick_serve_loop() + return trivial_result(f"Scheduled OSD(s) for removal") + + def _remove_osds_bg(self) -> None: + """ + Performs actions in the _serve() loop to remove an OSD + when criteria is met. + """ + self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") + self.osd_removal_report = self._generate_osd_removal_status() + remove_osds: set = self.to_remove_osds.copy() + for osd in remove_osds: + if not osd.force: + self.rm_util.drain_osd(osd.osd_id) + # skip criteria + if not self.rm_util.is_empty(osd.osd_id): + self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") + continue + + if not self.rm_util.ok_to_destroy([osd.osd_id]): + self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") + continue + + # abort criteria + if not self.rm_util.down_osd([osd.osd_id]): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'") + + if osd.replace: + if not self.rm_util.destroy_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>") + else: + if not self.rm_util.purge_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? 
+ raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") + + completion = self._remove_daemon([(osd.fullname, osd.nodename, True)]) + completion.add_progress('Removing OSDs', self) + completion.update_progress = True + if completion: + while not completion.has_result: + self.process([completion]) + if completion.needs_result: + time.sleep(1) + else: + break + if completion.exception is not None: + self.log.error(str(completion.exception)) + else: + raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon") + + self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}") + self.log.debug(f"Removing {osd.osd_id} from the queue.") + self.to_remove_osds.remove(osd) + + def _generate_osd_removal_status(self) -> Dict[Any, object]: + """ + Generate a OSD report that can be printed to the CLI + """ + self.log.debug("Assembling report for osd rm status") + report = {} + for osd in self.to_remove_osds: + pg_count = self.rm_util.get_pg_count(osd.osd_id) + report[osd] = pg_count if pg_count != -1 else 'n/a' + self.log.debug(f"Reporting: {report}") + return report + + def remove_osds_status(self) -> orchestrator.Completion: + """ + The CLI call to retrieve an osd removal report + """ + return trivial_result(self.osd_removal_report) + class BaseScheduler(object): """ diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 1681e7c998ae3..fb386df0172eb 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -5,7 +5,6 @@ try: except ImportError: # just for type checking pass -import datetime import logging import errno import json diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 6a521744d8fba..66905ac3300ab 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -975,6 +975,25 @@ class Orchestrator(object): """ raise NotImplementedError() + def remove_osds(self, osd_ids: List[str], + replace: bool = False, + force: bool = False) -> Completion: + """ + :param osd_ids: list of OSD IDs + :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` + :param force: Forces the OSD removal process without waiting for the data to be drained first. + Note that this can only remove OSDs that were successfully + created (i.e. got an OSD ID). + """ + raise NotImplementedError() + + def remove_osds_status(self): + # type: () -> Completion + """ + Returns a status of the ongoing OSD removal operations. 
+ """ + raise NotImplementedError() + def blink_device_light(self, ident_fault, on, locations): # type: (str, bool, List[DeviceLightLoc]) -> Completion """ diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 4aaea60892835..c1185fc8bb64b 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -457,6 +457,42 @@ Usage: self._orchestrator_wait([completion]) raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch osd rm', + "name=svc_id,type=CephString,n=N " + "name=replace,type=CephBool,req=false " + "name=force,type=CephBool,req=false", + 'Remove OSD services') + def _osd_rm(self, svc_id: List[str], + replace: bool = False, + force: bool = False) -> HandleCommandResult: + completion = self.remove_osds(svc_id, replace, force) + self._orchestrator_wait([completion]) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch osd rm status', + desc='status of OSD removal operation') + def _osd_rm_status(self) -> HandleCommandResult: + completion = self.remove_osds_status() + self._orchestrator_wait([completion]) + raise_if_exception(completion) + report = completion.result + if len(report) == 0: + return HandleCommandResult(stdout="No OSD remove/replace operations reported") + table = PrettyTable( + ['NAME', 'HOST', 'PGS', 'STARTED_AT'], + border=False) + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 1 + # TODO: re-add sorted and sort by pg_count + for osd, status in report.items(): + table.add_row((osd.fullname, osd.nodename, status, osd.started_at)) + + return HandleCommandResult(stdout=table.get_string()) @_cli_write_command( 'orch daemon add mon', diff --git a/src/pybind/mgr/osd_support/module.py b/src/pybind/mgr/osd_support/module.py index eba8259abd6b1..14779d3f324ec 100644 --- a/src/pybind/mgr/osd_support/module.py +++ b/src/pybind/mgr/osd_support/module.py @@ -33,6 +33,7 @@ class OSDSupport(MgrModule): osd_ids: Set[int] = set() emptying_osds: Set[int] = set() check_osds: Set[int] = set() + empty: Set[int] = set() def __init__(self, *args, **kwargs): super(OSDSupport, self).__init__(*args, **kwargs) @@ -79,7 +80,7 @@ class OSDSupport(MgrModule): if osd_id not in self.emptying_osds: self.osd_ids.add(osd_id) self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.') - out = 'Started draining OSDs. Query progress with ' + out = 'Started draining OSDs. Query progress with ' elif cmd_prefix == 'osd drain status': # re-initialize it with an empty set on invocation (long running processes) @@ -87,12 +88,13 @@ class OSDSupport(MgrModule): # assemble a set of emptying osds and to_be_emptied osds self.check_osds.update(self.emptying_osds) self.check_osds.update(self.osd_ids) + self.check_osds.update(self.empty) report = list() for osd_id in self.check_osds: pgs = self.get_pg_count(osd_id) report.append(dict(osd_id=osd_id, pgs=pgs)) - out = f"{report}" + out = f"{json.dumps(report)}" elif cmd_prefix == 'osd drain stop': if not osd_ids: @@ -130,7 +132,6 @@ class OSDSupport(MgrModule): """ self.log.info("Starting mgr/osd_support") while self.run: - # Do some useful background work here. 
self.log.debug(f"Scheduled for draining: <{self.osd_ids}>") self.log.debug(f"Currently being drained: <{self.emptying_osds}>") @@ -153,12 +154,31 @@ class OSDSupport(MgrModule): # remove osds that are marked as empty self.emptying_osds = self.emptying_osds.difference(empty_osds) + # move empty osds in the done queue until they disappear from ceph's view + # other modules need to know when OSDs are empty + for osd in empty_osds: + self.log.debug(f"Adding {osd} to list of empty OSDs") + self.empty.add(osd) + + # remove from queue if no longer part of ceph cluster + self.cleanup() + # fixed sleep interval of 10 seconds sleep_interval = 10 self.log.debug('Sleeping for %d seconds', sleep_interval) self.event.wait(sleep_interval) self.event.clear() + def cleanup(self): + """ + Remove OSDs that are no longer in the ceph cluster from the + 'done' list. + :return: + """ + for osd in self.osds_not_in_cluster(list(self.empty)): + self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing") + self.empty.remove(osd) + def shutdown(self): """ This method is called by the mgr when the module needs to shut @@ -216,10 +236,8 @@ class OSDSupport(MgrModule): osd_nodes = osd_df.get('nodes', []) for osd_node in osd_nodes: if osd_node.get('id', None) == int(osd_id): - return osd_node.get('pgs') - errmsg = f"Could not find field for osd_id: {osd_id} in osd_df data" - self.log.error(errmsg) - raise RuntimeError(errmsg) + return osd_node.get('pgs', -1) + return -1 def get_osd_weight(self, osd_id: int) -> float: osd_df = self.osd_df() diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 4e365b441c4c0..0fc10103f0585 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -14,6 +14,7 @@ deps = mypy commands = mypy --config-file=../../mypy.ini \ cephadm/module.py \ + cephadm/_utils.py \ mgr_module.py \ dashboard/module.py \ mgr_util.py \ -- 2.39.5