]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mgr/cephadm: implement osd remove/replace logic in cephadm
authorJoshua Schmid <jschmid@suse.de>
Fri, 24 Jul 2020 13:24:38 +0000 (15:24 +0200)
committerJoshua Schmid <jschmid@suse.de>
Fri, 31 Jul 2020 08:13:56 +0000 (10:13 +0200)
Fixes: https://tracker.ceph.com/issues/44548
Fixes: https://tracker.ceph.com/issues/45594
Signed-off-by: Joshua Schmid <jschmid@suse.de>
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/services/osd.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py

index 1cd36d271297547798ce9a58f1c6a0caf7eb2c71..185585288436dcc0aad7933042c605b57450dcc9 100644 (file)
@@ -38,7 +38,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ
     RbdMirrorService, CrashService, CephadmService
 from .services.iscsi import IscsiService
 from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDRemoval, OSDService
+from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
 from .schedule import HostAssignment, HostPlacementSpec
@@ -318,7 +318,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
 
         self.cache = HostCache(self)
         self.cache.load()
+
         self.rm_util = RemoveUtil(self)
+        self.to_remove_osds = OSDQueue()
+        self.rm_util.load_from_store()
 
         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -495,7 +498,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 self._update_paused_health()
 
                 if not self.paused:
-                    self.rm_util._remove_osds_bg()
+                    self.rm_util.process_removal_queue()
 
                     self.migration.migrate()
                     if self.migration.is_migration_ongoing():
@@ -564,6 +567,21 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                 monmap['modified'], CEPH_DATEFMT)
             if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
                 self.last_monmap = None  # just in case clocks are skewed
+            self.cache.distribute_new_etc_ceph_ceph_conf()
+        if notify_type == "pg_summary":
+            self._trigger_osd_removal()
+
+    def _trigger_osd_removal(self):
+        data = self.get("osd_stats")
+        for osd in data.get('osd_stats', []):
+            if osd.get('num_pgs') == 0:
+                # if _ANY_ osd that is currently in the queue appears to be empty,
+                # start the removal process
+                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
+                    self.log.debug(f"Found empty osd. Starting removal process")
+                    # if the osd that is now empty is also part of the removal queue
+                    # start the process
+                    self.rm_util.process_removal_queue()
 
     def pause(self):
         if not self.paused:
@@ -2461,31 +2479,53 @@ you may want to run:
         """
         Takes a list of OSDs and schedules them for removal.
         The function that takes care of the actual removal is
-        _remove_osds_bg().
+        process_removal_queue().
         """
 
-        daemons = self.cache.get_daemons_by_service('osd')
-        found: Set[OSDRemoval] = set()
+        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_service('osd')
+        to_remove_daemons = list()
         for daemon in daemons:
-            if daemon.daemon_id not in osd_ids:
-                continue
-            found.add(OSDRemoval(daemon.daemon_id, replace, force,
-                                 daemon.hostname, daemon.name(),
-                                 datetime.datetime.utcnow(), -1))
-
-        not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
-        if not_found:
-            raise OrchestratorError('Unable to find OSD: %s' % not_found)
+            if daemon.daemon_id in osd_ids:
+                to_remove_daemons.append(daemon)
+        if not to_remove_daemons:
+            return f"Unable to find OSDs: {osd_ids}"
 
-        self.rm_util.queue_osds_for_removal(found)
+        for daemon in to_remove_daemons:
+            try:
+                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
+                                                replace=replace,
+                                                force=force,
+                                                hostname=daemon.hostname,
+                                                fullname=daemon.name(),
+                                                process_started_at=datetime.datetime.utcnow(),
+                                                remove_util=self.rm_util))
+            except NotFoundError:
+                return f"Unable to find OSDs: {osd_ids}"
 
         # trigger the serve loop to initiate the removal
         self._kick_serve_loop()
         return "Scheduled OSD(s) for removal"
 
     @trivial_completion
-    def remove_osds_status(self) -> Set[OSDRemoval]:
+    def stop_remove_osds(self, osd_ids: List[str]):
+        """
+        Stops a `removal` process for a List of OSDs.
+        This will revert their weight and remove it from the osds_to_remove queue
+        """
+        for osd_id in osd_ids:
+            try:
+                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
+                                           remove_util=self.rm_util))
+            except (NotFoundError, KeyError):
+                return f'Unable to find OSD in the queue: {osd_id}'
+
+        # trigger the serve loop to halt the removal
+        self._kick_serve_loop()
+        return "Stopped OSD(s) removal"
+
+    @trivial_completion
+    def remove_osds_status(self):
         """
         The CLI call to retrieve an osd removal report
         """
-        return self.rm_util.report
+        return self.to_remove_osds.all_osds()
index 21038c467b26edce9bb0a02ff3bf536b0f19d4e2..3a7297bdc442f46a1e54a1934ed5e18b9197ba76 100644 (file)
@@ -1,4 +1,3 @@
-import datetime
 import json
 import logging
 from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional
@@ -7,6 +6,7 @@ from ceph.deployment import translate
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.drive_selection import DriveSelection
 
+from datetime import datetime
 import orchestrator
 from cephadm.utils import forall_hosts
 from orchestrator import OrchestratorError
@@ -15,6 +15,7 @@ from mgr_module import MonCommandFailed
 from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec
 
 logger = logging.getLogger(__name__)
+DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 
 
 class OSDService(CephadmService):
@@ -280,139 +281,148 @@ class OSDService(CephadmService):
         return osd_host_map
 
 
-class OSDRemoval(object):
-    def __init__(self,
-                 osd_id: str,
-                 replace: bool,
-                 force: bool,
-                 nodename: str,
-                 fullname: str,
-                 start_at: datetime.datetime,
-                 pg_count: int):
-        self.osd_id = osd_id
-        self.replace = replace
-        self.force = force
-        self.nodename = nodename
-        self.fullname = fullname
-        self.started_at = start_at
-        self.pg_count = pg_count
-
-    # needed due to changing 'started_at' attr
-    def __eq__(self, other):
-        return self.osd_id == other.osd_id
-
-    def __hash__(self):
-        return hash(self.osd_id)
-
-    def __repr__(self):
-        return ('<OSDRemoval>(osd_id={}, replace={}, force={}, nodename={}'
-                ', fullname={}, started_at={}, pg_count={})').format(
-            self.osd_id, self.replace, self.force, self.nodename,
-            self.fullname, self.started_at, self.pg_count)
-
-    @property
-    def pg_count_str(self) -> str:
-        return 'n/a' if self.pg_count < 0 else str(self.pg_count)
-
-
 class RemoveUtil(object):
     def __init__(self, mgr):
         self.mgr = mgr
-        self.to_remove_osds: Set[OSDRemoval] = set()
-        self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict()
-
-    @property
-    def report(self) -> Set[OSDRemoval]:
-        return self.to_remove_osds.copy()
-
-    def queue_osds_for_removal(self, osds: Set[OSDRemoval]):
-        self.to_remove_osds.update(osds)
 
-    def _remove_osds_bg(self) -> None:
+    def process_removal_queue(self) -> None:
         """
         Performs actions in the _serve() loop to remove an OSD
         when criteria is met.
         """
+
+        # make sure that we don't run on OSDs that are not in the cluster anymore.
+        self.cleanup()
+
         logger.debug(
-            f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
-        self._update_osd_removal_status()
-        remove_osds: set = self.to_remove_osds.copy()
-        for osd in remove_osds:
+            f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled "
+            f"for removal: {self.mgr.to_remove_osds.all_osds()}")
+
+        # find osds that are ok-to-stop and not yet draining
+        ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds())
+        if ok_to_stop_osds:
+            # start draining those
+            _ = [osd.start_draining() for osd in ok_to_stop_osds]
+
+        # Check all osds for their state and take action (remove, purge etc)
+        to_remove_osds = self.mgr.to_remove_osds.all_osds()
+        new_queue = set()
+        for osd in to_remove_osds:
             if not osd.force:
-                self.drain_osd(osd.osd_id)
                 # skip criteria
-                if not self.is_empty(osd.osd_id):
+                if not osd.is_empty:
                     logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                    new_queue.add(osd)
                     continue
 
-            if not self.ok_to_destroy([osd.osd_id]):
+            if not osd.safe_to_destroy():
                 logger.info(
                     f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                new_queue.add(osd)
                 continue
 
             # abort criteria
-            if not self.down_osd([osd.osd_id]):
+            if not osd.down():
                 # also remove it from the remove_osd list and set a health_check warning?
                 raise orchestrator.OrchestratorError(
                     f"Could not set OSD <{osd.osd_id}> to 'down'")
 
             if osd.replace:
-                if not self.destroy_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
+                if not osd.destroy():
                     raise orchestrator.OrchestratorError(
                         f"Could not destroy OSD <{osd.osd_id}>")
             else:
-                if not self.purge_osd(osd.osd_id):
-                    # also remove it from the remove_osd list and set a health_check warning?
+                if not osd.purge():
                     raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
 
+            if not osd.exists:
+                continue
             self.mgr._remove_daemon(osd.fullname, osd.nodename)
             logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
             logger.debug(f"Removing {osd.osd_id} from the queue.")
-            self.to_remove_osds.remove(osd)
 
-    def _update_osd_removal_status(self):
-        """
-        Generate a OSD report that can be printed to the CLI
-        """
-        logger.debug("Update OSD removal status")
-        for osd in self.to_remove_osds:
-            osd.pg_count = self.get_pg_count(str(osd.osd_id))
-        logger.debug(f"OSD removal status: {self.to_remove_osds}")
+        # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI)
+        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
+        # osds that were added while this method was executed'
+        self.mgr.to_remove_osds.intersection_update(new_queue)
+        self.save_to_store()
+
+    def cleanup(self):
+        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
+        not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster()
+        [self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds]
 
-    def drain_osd(self, osd_id: str) -> bool:
+    def get_osds_in_cluster(self) -> List[str]:
+        osd_map = self.mgr.get_osdmap()
+        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]
+
+    def osd_df(self) -> dict:
+        base_cmd = 'osd df'
+        ret, out, err = self.mgr.mon_command({
+            'prefix': base_cmd,
+            'format': 'json'
+        })
+        return json.loads(out)
+
+    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
+        if not osd_df:
+            osd_df = self.osd_df()
+        osd_nodes = osd_df.get('nodes', [])
+        for osd_node in osd_nodes:
+            if osd_node.get('id') == int(osd_id):
+                return osd_node.get('pgs', -1)
+        return -1
+
+    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
         """
-        Uses `osd_support` module to schedule a drain operation of an OSD
+        Cut osd_id list in half until it's ok-to-stop
+
+        :param osds: list of osd_ids
+        :return: list of ods_ids that can be stopped at once
         """
+        if not osds:
+            return []
+        while not self.ok_to_stop(osds):
+            if len(osds) <= 1:
+                # can't even stop one OSD, aborting
+                self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..")
+                return []
+
+            # This potentially prolongs the global wait time.
+            self.mgr.event.wait(1)
+            # splitting osd_ids in half until ok_to_stop yields success
+            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
+            # There's a lot of room for micro adjustments here
+            osds = osds[len(osds) // 2:]
+        return osds
+
+       # todo start draining
+       #  return all([osd.start_draining() for osd in osds])
+
+    def ok_to_stop(self, osds: List["OSD"]) -> bool:
         cmd_args = {
-            'prefix': 'osd drain',
-            'osd_ids': [int(osd_id)]
+            'prefix': "osd ok-to-stop",
+            'ids': [str(osd.osd_id) for osd in osds]
         }
         return self._run_mon_cmd(cmd_args)
 
-    def get_pg_count(self, osd_id: str) -> int:
-        """ Queries for PG count of an OSD """
-        self.mgr.log.debug("Querying for drain status")
+    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
+        base_cmd = f"osd {flag}"
+        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
         ret, out, err = self.mgr.mon_command({
-            'prefix': 'osd drain status',
+            'prefix': base_cmd,
+            'ids': [str(osd.osd_id) for osd in osds]
         })
         if ret != 0:
-            self.mgr.log.error(f"Calling osd drain status failed with {err}")
-            raise OrchestratorError("Could not query `osd drain status`")
-        out = json.loads(out)
-        for o in out:
-            if str(o.get('osd_id', '')) == str(osd_id):
-                return int(o.get('pgs', -1))
-        return -1
-
-    def is_empty(self, osd_id: str) -> bool:
-        """ Checks if an OSD is empty """
-        return self.get_pg_count(osd_id) == 0
+            self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>")
+            return False
+        self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>")
+        return True
 
-    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
         """ Queries the safe-to-destroy flag for OSDs """
         cmd_args = {'prefix': 'osd safe-to-destroy',
-                    'ids': osd_ids}
+                    'ids': [str(x) for x in osd_ids]}
         return self._run_mon_cmd(cmd_args)
 
     def destroy_osd(self, osd_id: int) -> bool:
@@ -422,14 +432,6 @@ class RemoveUtil(object):
                     'yes_i_really_mean_it': True}
         return self._run_mon_cmd(cmd_args)
 
-    def down_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `out` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd down',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
     def purge_osd(self, osd_id: int) -> bool:
         """ Purges an OSD from the cluster (forcefully) """
         cmd_args = {
@@ -439,14 +441,6 @@ class RemoveUtil(object):
         }
         return self._run_mon_cmd(cmd_args)
 
-    def out_osd(self, osd_ids: List[int]) -> bool:
-        """ Sets `down` flag to OSDs """
-        cmd_args = {
-            'prefix': 'osd out',
-            'ids': osd_ids,
-        }
-        return self._run_mon_cmd(cmd_args)
-
     def _run_mon_cmd(self, cmd_args: dict) -> bool:
         """
         Generic command to run mon_command and evaluate/log the results
@@ -458,3 +452,237 @@ class RemoveUtil(object):
             return False
         self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
         return True
+
+    def save_to_store(self):
+        osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()]
+        logger.debug(f"Saving {osd_queue} to store")
+        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))
+
+    def load_from_store(self):
+        for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
+            for osd in json.loads(v):
+                logger.debug(f"Loading osd ->{osd} from store")
+                osd_obj = OSD.from_json(json.loads(osd), ctx=self)
+                self.mgr.to_remove_osds.add(osd_obj)
+
+
+class NotFoundError(Exception):
+    pass
+
+
+class OSD:
+
+    def __init__(self,
+                 osd_id: int,
+                 remove_util: RemoveUtil,
+                 drain_started_at: Optional[datetime] = None,
+                 process_started_at: Optional[datetime] = None,
+                 drain_stopped_at: Optional[datetime] = None,
+                 drain_done_at: Optional[datetime] = None,
+                 draining: bool = False,
+                 started: bool = False,
+                 stopped: bool = False,
+                 replace: bool = False,
+                 force: bool = False,
+                 hostname: Optional[str] = None,
+                 fullname: Optional[str] = None,
+                 ):
+        # the ID of the OSD
+        self.osd_id = osd_id
+
+        # when did process (not the actual draining) start
+        self.process_started_at = process_started_at
+
+        # when did the drain start
+        self.drain_started_at = drain_started_at
+
+        # when did the drain stop
+        self.drain_stopped_at = drain_stopped_at
+
+        # when did the drain finish
+        self.drain_done_at = drain_done_at
+
+        # did the draining start
+        self.draining = draining
+
+        # was the operation started
+        self.started = started
+
+        # was the operation stopped
+        self.stopped = stopped
+
+        # If this is a replace or remove operation
+        self.replace = replace
+        # If we wait for the osd to be drained
+        self.force = force
+        # The name of the node
+        self.nodename = hostname
+        # The full name of the osd
+        self.fullname = fullname
+
+        # mgr obj to make mgr/mon calls
+        self.rm_util = remove_util
+
+    def start(self) -> None:
+        if self.started:
+            logger.debug(f"Already started draining {self}")
+            return None
+        self.started = True
+        self.stopped = False
+
+    def start_draining(self) -> bool:
+        if self.stopped:
+            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
+            return False
+        self.rm_util.set_osd_flag([self], 'out')
+        self.drain_started_at = datetime.utcnow()
+        self.draining = True
+        logger.debug(f"Started draining {self}.")
+        return True
+
+    def stop_draining(self) -> bool:
+        self.rm_util.set_osd_flag([self], 'in')
+        self.drain_stopped_at = datetime.utcnow()
+        self.draining = False
+        logger.debug(f"Stopped draining {self}.")
+        return True
+
+    def stop(self) -> None:
+        if self.stopped:
+            logger.debug(f"Already stopped draining {self}")
+            return None
+        self.started = False
+        self.stopped = True
+        self.stop_draining()
+
+    @property
+    def is_draining(self) -> bool:
+        """
+        Consider an OSD draining when it is
+        actively draining but not yet empty
+        """
+        return self.draining and not self.is_empty
+
+    @property
+    def is_ok_to_stop(self) -> bool:
+        return self.rm_util.ok_to_stop([self])
+
+    @property
+    def is_empty(self) -> bool:
+        if self.get_pg_count() == 0:
+            if not self.drain_done_at:
+                self.drain_done_at = datetime.utcnow()
+                self.draining = False
+            return True
+        return False
+
+    def safe_to_destroy(self) -> bool:
+        return self.rm_util.safe_to_destroy([self.osd_id])
+
+    def down(self) -> bool:
+        return self.rm_util.set_osd_flag([self], 'down')
+
+    def destroy(self) -> bool:
+        return self.rm_util.destroy_osd(self.osd_id)
+
+    def purge(self) -> bool:
+        return self.rm_util.purge_osd(self.osd_id)
+
+    def get_pg_count(self) -> int:
+        return self.rm_util.get_pg_count(self.osd_id)
+
+    @property
+    def exists(self) -> bool:
+        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()
+
+    def drain_status_human(self):
+        default_status = 'not started'
+        status = 'started' if self.started and not self.draining else default_status
+        status = 'draining' if self.draining else status
+        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
+        return status
+
+    def pg_count_str(self):
+        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())
+
+    def to_json(self) -> str:
+        out = dict()
+        out['osd_id'] = self.osd_id
+        out['started'] = self.started
+        out['draining'] = self.draining
+        out['stopped'] = self.stopped
+        out['replace'] = self.replace
+        out['force'] = self.force
+        out['nodename'] = self.nodename  # type: ignore
+
+        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+            if getattr(self, k):
+                out[k] = getattr(self, k).strftime(DATEFMT)
+            else:
+                out[k] = getattr(self, k)
+        return json.dumps(out)
+
+    @classmethod
+    def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]:
+        if not inp:
+            return None
+        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
+            if inp.get(date_field):
+                inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)})
+        inp.update({'remove_util': ctx})
+        return cls(**inp)
+
+    def __hash__(self):
+        return hash(self.osd_id)
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, OSD):
+            return NotImplemented
+        return self.osd_id == other.osd_id
+
+    def __repr__(self) -> str:
+        return f"<OSD>(osd_id={self.osd_id}, is_draining={self.is_draining})"
+
+
+class OSDQueue(Set):
+
+    def __init__(self):
+        super().__init__()
+
+    def as_osd_ids(self) -> List[int]:
+        return [osd.osd_id for osd in self]
+
+    def queue_size(self) -> int:
+        return len(self)
+
+    def draining_osds(self) -> List["OSD"]:
+        return [osd for osd in self if osd.is_draining]
+
+    def idling_osds(self) -> List["OSD"]:
+        return [osd for osd in self if not osd.is_draining and not osd.is_empty]
+
+    def empty_osds(self) -> List["OSD"]:
+        return [osd for osd in self if osd.is_empty]
+
+    def all_osds(self) -> List["OSD"]:
+        return [osd for osd in self]
+
+    def not_in_cluster(self) -> List["OSD"]:
+        return [osd for osd in self if not osd.exists]
+
+    def enqueue(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        self.add(osd)
+        osd.start()
+
+    def rm(self, osd: "OSD") -> None:
+        if not osd.exists:
+            raise NotFoundError()
+        osd.stop()
+        try:
+            logger.debug(f'Removing {osd} from the queue.')
+            self.remove(osd)
+        except KeyError:
+            logger.debug(f"Could not find {osd} in queue.")
+            raise KeyError
index 667cb7bc659205c8b5df1bb8b1c7ea041c947397..ef6325a30e850377b986798f660030d0047d85a9 100644 (file)
@@ -1017,6 +1017,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+        """
+        TODO
+        """
+        raise NotImplementedError()
+
     def remove_osds_status(self):
         # type: () -> Completion
         """
index f1000fe18d1a631f09a8a27cce6d1065788faf44..024a3a8c5336f4d07cafa8b854d05df79b3d30b5 100644 (file)
@@ -856,35 +856,53 @@ Usage:
         "name=replace,type=CephBool,req=false "
         "name=force,type=CephBool,req=false",
         'Remove OSD services')
-    def _osd_rm(self, svc_id: List[str],
-                replace: bool = False,
-                force: bool = False) -> HandleCommandResult:
-        completion = self.remove_osds(svc_id, replace, force)
+    def _osd_rm_start(self,
+                      svc_id: List[str],
+                      replace: bool = False,
+                      force: bool = False) -> HandleCommandResult:
+        completion = self.remove_osds(svc_id, replace=replace, force=force)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
+    @_cli_write_command(
+        'orch osd rm stop',
+        "name=svc_id,type=CephString,n=N",
+        'Remove OSD services')
+    def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult:
+        completion = self.stop_remove_osds(svc_id)
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())
 
     @_cli_write_command(
         'orch osd rm status',
+        "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false",
         desc='status of OSD removal operation')
-    def _osd_rm_status(self) -> HandleCommandResult:
+    def _osd_rm_status(self, format='plain') -> HandleCommandResult:
         completion = self.remove_osds_status()
         self._orchestrator_wait([completion])
         raise_if_exception(completion)
         report = completion.result
+
         if not report:
             return HandleCommandResult(stdout="No OSD remove/replace operations reported")
-        table = PrettyTable(
-            ['NAME', 'HOST', 'PGS', 'STARTED_AT'],
-            border=False)
-        table.align = 'l'
-        table.left_padding_width = 0
-        table.right_padding_width = 1
-        # TODO: re-add sorted and sort by pg_count
-        for osd in report:
-            table.add_row((osd.fullname, osd.nodename, osd.pg_count_str, osd.started_at))
-
-        return HandleCommandResult(stdout=table.get_string())
+
+        if format != 'plain':
+            out = to_format(report, format, many=True, cls=None)
+        else:
+            table = PrettyTable(
+                ['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'],
+                border=False)
+            table.align = 'l'
+            table.left_padding_width = 0
+            table.right_padding_width = 2
+            for osd in sorted(report, key=lambda o: o.osd_id):
+                table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(),
+                               osd.get_pg_count(), osd.replace, osd.replace, osd.drain_started_at])
+            out = table.get_string()
+
+        return HandleCommandResult(stdout=out)
 
     @_cli_write_command(
         'orch daemon add',