]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: remove/replace osds 32983/head
authorJoshua Schmid <jschmid@suse.de>
Tue, 18 Feb 2020 15:14:28 +0000 (16:14 +0100)
committerJoshua Schmid <jschmid@suse.de>
Wed, 26 Feb 2020 21:08:48 +0000 (22:08 +0100)
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/cephadm/cephadm
src/mon/MgrMonitor.cc
src/pybind/mgr/cephadm/_utils.py [new file with mode: 0644]
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/mgr_module.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py
src/pybind/mgr/osd_support/module.py
src/pybind/mgr/tox.ini

index e2320f106fe21da3a12f832a303abbdf602ff0b8..5d746793c21e96241dfc0a883cc051382ec08e5a 100755 (executable)
@@ -2548,6 +2548,18 @@ def command_rm_daemon():
          verbose_on_failure=False)
     call(['systemctl', 'disable', unit_name],
          verbose_on_failure=False)
+    if daemon_type == 'osd':
+        CephContainer(
+            image=args.image,
+            entrypoint='/usr/sbin/ceph-volume',
+            args=[
+                'lvm', 'zap', '--osd-id',
+                str(daemon_id)
+                # not sure if --destroy is useful here
+            ],
+            container_args=['--privileged'],
+            volume_mounts=get_container_mounts(args.fsid, daemon_type, daemon_id)
+        ).run()
     data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
     call_throws(['rm', '-rf', data_dir])
 
index 48ce95dfd600f99a7e53a08e19b58ba43be996d9..5e4bcd98f819b92e72861b16c45450f577777725 100644 (file)
@@ -62,6 +62,7 @@ const static std::map<uint32_t, std::set<std::string>> always_on_modules = {
       "devicehealth",
       "orchestrator",
       "rbd_support",
+      "osd_support",
       "volumes",
       "pg_autoscaler",
       "telemetry",
diff --git a/src/pybind/mgr/cephadm/_utils.py b/src/pybind/mgr/cephadm/_utils.py
new file mode 100644 (file)
index 0000000..22c8ed4
--- /dev/null
@@ -0,0 +1,91 @@
+import json
+try:
+    from typing import List
+except ImportError:
+    pass
+
+from orchestrator import OrchestratorError
+
+
+class RemoveUtil(object):
+    def __init__(self, mgr):
+        self.mgr = mgr
+
+    def drain_osd(self, osd_id: str) -> bool:
+        """
+        Uses `osd_support` module to schedule a drain operation of an OSD
+        """
+        cmd_args = {
+            'prefix': 'osd drain',
+            'osd_ids': [int(osd_id)]
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def get_pg_count(self, osd_id: str) -> int:
+        """ Queries for PG count of an OSD """
+        self.mgr.log.debug("Querying for drain status")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd drain status',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Calling osd drain status failed with {err}")
+            raise OrchestratorError("Could not query `osd drain status`")
+        out = json.loads(out)
+        for o in out:
+            if str(o.get('osd_id', '')) == str(osd_id):
+                return int(o.get('pgs', -1))
+        return -1
+
+    def is_empty(self, osd_id: str) -> bool:
+        """ Checks if an OSD is empty """
+        return self.get_pg_count(osd_id) == 0
+
+    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+        """ Queries the safe-to-destroy flag for OSDs """
+        cmd_args = {'prefix': 'osd safe-to-destroy',
+                    'ids': osd_ids}
+        return self._run_mon_cmd(cmd_args)
+
+    def destroy_osd(self, osd_id: int) -> bool:
+        """ Destroys an OSD (forcefully) """
+        cmd_args = {'prefix': 'osd destroy-actual',
+                    'id': int(osd_id),
+                    'yes_i_really_mean_it': True}
+        return self._run_mon_cmd(cmd_args)
+
+    def down_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `out` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd down',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def purge_osd(self, osd_id: int) -> bool:
+        """ Purges an OSD from the cluster (forcefully) """
+        cmd_args = {
+            'prefix': 'osd purge-actual',
+            'id': int(osd_id),
+            'yes_i_really_mean_it': True
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def out_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets `down` flag to OSDs """
+        cmd_args = {
+            'prefix': 'osd out',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def _run_mon_cmd(self, cmd_args: dict) -> bool:
+        """
+        Generic command to run mon_command and evaluate/log the results
+        """
+        ret, out, err = self.mgr.mon_command(cmd_args)
+        if ret != 0:
+            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+            return False
+        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+        return True
index 61725d7ec386859ec0b32a4b55e11572dc5b117f..3c83e72f1b218719ca0092bde87627867e10490f 100644 (file)
@@ -10,7 +10,7 @@ from OpenSSL import crypto
 
 import string
 try:
-    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any
+    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple
     from typing import TYPE_CHECKING
 except ImportError:
     TYPE_CHECKING = False  # just for type checking
@@ -38,6 +38,8 @@ from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValid
     CLICommandMeta
 
 from . import remotes
+from ._utils import RemoveUtil
+
 
 try:
     import remoto
@@ -83,6 +85,20 @@ except ImportError:
         def __exit__(self, exc_type, exc_value, traceback):
             self.cleanup()
 
+class OSDRemoval(NamedTuple):
+    osd_id: int
+    replace: bool
+    force: bool
+    nodename: str
+    fullname: str
+    started_at: datetime.datetime
+
+    # needed due to changing 'started_at' attr
+    def __eq__(self, other):
+        return self.osd_id == other.osd_id
+
+    def __hash__(self):
+        return hash(self.osd_id)
 
 # high-level TODO:
 #  - bring over some of the protections from ceph-deploy that guard against
@@ -402,7 +418,6 @@ def ssh_completion(cls=AsyncCompletion, **c_kwargs):
                 else:
                     return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
 
-
         return wrapper
     return decorator
 
@@ -561,6 +576,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         self.cache = HostCache(self)
         self.cache.load()
+        self.to_remove_osds: set = set()
+        self.osd_removal_report: dict = dict()
+        self.rm_util = RemoveUtil(self)
 
         # ensure the host lists are in sync
         for h in self.inventory.keys():
@@ -932,6 +950,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info("serve starting")
         while self.run:
             self._check_hosts()
+            self._remove_osds_bg()
 
             # refresh daemons
             self.log.debug('refreshing hosts')
@@ -1664,13 +1683,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             raise OrchestratorError('Unable to find daemon(s) %s' % (names))
         return self._remove_daemon(args)
 
-    def remove_service(self, service_name):
+    def remove_service(self, service_name, force=False):
         args = []
         for host, dm in self.cache.daemons.items():
             for name, d in dm.items():
                 if d.matches_service(service_name):
                     args.append(
-                        ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
+                        ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname, force)
                     )
         if not args:
             raise OrchestratorError('Unable to find daemons in %s service' % (
@@ -2503,6 +2522,105 @@ datasources:
         self.event.set()
         return trivial_result('Stopped upgrade to %s' % target_name)
 
+    def remove_osds(self, osd_ids: List[str],
+                    replace: bool = False,
+                    force: bool = False) -> orchestrator.Completion:
+        """
+        Takes a list of OSDs and schedules them for removal.
+        The function that takes care of the actual removal is
+        _remove_osds_bg().
+        """
+
+        daemons = self.cache.get_daemons_by_service('osd')
+        found = set()
+        for daemon in daemons:
+            if daemon.daemon_id not in osd_ids:
+                continue
+            found.add(OSDRemoval(daemon.daemon_id, replace, force,
+                                 daemon.hostname, daemon.name(),
+                                 datetime.datetime.utcnow()))
+
+        not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
+        if not_found:
+            raise OrchestratorError('Unable to find OSD: %s' % not_found)
+
+        for osd in found:
+            self.to_remove_osds.add(osd)
+            # trigger the serve loop to initiate the removal
+        self._kick_serve_loop()
+        return trivial_result(f"Scheduled OSD(s) for removal")
+
+    def _remove_osds_bg(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when criteria is met.
+        """
+        self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+        self.osd_removal_report = self._generate_osd_removal_status()
+        remove_osds: set = self.to_remove_osds.copy()
+        for osd in remove_osds:
+            if not osd.force:
+                self.rm_util.drain_osd(osd.osd_id)
+                # skip criteria
+                if not self.rm_util.is_empty(osd.osd_id):
+                    self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    continue
+
+            if not self.rm_util.ok_to_destroy([osd.osd_id]):
+                self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                continue
+
+            # abort criteria
+            if not self.rm_util.down_osd([osd.osd_id]):
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not self.rm_util.destroy_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not self.rm_util.purge_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            completion = self._remove_daemon([(osd.fullname, osd.nodename, True)])
+            completion.add_progress('Removing OSDs', self)
+            completion.update_progress = True
+            if completion:
+                while not completion.has_result:
+                    self.process([completion])
+                    if completion.needs_result:
+                        time.sleep(1)
+                    else:
+                        break
+                if completion.exception is not None:
+                    self.log.error(str(completion.exception))
+            else:
+                raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon")
+
+            self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}")
+            self.log.debug(f"Removing {osd.osd_id} from the queue.")
+            self.to_remove_osds.remove(osd)
+
+    def _generate_osd_removal_status(self) -> Dict[Any, object]:
+        """
+        Generate a OSD report that can be printed to the CLI
+        """
+        self.log.debug("Assembling report for osd rm status")
+        report = {}
+        for osd in self.to_remove_osds:
+            pg_count = self.rm_util.get_pg_count(osd.osd_id)
+            report[osd] = pg_count if pg_count != -1 else 'n/a'
+        self.log.debug(f"Reporting: {report}")
+        return report
+
+    def remove_osds_status(self) -> orchestrator.Completion:
+        """
+        The CLI call to retrieve an osd removal report
+        """
+        return trivial_result(self.osd_removal_report)
+
 
 class BaseScheduler(object):
     """
index 1681e7c998ae3fcd17a8fc7a91c0d8eae61e4812..fb386df0172eb52843d9cb7b71a6e3c8d61debb9 100644 (file)
@@ -5,7 +5,6 @@ try:
 except ImportError:
     # just for type checking
     pass
-import datetime
 import logging
 import errno
 import json
index 6a521744d8fba37e1c6396118f7c957e1a886c1e..66905ac3300abc88e503c7c84caa83b76bc611a8 100644 (file)
@@ -975,6 +975,25 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def remove_osds(self, osd_ids: List[str],
+                    replace: bool = False,
+                    force: bool = False) -> Completion:
+        """
+        :param osd_ids: list of OSD IDs
+        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
+        :param force: Forces the OSD removal process without waiting for the data to be drained first.
+        Note that this can only remove OSDs that were successfully
+        created (i.e. got an OSD ID).
+        """
+        raise NotImplementedError()
+
+    def remove_osds_status(self):
+        # type: () -> Completion
+        """
+        Returns a status of the ongoing OSD removal operations.
+        """
+        raise NotImplementedError()
+
     def blink_device_light(self, ident_fault, on, locations):
         # type: (str, bool, List[DeviceLightLoc]) -> Completion
         """
index 4aaea608928358f5a5c4d7abff5ee5b5536a942a..c1185fc8bb64baef2ed9ee63c9c2e12402f102d5 100644 (file)
@@ -457,6 +457,42 @@ Usage:
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())
+    
+    @_cli_write_command(
+        'orch osd rm',
+        "name=svc_id,type=CephString,n=N "
+        "name=replace,type=CephBool,req=false "
+        "name=force,type=CephBool,req=false",
+        'Remove OSD services')
+    def _osd_rm(self, svc_id: List[str],
+                replace: bool = False,
+                force: bool = False) -> HandleCommandResult:
+        completion = self.remove_osds(svc_id, replace, force)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
+    @_cli_write_command(
+        'orch osd rm status',
+        desc='status of OSD removal operation')
+    def _osd_rm_status(self) -> HandleCommandResult:
+        completion = self.remove_osds_status()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        report = completion.result
+        if len(report) == 0:
+            return HandleCommandResult(stdout="No OSD remove/replace operations reported")
+        table = PrettyTable(
+            ['NAME', 'HOST', 'PGS', 'STARTED_AT'],
+            border=False)
+        table.align = 'l'
+        table.left_padding_width = 0
+        table.right_padding_width = 1
+        # TODO: re-add sorted and sort by pg_count
+        for osd, status in report.items():
+            table.add_row((osd.fullname, osd.nodename, status, osd.started_at))
+
+        return HandleCommandResult(stdout=table.get_string())
 
     @_cli_write_command(
         'orch daemon add mon',
index eba8259abd6b1192b31089fcb9b1a9833dcb4602..14779d3f324ece2d9bfcb576d17c21ac20dbd9ad 100644 (file)
@@ -33,6 +33,7 @@ class OSDSupport(MgrModule):
     osd_ids: Set[int] = set()
     emptying_osds: Set[int] = set()
     check_osds: Set[int] = set()
+    empty: Set[int] = set()
 
     def __init__(self, *args, **kwargs):
         super(OSDSupport, self).__init__(*args, **kwargs)
@@ -79,7 +80,7 @@ class OSDSupport(MgrModule):
                 if osd_id not in self.emptying_osds:
                     self.osd_ids.add(osd_id)
             self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
-            out = 'Started draining OSDs. Query progress with <ceph drain status>'
+            out = 'Started draining OSDs. Query progress with <ceph osd drain status>'
 
         elif cmd_prefix == 'osd drain status':
             # re-initialize it with an empty set on invocation (long running processes)
@@ -87,12 +88,13 @@ class OSDSupport(MgrModule):
             # assemble a set of emptying osds and to_be_emptied osds
             self.check_osds.update(self.emptying_osds)
             self.check_osds.update(self.osd_ids)
+            self.check_osds.update(self.empty)
 
             report = list()
             for osd_id in self.check_osds:
                 pgs = self.get_pg_count(osd_id)
                 report.append(dict(osd_id=osd_id, pgs=pgs))
-            out = f"{report}"
+            out = f"{json.dumps(report)}"
 
         elif cmd_prefix == 'osd drain stop':
             if not osd_ids:
@@ -130,7 +132,6 @@ class OSDSupport(MgrModule):
         """
         self.log.info("Starting mgr/osd_support")
         while self.run:
-            # Do some useful background work here.
 
             self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
             self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
@@ -153,12 +154,31 @@ class OSDSupport(MgrModule):
             # remove osds that are marked as empty
             self.emptying_osds = self.emptying_osds.difference(empty_osds)
 
+            # move empty osds in the done queue until they disappear from ceph's view
+            # other modules need to know when OSDs are empty
+            for osd in empty_osds:
+                self.log.debug(f"Adding {osd} to list of empty OSDs")
+                self.empty.add(osd)
+
+            # remove from queue if no longer part of ceph cluster
+            self.cleanup()
+
             # fixed sleep interval of 10 seconds
             sleep_interval = 10
             self.log.debug('Sleeping for %d seconds', sleep_interval)
             self.event.wait(sleep_interval)
             self.event.clear()
 
+    def cleanup(self):
+        """
+        Remove OSDs that are no longer in the ceph cluster from the
+        'done' list.
+        :return:
+        """
+        for osd in self.osds_not_in_cluster(list(self.empty)):
+            self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing")
+            self.empty.remove(osd)
+
     def shutdown(self):
         """
         This method is called by the mgr when the module needs to shut
@@ -216,10 +236,8 @@ class OSDSupport(MgrModule):
         osd_nodes = osd_df.get('nodes', [])
         for osd_node in osd_nodes:
             if osd_node.get('id', None) == int(osd_id):
-                return osd_node.get('pgs')
-        errmsg = f"Could not find <pgs> field for osd_id: {osd_id} in osd_df data"
-        self.log.error(errmsg)
-        raise RuntimeError(errmsg)
+                return osd_node.get('pgs', -1)
+        return -1
 
     def get_osd_weight(self, osd_id: int) -> float:
         osd_df = self.osd_df()
index 4e365b441c4c0f9d1d30736de8a7cd20d06633e3..0fc10103f0585deaf10a40b5e1ae49a9fa464835 100644 (file)
@@ -14,6 +14,7 @@ deps =
     mypy
 commands = mypy --config-file=../../mypy.ini \
            cephadm/module.py \
+           cephadm/_utils.py \
            mgr_module.py \
            dashboard/module.py \
            mgr_util.py \