From a0f68c488f74c13534b1b6452a8018f4bb2802d1 Mon Sep 17 00:00:00 2001 From: Joshua Schmid Date: Fri, 24 Jul 2020 15:24:38 +0200 Subject: [PATCH] mgr/cephadm: implement osd remove/replace logic in cephadm Fixes: https://tracker.ceph.com/issues/44548 Fixes: https://tracker.ceph.com/issues/45594 Signed-off-by: Joshua Schmid --- src/pybind/mgr/cephadm/module.py | 74 +++- src/pybind/mgr/cephadm/services/osd.py | 436 ++++++++++++++++------ src/pybind/mgr/orchestrator/_interface.py | 6 + src/pybind/mgr/orchestrator/module.py | 50 ++- 4 files changed, 429 insertions(+), 137 deletions(-) diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 1cd36d271297..185585288436 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -38,7 +38,7 @@ from .services.cephadmservice import MonService, MgrService, MdsService, RgwServ RbdMirrorService, CrashService, CephadmService from .services.iscsi import IscsiService from .services.nfs import NFSService -from .services.osd import RemoveUtil, OSDRemoval, OSDService +from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService from .schedule import HostAssignment, HostPlacementSpec @@ -318,7 +318,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.cache = HostCache(self) self.cache.load() + self.rm_util = RemoveUtil(self) + self.to_remove_osds = OSDQueue() + self.rm_util.load_from_store() self.spec_store = SpecStore(self) self.spec_store.load() @@ -495,7 +498,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self._update_paused_health() if not self.paused: - self.rm_util._remove_osds_bg() + self.rm_util.process_removal_queue() self.migration.migrate() if self.migration.is_migration_ongoing(): @@ -564,6 +567,21 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, monmap['modified'], CEPH_DATEFMT) if self.last_monmap and self.last_monmap > datetime.datetime.utcnow(): self.last_monmap = None # just in case clocks are skewed + self.cache.distribute_new_etc_ceph_ceph_conf() + if notify_type == "pg_summary": + self._trigger_osd_removal() + + def _trigger_osd_removal(self): + data = self.get("osd_stats") + for osd in data.get('osd_stats', []): + if osd.get('num_pgs') == 0: + # if _ANY_ osd that is currently in the queue appears to be empty, + # start the removal process + if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids(): + self.log.debug(f"Found empty osd. Starting removal process") + # if the osd that is now empty is also part of the removal queue + # start the process + self.rm_util.process_removal_queue() def pause(self): if not self.paused: @@ -2461,31 +2479,53 @@ you may want to run: """ Takes a list of OSDs and schedules them for removal. The function that takes care of the actual removal is - _remove_osds_bg(). + process_removal_queue(). 
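For reference, the pg_summary hook added above (`_trigger_osd_removal`) only kicks the removal queue when an OSD that is already scheduled reports zero PGs. A standalone sketch of that filtering, where `osd_stats` and `queued_ids` are hypothetical stand-ins for `self.get("osd_stats")` and `self.to_remove_osds.as_osd_ids()`:

    from typing import Any, Dict, List

    def osds_ready_for_removal(osd_stats: Dict[str, Any], queued_ids: List[int]) -> List[int]:
        """Return the queued OSD ids that now report zero PGs."""
        ready = []
        for osd in osd_stats.get('osd_stats', []):
            # an OSD only triggers the queue once it is empty *and* already scheduled
            if osd.get('num_pgs') == 0 and int(osd.get('osd')) in queued_ids:
                ready.append(int(osd.get('osd')))
        return ready

    # example: osd.3 is empty and queued, osd.5 still carries PGs
    stats = {'osd_stats': [{'osd': 3, 'num_pgs': 0}, {'osd': 5, 'num_pgs': 17}]}
    assert osds_ready_for_removal(stats, [3, 5]) == [3]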
""" - daemons = self.cache.get_daemons_by_service('osd') - found: Set[OSDRemoval] = set() + daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_service('osd') + to_remove_daemons = list() for daemon in daemons: - if daemon.daemon_id not in osd_ids: - continue - found.add(OSDRemoval(daemon.daemon_id, replace, force, - daemon.hostname, daemon.name(), - datetime.datetime.utcnow(), -1)) - - not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} - if not_found: - raise OrchestratorError('Unable to find OSD: %s' % not_found) + if daemon.daemon_id in osd_ids: + to_remove_daemons.append(daemon) + if not to_remove_daemons: + return f"Unable to find OSDs: {osd_ids}" - self.rm_util.queue_osds_for_removal(found) + for daemon in to_remove_daemons: + try: + self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), + replace=replace, + force=force, + hostname=daemon.hostname, + fullname=daemon.name(), + process_started_at=datetime.datetime.utcnow(), + remove_util=self.rm_util)) + except NotFoundError: + return f"Unable to find OSDs: {osd_ids}" # trigger the serve loop to initiate the removal self._kick_serve_loop() return "Scheduled OSD(s) for removal" @trivial_completion - def remove_osds_status(self) -> Set[OSDRemoval]: + def stop_remove_osds(self, osd_ids: List[str]): + """ + Stops a `removal` process for a List of OSDs. + This will revert their weight and remove it from the osds_to_remove queue + """ + for osd_id in osd_ids: + try: + self.to_remove_osds.rm(OSD(osd_id=int(osd_id), + remove_util=self.rm_util)) + except (NotFoundError, KeyError): + return f'Unable to find OSD in the queue: {osd_id}' + + # trigger the serve loop to halt the removal + self._kick_serve_loop() + return "Stopped OSD(s) removal" + + @trivial_completion + def remove_osds_status(self): """ The CLI call to retrieve an osd removal report """ - return self.rm_util.report + return self.to_remove_osds.all_osds() diff --git a/src/pybind/mgr/cephadm/services/osd.py b/src/pybind/mgr/cephadm/services/osd.py index 21038c467b26..3a7297bdc442 100644 --- a/src/pybind/mgr/cephadm/services/osd.py +++ b/src/pybind/mgr/cephadm/services/osd.py @@ -1,4 +1,3 @@ -import datetime import json import logging from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional @@ -7,6 +6,7 @@ from ceph.deployment import translate from ceph.deployment.drive_group import DriveGroupSpec from ceph.deployment.drive_selection import DriveSelection +from datetime import datetime import orchestrator from cephadm.utils import forall_hosts from orchestrator import OrchestratorError @@ -15,6 +15,7 @@ from mgr_module import MonCommandFailed from cephadm.services.cephadmservice import CephadmService, CephadmDaemonSpec logger = logging.getLogger(__name__) +DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' class OSDService(CephadmService): @@ -280,139 +281,148 @@ class OSDService(CephadmService): return osd_host_map -class OSDRemoval(object): - def __init__(self, - osd_id: str, - replace: bool, - force: bool, - nodename: str, - fullname: str, - start_at: datetime.datetime, - pg_count: int): - self.osd_id = osd_id - self.replace = replace - self.force = force - self.nodename = nodename - self.fullname = fullname - self.started_at = start_at - self.pg_count = pg_count - - # needed due to changing 'started_at' attr - def __eq__(self, other): - return self.osd_id == other.osd_id - - def __hash__(self): - return hash(self.osd_id) - - def __repr__(self): - return ('(osd_id={}, replace={}, force={}, nodename={}' - ', fullname={}, 
started_at={}, pg_count={})').format( - self.osd_id, self.replace, self.force, self.nodename, - self.fullname, self.started_at, self.pg_count) - - @property - def pg_count_str(self) -> str: - return 'n/a' if self.pg_count < 0 else str(self.pg_count) - - class RemoveUtil(object): def __init__(self, mgr): self.mgr = mgr - self.to_remove_osds: Set[OSDRemoval] = set() - self.osd_removal_report: Dict[OSDRemoval, Union[int,str]] = dict() - - @property - def report(self) -> Set[OSDRemoval]: - return self.to_remove_osds.copy() - - def queue_osds_for_removal(self, osds: Set[OSDRemoval]): - self.to_remove_osds.update(osds) - def _remove_osds_bg(self) -> None: + def process_removal_queue(self) -> None: """ Performs actions in the _serve() loop to remove an OSD when criteria is met. """ + + # make sure that we don't run on OSDs that are not in the cluster anymore. + self.cleanup() + logger.debug( - f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") - self._update_osd_removal_status() - remove_osds: set = self.to_remove_osds.copy() - for osd in remove_osds: + f"{self.mgr.to_remove_osds.queue_size()} OSDs are scheduled " + f"for removal: {self.mgr.to_remove_osds.all_osds()}") + + # find osds that are ok-to-stop and not yet draining + ok_to_stop_osds = self.find_osd_stop_threshold(self.mgr.to_remove_osds.idling_osds()) + if ok_to_stop_osds: + # start draining those + _ = [osd.start_draining() for osd in ok_to_stop_osds] + + # Check all osds for their state and take action (remove, purge etc) + to_remove_osds = self.mgr.to_remove_osds.all_osds() + new_queue = set() + for osd in to_remove_osds: if not osd.force: - self.drain_osd(osd.osd_id) # skip criteria - if not self.is_empty(osd.osd_id): + if not osd.is_empty: logger.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") + new_queue.add(osd) continue - if not self.ok_to_destroy([osd.osd_id]): + if not osd.safe_to_destroy(): logger.info( f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") + new_queue.add(osd) continue # abort criteria - if not self.down_osd([osd.osd_id]): + if not osd.down(): # also remove it from the remove_osd list and set a health_check warning? raise orchestrator.OrchestratorError( f"Could not set OSD <{osd.osd_id}> to 'down'") if osd.replace: - if not self.destroy_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? + if not osd.destroy(): raise orchestrator.OrchestratorError( f"Could not destroy OSD <{osd.osd_id}>") else: - if not self.purge_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? 
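The per-OSD flow in `process_removal_queue()` above amounts to a small ordered check: wait until drained, wait until safe-to-destroy, then mark down and either destroy (replace) or purge (remove). A minimal sketch of that ordering only, with the mon-command results (`is_empty`, `safe_to_destroy`) replaced by plain booleans:

    from dataclasses import dataclass

    @dataclass
    class QueuedOSD:
        osd_id: int
        replace: bool
        force: bool
        empty: bool   # stand-in for osd.is_empty (pg count == 0)
        safe: bool    # stand-in for osd.safe_to_destroy()

    def next_step(osd: QueuedOSD) -> str:
        # skip criteria: without force, wait until drained and safe-to-destroy
        if not osd.force:
            if not osd.empty:
                return "wait: still has PGs"
            if not osd.safe:
                return "wait: not safe-to-destroy"
        # past this point the OSD is marked down, then either kept as a
        # destroyed stub (replace) or purged entirely (remove)
        if osd.replace:
            return "down + destroy (ID reserved for replacement)"
        return "down + purge (removed from CRUSH/auth)"

    assert next_step(QueuedOSD(3, replace=True, force=False, empty=True, safe=True)).startswith("down + destroy")
    assert next_step(QueuedOSD(5, replace=False, force=False, empty=False, safe=False)) == "wait: still has PGs"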
+ if not osd.purge(): raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") + if not osd.exists: + continue self.mgr._remove_daemon(osd.fullname, osd.nodename) logger.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}") logger.debug(f"Removing {osd.osd_id} from the queue.") - self.to_remove_osds.remove(osd) - def _update_osd_removal_status(self): - """ - Generate a OSD report that can be printed to the CLI - """ - logger.debug("Update OSD removal status") - for osd in self.to_remove_osds: - osd.pg_count = self.get_pg_count(str(osd.osd_id)) - logger.debug(f"OSD removal status: {self.to_remove_osds}") + # self.mgr.to_remove_osds could change while this is processing (osds get added from the CLI) + # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and + # osds that were added while this method was executed' + self.mgr.to_remove_osds.intersection_update(new_queue) + self.save_to_store() + + def cleanup(self): + # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs + not_in_cluster_osds = self.mgr.to_remove_osds.not_in_cluster() + [self.mgr.to_remove_osds.remove(osd) for osd in not_in_cluster_osds] - def drain_osd(self, osd_id: str) -> bool: + def get_osds_in_cluster(self) -> List[str]: + osd_map = self.mgr.get_osdmap() + return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])] + + def osd_df(self) -> dict: + base_cmd = 'osd df' + ret, out, err = self.mgr.mon_command({ + 'prefix': base_cmd, + 'format': 'json' + }) + return json.loads(out) + + def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int: + if not osd_df: + osd_df = self.osd_df() + osd_nodes = osd_df.get('nodes', []) + for osd_node in osd_nodes: + if osd_node.get('id') == int(osd_id): + return osd_node.get('pgs', -1) + return -1 + + def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]: """ - Uses `osd_support` module to schedule a drain operation of an OSD + Cut osd_id list in half until it's ok-to-stop + + :param osds: list of osd_ids + :return: list of ods_ids that can be stopped at once """ + if not osds: + return [] + while not self.ok_to_stop(osds): + if len(osds) <= 1: + # can't even stop one OSD, aborting + self.mgr.log.info("Can't even stop one OSD. Cluster is probably busy. Retrying later..") + return [] + + # This potentially prolongs the global wait time. + self.mgr.event.wait(1) + # splitting osd_ids in half until ok_to_stop yields success + # maybe popping ids off one by one is better here..depends on the cluster size I guess.. 
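The halving loop in `find_osd_stop_threshold()` trades precision for a bounded number of `osd ok-to-stop` calls: it keeps dropping the first half of the candidate list until the remainder can be stopped at once, or gives up when even a single OSD cannot be stopped. A standalone illustration of the same search, with a hypothetical `can_stop_at_once()` standing in for the mon command (here it allows at most two OSDs to stop simultaneously):

    from typing import List

    def can_stop_at_once(osd_ids: List[int]) -> bool:
        # hypothetical stand-in for 'osd ok-to-stop'
        return len(osd_ids) <= 2

    def find_stop_threshold(osd_ids: List[int]) -> List[int]:
        while osd_ids and not can_stop_at_once(osd_ids):
            if len(osd_ids) <= 1:
                return []                              # cannot stop even one OSD right now
            osd_ids = osd_ids[len(osd_ids) // 2:]      # drop the first half and retry
        return osd_ids

    # 8 candidates -> halved to 4 -> halved to 2, which is ok-to-stop
    assert find_stop_threshold(list(range(8))) == [6, 7]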
+ # There's a lot of room for micro adjustments here + osds = osds[len(osds) // 2:] + return osds + + # todo start draining + # return all([osd.start_draining() for osd in osds]) + + def ok_to_stop(self, osds: List["OSD"]) -> bool: cmd_args = { - 'prefix': 'osd drain', - 'osd_ids': [int(osd_id)] + 'prefix': "osd ok-to-stop", + 'ids': [str(osd.osd_id) for osd in osds] } return self._run_mon_cmd(cmd_args) - def get_pg_count(self, osd_id: str) -> int: - """ Queries for PG count of an OSD """ - self.mgr.log.debug("Querying for drain status") + def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool: + base_cmd = f"osd {flag}" + self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}") ret, out, err = self.mgr.mon_command({ - 'prefix': 'osd drain status', + 'prefix': base_cmd, + 'ids': [str(osd.osd_id) for osd in osds] }) if ret != 0: - self.mgr.log.error(f"Calling osd drain status failed with {err}") - raise OrchestratorError("Could not query `osd drain status`") - out = json.loads(out) - for o in out: - if str(o.get('osd_id', '')) == str(osd_id): - return int(o.get('pgs', -1)) - return -1 - - def is_empty(self, osd_id: str) -> bool: - """ Checks if an OSD is empty """ - return self.get_pg_count(osd_id) == 0 + self.mgr.log.error(f"Could not set <{flag}> flag for osds: {osds}. <{err}>") + return False + self.mgr.log.info(f"OSDs <{osds}> are now <{flag}>") + return True - def ok_to_destroy(self, osd_ids: List[int]) -> bool: + def safe_to_destroy(self, osd_ids: List[int]) -> bool: """ Queries the safe-to-destroy flag for OSDs """ cmd_args = {'prefix': 'osd safe-to-destroy', - 'ids': osd_ids} + 'ids': [str(x) for x in osd_ids]} return self._run_mon_cmd(cmd_args) def destroy_osd(self, osd_id: int) -> bool: @@ -422,14 +432,6 @@ class RemoveUtil(object): 'yes_i_really_mean_it': True} return self._run_mon_cmd(cmd_args) - def down_osd(self, osd_ids: List[int]) -> bool: - """ Sets `out` flag to OSDs """ - cmd_args = { - 'prefix': 'osd down', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - def purge_osd(self, osd_id: int) -> bool: """ Purges an OSD from the cluster (forcefully) """ cmd_args = { @@ -439,14 +441,6 @@ class RemoveUtil(object): } return self._run_mon_cmd(cmd_args) - def out_osd(self, osd_ids: List[int]) -> bool: - """ Sets `down` flag to OSDs """ - cmd_args = { - 'prefix': 'osd out', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - def _run_mon_cmd(self, cmd_args: dict) -> bool: """ Generic command to run mon_command and evaluate/log the results @@ -458,3 +452,237 @@ class RemoveUtil(object): return False self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") return True + + def save_to_store(self): + osd_queue = [osd.to_json() for osd in self.mgr.to_remove_osds.all_osds()] + logger.debug(f"Saving {osd_queue} to store") + self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue)) + + def load_from_store(self): + for k, v in self.mgr.get_store_prefix('osd_remove_queue').items(): + for osd in json.loads(v): + logger.debug(f"Loading osd ->{osd} from store") + osd_obj = OSD.from_json(json.loads(osd), ctx=self) + self.mgr.to_remove_osds.add(osd_obj) + + +class NotFoundError(Exception): + pass + + +class OSD: + + def __init__(self, + osd_id: int, + remove_util: RemoveUtil, + drain_started_at: Optional[datetime] = None, + process_started_at: Optional[datetime] = None, + drain_stopped_at: Optional[datetime] = None, + drain_done_at: Optional[datetime] = None, + draining: bool = False, + started: bool = False, + stopped: bool = False, + replace: bool = 
False, + force: bool = False, + hostname: Optional[str] = None, + fullname: Optional[str] = None, + ): + # the ID of the OSD + self.osd_id = osd_id + + # when did process (not the actual draining) start + self.process_started_at = process_started_at + + # when did the drain start + self.drain_started_at = drain_started_at + + # when did the drain stop + self.drain_stopped_at = drain_stopped_at + + # when did the drain finish + self.drain_done_at = drain_done_at + + # did the draining start + self.draining = draining + + # was the operation started + self.started = started + + # was the operation stopped + self.stopped = stopped + + # If this is a replace or remove operation + self.replace = replace + # If we wait for the osd to be drained + self.force = force + # The name of the node + self.nodename = hostname + # The full name of the osd + self.fullname = fullname + + # mgr obj to make mgr/mon calls + self.rm_util = remove_util + + def start(self) -> None: + if self.started: + logger.debug(f"Already started draining {self}") + return None + self.started = True + self.stopped = False + + def start_draining(self) -> bool: + if self.stopped: + logger.debug(f"Won't start draining {self}. OSD draining is stopped.") + return False + self.rm_util.set_osd_flag([self], 'out') + self.drain_started_at = datetime.utcnow() + self.draining = True + logger.debug(f"Started draining {self}.") + return True + + def stop_draining(self) -> bool: + self.rm_util.set_osd_flag([self], 'in') + self.drain_stopped_at = datetime.utcnow() + self.draining = False + logger.debug(f"Stopped draining {self}.") + return True + + def stop(self) -> None: + if self.stopped: + logger.debug(f"Already stopped draining {self}") + return None + self.started = False + self.stopped = True + self.stop_draining() + + @property + def is_draining(self) -> bool: + """ + Consider an OSD draining when it is + actively draining but not yet empty + """ + return self.draining and not self.is_empty + + @property + def is_ok_to_stop(self) -> bool: + return self.rm_util.ok_to_stop([self]) + + @property + def is_empty(self) -> bool: + if self.get_pg_count() == 0: + if not self.drain_done_at: + self.drain_done_at = datetime.utcnow() + self.draining = False + return True + return False + + def safe_to_destroy(self) -> bool: + return self.rm_util.safe_to_destroy([self.osd_id]) + + def down(self) -> bool: + return self.rm_util.set_osd_flag([self], 'down') + + def destroy(self) -> bool: + return self.rm_util.destroy_osd(self.osd_id) + + def purge(self) -> bool: + return self.rm_util.purge_osd(self.osd_id) + + def get_pg_count(self) -> int: + return self.rm_util.get_pg_count(self.osd_id) + + @property + def exists(self) -> bool: + return str(self.osd_id) in self.rm_util.get_osds_in_cluster() + + def drain_status_human(self): + default_status = 'not started' + status = 'started' if self.started and not self.draining else default_status + status = 'draining' if self.draining else status + status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status + return status + + def pg_count_str(self): + return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count()) + + def to_json(self) -> str: + out = dict() + out['osd_id'] = self.osd_id + out['started'] = self.started + out['draining'] = self.draining + out['stopped'] = self.stopped + out['replace'] = self.replace + out['force'] = self.force + out['nodename'] = self.nodename # type: ignore + + for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 
'process_started_at']: + if getattr(self, k): + out[k] = getattr(self, k).strftime(DATEFMT) + else: + out[k] = getattr(self, k) + return json.dumps(out) + + @classmethod + def from_json(cls, inp: Optional[Dict[str, Any]], ctx: Optional[RemoveUtil] = None) -> Optional["OSD"]: + if not inp: + return None + for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']: + if inp.get(date_field): + inp.update({date_field: datetime.strptime(inp.get(date_field, ''), DATEFMT)}) + inp.update({'remove_util': ctx}) + return cls(**inp) + + def __hash__(self): + return hash(self.osd_id) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, OSD): + return NotImplemented + return self.osd_id == other.osd_id + + def __repr__(self) -> str: + return f"(osd_id={self.osd_id}, is_draining={self.is_draining})" + + +class OSDQueue(Set): + + def __init__(self): + super().__init__() + + def as_osd_ids(self) -> List[int]: + return [osd.osd_id for osd in self] + + def queue_size(self) -> int: + return len(self) + + def draining_osds(self) -> List["OSD"]: + return [osd for osd in self if osd.is_draining] + + def idling_osds(self) -> List["OSD"]: + return [osd for osd in self if not osd.is_draining and not osd.is_empty] + + def empty_osds(self) -> List["OSD"]: + return [osd for osd in self if osd.is_empty] + + def all_osds(self) -> List["OSD"]: + return [osd for osd in self] + + def not_in_cluster(self) -> List["OSD"]: + return [osd for osd in self if not osd.exists] + + def enqueue(self, osd: "OSD") -> None: + if not osd.exists: + raise NotFoundError() + self.add(osd) + osd.start() + + def rm(self, osd: "OSD") -> None: + if not osd.exists: + raise NotFoundError() + osd.stop() + try: + logger.debug(f'Removing {osd} from the queue.') + self.remove(osd) + except KeyError: + logger.debug(f"Could not find {osd} in queue.") + raise KeyError diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 667cb7bc6592..ef6325a30e85 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -1017,6 +1017,12 @@ class Orchestrator(object): """ raise NotImplementedError() + def stop_remove_osds(self, osd_ids: List[str]) -> Completion: + """ + TODO + """ + raise NotImplementedError() + def remove_osds_status(self): # type: () -> Completion """ diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index f1000fe18d1a..024a3a8c5336 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -856,35 +856,53 @@ Usage: "name=replace,type=CephBool,req=false " "name=force,type=CephBool,req=false", 'Remove OSD services') - def _osd_rm(self, svc_id: List[str], - replace: bool = False, - force: bool = False) -> HandleCommandResult: - completion = self.remove_osds(svc_id, replace, force) + def _osd_rm_start(self, + svc_id: List[str], + replace: bool = False, + force: bool = False) -> HandleCommandResult: + completion = self.remove_osds(svc_id, replace=replace, force=force) + self._orchestrator_wait([completion]) + raise_if_exception(completion) + return HandleCommandResult(stdout=completion.result_str()) + + @_cli_write_command( + 'orch osd rm stop', + "name=svc_id,type=CephString,n=N", + 'Remove OSD services') + def _osd_rm_stop(self, svc_id: List[str]) -> HandleCommandResult: + completion = self.stop_remove_osds(svc_id) self._orchestrator_wait([completion]) raise_if_exception(completion) return 
HandleCommandResult(stdout=completion.result_str()) @_cli_write_command( 'orch osd rm status', + "name=format,type=CephChoices,strings=plain|json|json-pretty|yaml,req=false", desc='status of OSD removal operation') - def _osd_rm_status(self) -> HandleCommandResult: + def _osd_rm_status(self, format='plain') -> HandleCommandResult: completion = self.remove_osds_status() self._orchestrator_wait([completion]) raise_if_exception(completion) report = completion.result + if not report: return HandleCommandResult(stdout="No OSD remove/replace operations reported") - table = PrettyTable( - ['NAME', 'HOST', 'PGS', 'STARTED_AT'], - border=False) - table.align = 'l' - table.left_padding_width = 0 - table.right_padding_width = 1 - # TODO: re-add sorted and sort by pg_count - for osd in report: - table.add_row((osd.fullname, osd.nodename, osd.pg_count_str, osd.started_at)) - - return HandleCommandResult(stdout=table.get_string()) + + if format != 'plain': + out = to_format(report, format, many=True, cls=None) + else: + table = PrettyTable( + ['OSD_ID', 'HOST', 'STATE', 'PG_COUNT', 'REPLACE', 'FORCE', 'DRAIN_STARTED_AT'], + border=False) + table.align = 'l' + table.left_padding_width = 0 + table.right_padding_width = 2 + for osd in sorted(report, key=lambda o: o.osd_id): + table.add_row([osd.osd_id, osd.nodename, osd.drain_status_human(), + osd.get_pg_count(), osd.replace, osd.replace, osd.drain_started_at]) + out = table.get_string() + + return HandleCommandResult(stdout=out) @_cli_write_command( 'orch daemon add', -- 2.47.3
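One detail worth noting from the osd.py changes: queued removals survive a mgr restart because `save_to_store()`/`load_from_store()` persist each `OSD` as JSON, with timestamps serialized via the module-level `DATEFMT` and parsed back in `OSD.from_json()`. A minimal round-trip sketch of that pattern, standalone and without the mgr/`RemoveUtil` wiring:

    import json
    from datetime import datetime

    DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'   # same format string the patch defines in osd.py

    def dump_entry(osd_id: int, drain_started_at: datetime) -> str:
        # timestamps are stored as DATEFMT strings so the queue survives a mgr restart
        return json.dumps({'osd_id': osd_id,
                           'drain_started_at': drain_started_at.strftime(DATEFMT)})

    def load_entry(raw: str) -> dict:
        data = json.loads(raw)
        data['drain_started_at'] = datetime.strptime(data['drain_started_at'], DATEFMT)
        return data

    entry = dump_entry(3, datetime.utcnow())
    restored = load_entry(entry)
    assert restored['osd_id'] == 3 and isinstance(restored['drain_started_at'], datetime)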