From d3122fbf48a63cef2f92d825ac7834b2488d7b03 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 28 Feb 2020 10:11:04 +0100 Subject: [PATCH] mgr/cephadm: Move remove osd code to osd.py Signed-off-by: Sebastian Wagner --- src/pybind/mgr/cephadm/_utils.py | 91 --------------- src/pybind/mgr/cephadm/module.py | 95 +--------------- src/pybind/mgr/cephadm/osd.py | 186 +++++++++++++++++++++++++++++++ src/pybind/mgr/tox.ini | 1 - 4 files changed, 192 insertions(+), 181 deletions(-) delete mode 100644 src/pybind/mgr/cephadm/_utils.py create mode 100644 src/pybind/mgr/cephadm/osd.py diff --git a/src/pybind/mgr/cephadm/_utils.py b/src/pybind/mgr/cephadm/_utils.py deleted file mode 100644 index 22c8ed4e93c..00000000000 --- a/src/pybind/mgr/cephadm/_utils.py +++ /dev/null @@ -1,91 +0,0 @@ -import json -try: - from typing import List -except ImportError: - pass - -from orchestrator import OrchestratorError - - -class RemoveUtil(object): - def __init__(self, mgr): - self.mgr = mgr - - def drain_osd(self, osd_id: str) -> bool: - """ - Uses `osd_support` module to schedule a drain operation of an OSD - """ - cmd_args = { - 'prefix': 'osd drain', - 'osd_ids': [int(osd_id)] - } - return self._run_mon_cmd(cmd_args) - - def get_pg_count(self, osd_id: str) -> int: - """ Queries for PG count of an OSD """ - self.mgr.log.debug("Querying for drain status") - ret, out, err = self.mgr.mon_command({ - 'prefix': 'osd drain status', - }) - if ret != 0: - self.mgr.log.error(f"Calling osd drain status failed with {err}") - raise OrchestratorError("Could not query `osd drain status`") - out = json.loads(out) - for o in out: - if str(o.get('osd_id', '')) == str(osd_id): - return int(o.get('pgs', -1)) - return -1 - - def is_empty(self, osd_id: str) -> bool: - """ Checks if an OSD is empty """ - return self.get_pg_count(osd_id) == 0 - - def ok_to_destroy(self, osd_ids: List[int]) -> bool: - """ Queries the safe-to-destroy flag for OSDs """ - cmd_args = {'prefix': 'osd safe-to-destroy', - 'ids': osd_ids} - return self._run_mon_cmd(cmd_args) - - def destroy_osd(self, osd_id: int) -> bool: - """ Destroys an OSD (forcefully) """ - cmd_args = {'prefix': 'osd destroy-actual', - 'id': int(osd_id), - 'yes_i_really_mean_it': True} - return self._run_mon_cmd(cmd_args) - - def down_osd(self, osd_ids: List[int]) -> bool: - """ Sets `out` flag to OSDs """ - cmd_args = { - 'prefix': 'osd down', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - - def purge_osd(self, osd_id: int) -> bool: - """ Purges an OSD from the cluster (forcefully) """ - cmd_args = { - 'prefix': 'osd purge-actual', - 'id': int(osd_id), - 'yes_i_really_mean_it': True - } - return self._run_mon_cmd(cmd_args) - - def out_osd(self, osd_ids: List[int]) -> bool: - """ Sets `down` flag to OSDs """ - cmd_args = { - 'prefix': 'osd out', - 'ids': osd_ids, - } - return self._run_mon_cmd(cmd_args) - - def _run_mon_cmd(self, cmd_args: dict) -> bool: - """ - Generic command to run mon_command and evaluate/log the results - """ - ret, out, err = self.mgr.mon_command(cmd_args) - if ret != 0: - self.mgr.log.debug(f"ran {cmd_args} with mon_command") - self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") - return False - self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") - return True diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 8c43daff2af..5db8f2d5442 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -10,7 +10,7 @@ from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException import string try: - from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator + from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator, Set from typing import TYPE_CHECKING except ImportError: TYPE_CHECKING = False # just for type checking @@ -37,7 +37,7 @@ from orchestrator import OrchestratorError, HostPlacementSpec, OrchestratorValid CLICommandMeta, ServiceSpec from . import remotes -from ._utils import RemoveUtil +from .osd import RemoveUtil, OSDRemoval try: @@ -85,20 +85,6 @@ except ImportError: def __exit__(self, exc_type, exc_value, traceback): self.cleanup() -class OSDRemoval(NamedTuple): - osd_id: int - replace: bool - force: bool - nodename: str - fullname: str - started_at: datetime.datetime - - # needed due to changing 'started_at' attr - def __eq__(self, other): - return self.osd_id == other.osd_id - - def __hash__(self): - return hash(self.osd_id) # high-level TODO: # - bring over some of the protections from ceph-deploy that guard against @@ -668,8 +654,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.cache = HostCache(self) self.cache.load() - self.to_remove_osds: set = set() - self.osd_removal_report: dict = dict() self.rm_util = RemoveUtil(self) self.spec_store = SpecStore(self) @@ -1068,8 +1052,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self._check_for_strays() - self._remove_osds_bg() - if self._apply_all_services(): continue # did something, refresh @@ -2856,7 +2838,7 @@ receivers: """ daemons = self.cache.get_daemons_by_service('osd') - found = set() + found: Set[OSDRemoval] = set() for daemon in daemons: if daemon.daemon_id not in osd_ids: continue @@ -2864,86 +2846,21 @@ receivers: daemon.hostname, daemon.name(), datetime.datetime.utcnow())) - not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} + not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]} if not_found: raise OrchestratorError('Unable to find OSD: %s' % not_found) for osd in found: - self.to_remove_osds.add(osd) + self.rm_util.to_remove_osds.add(osd) # trigger the serve loop to initiate the removal self._kick_serve_loop() return trivial_result(f"Scheduled OSD(s) for removal") - def _remove_osds_bg(self) -> None: - """ - Performs actions in the _serve() loop to remove an OSD - when criteria is met. - """ - self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") - self.osd_removal_report = self._generate_osd_removal_status() - remove_osds: set = self.to_remove_osds.copy() - for osd in remove_osds: - if not osd.force: - self.rm_util.drain_osd(osd.osd_id) - # skip criteria - if not self.rm_util.is_empty(osd.osd_id): - self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") - continue - - if not self.rm_util.ok_to_destroy([osd.osd_id]): - self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") - continue - - # abort criteria - if not self.rm_util.down_osd([osd.osd_id]): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'") - - if osd.replace: - if not self.rm_util.destroy_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>") - else: - if not self.rm_util.purge_osd(osd.osd_id): - # also remove it from the remove_osd list and set a health_check warning? - raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") - - completion = self._remove_daemons([(osd.fullname, osd.nodename, True)]) - completion.add_progress('Removing OSDs', self) - completion.update_progress = True - if completion: - while not completion.has_result: - self.process([completion]) - if completion.needs_result: - time.sleep(1) - else: - break - if completion.exception is not None: - self.log.error(str(completion.exception)) - else: - raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon") - - self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}") - self.log.debug(f"Removing {osd.osd_id} from the queue.") - self.to_remove_osds.remove(osd) - - def _generate_osd_removal_status(self) -> Dict[Any, object]: - """ - Generate a OSD report that can be printed to the CLI - """ - self.log.debug("Assembling report for osd rm status") - report = {} - for osd in self.to_remove_osds: - pg_count = self.rm_util.get_pg_count(osd.osd_id) - report[osd] = pg_count if pg_count != -1 else 'n/a' - self.log.debug(f"Reporting: {report}") - return report - def remove_osds_status(self) -> orchestrator.Completion: """ The CLI call to retrieve an osd removal report """ - return trivial_result(self.osd_removal_report) + return trivial_result(self.rm_util.osd_removal_report) def list_specs(self) -> orchestrator.Completion: """ diff --git a/src/pybind/mgr/cephadm/osd.py b/src/pybind/mgr/cephadm/osd.py new file mode 100644 index 00000000000..0a8d9c40879 --- /dev/null +++ b/src/pybind/mgr/cephadm/osd.py @@ -0,0 +1,186 @@ +import datetime +import json +import logging +import time + +from typing import List, NamedTuple, Dict, Any, Set + +import orchestrator +from orchestrator import OrchestratorError + +logger = logging.getLogger(__name__) + + +class OSDRemoval(NamedTuple): + osd_id: int + replace: bool + force: bool + nodename: str + fullname: str + started_at: datetime.datetime + + # needed due to changing 'started_at' attr + def __eq__(self, other): + return self.osd_id == other.osd_id + + def __hash__(self): + return hash(self.osd_id) + + +class RemoveUtil(object): + def __init__(self, mgr): + self.mgr = mgr + self.to_remove_osds: Set[OSDRemoval] = set() + self.osd_removal_report: dict = dict() + self.log = logger + self.rm_util = self + + + def _remove_osds_bg(self) -> None: + """ + Performs actions in the _serve() loop to remove an OSD + when criteria is met. + """ + self.log.debug( + f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}") + self.osd_removal_report = self._generate_osd_removal_status() + remove_osds: set = self.to_remove_osds.copy() + for osd in remove_osds: + if not osd.force: + self.rm_util.drain_osd(osd.osd_id) + # skip criteria + if not self.rm_util.is_empty(osd.osd_id): + self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more") + continue + + if not self.rm_util.ok_to_destroy([osd.osd_id]): + self.log.info( + f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more") + continue + + # abort criteria + if not self.rm_util.down_osd([osd.osd_id]): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError( + f"Could not set OSD <{osd.osd_id}> to 'down'") + + if osd.replace: + if not self.rm_util.destroy_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError( + f"Could not destroy OSD <{osd.osd_id}>") + else: + if not self.rm_util.purge_osd(osd.osd_id): + # also remove it from the remove_osd list and set a health_check warning? + raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>") + + completion = self.mgr._remove_daemon([(osd.fullname, osd.nodename, True)]) + completion.add_progress('Removing OSDs', self) + completion.update_progress = True + if completion: + while not completion.has_result: + self.mgr.process([completion]) + if completion.needs_result: + time.sleep(1) + else: + break + if completion.exception is not None: + self.log.error(str(completion.exception)) + else: + raise orchestrator.OrchestratorError( + "Did not receive a completion from _remove_daemon") + + self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}") + self.log.debug(f"Removing {osd.osd_id} from the queue.") + self.to_remove_osds.remove(osd) + + def _generate_osd_removal_status(self) -> Dict[Any, object]: + """ + Generate a OSD report that can be printed to the CLI + """ + self.log.debug("Assembling report for osd rm status") + report = {} + for osd in self.to_remove_osds: + pg_count = self.get_pg_count(str(osd.osd_id)) + report[osd] = pg_count if pg_count != -1 else 'n/a' + self.log.debug(f"Reporting: {report}") + return report + + def drain_osd(self, osd_id: str) -> bool: + """ + Uses `osd_support` module to schedule a drain operation of an OSD + """ + cmd_args = { + 'prefix': 'osd drain', + 'osd_ids': [int(osd_id)] + } + return self._run_mon_cmd(cmd_args) + + def get_pg_count(self, osd_id: str) -> int: + """ Queries for PG count of an OSD """ + self.mgr.log.debug("Querying for drain status") + ret, out, err = self.mgr.mon_command({ + 'prefix': 'osd drain status', + }) + if ret != 0: + self.mgr.log.error(f"Calling osd drain status failed with {err}") + raise OrchestratorError("Could not query `osd drain status`") + out = json.loads(out) + for o in out: + if str(o.get('osd_id', '')) == str(osd_id): + return int(o.get('pgs', -1)) + return -1 + + def is_empty(self, osd_id: str) -> bool: + """ Checks if an OSD is empty """ + return self.get_pg_count(osd_id) == 0 + + def ok_to_destroy(self, osd_ids: List[int]) -> bool: + """ Queries the safe-to-destroy flag for OSDs """ + cmd_args = {'prefix': 'osd safe-to-destroy', + 'ids': osd_ids} + return self._run_mon_cmd(cmd_args) + + def destroy_osd(self, osd_id: int) -> bool: + """ Destroys an OSD (forcefully) """ + cmd_args = {'prefix': 'osd destroy-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True} + return self._run_mon_cmd(cmd_args) + + def down_osd(self, osd_ids: List[int]) -> bool: + """ Sets `out` flag to OSDs """ + cmd_args = { + 'prefix': 'osd down', + 'ids': osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def purge_osd(self, osd_id: int) -> bool: + """ Purges an OSD from the cluster (forcefully) """ + cmd_args = { + 'prefix': 'osd purge-actual', + 'id': int(osd_id), + 'yes_i_really_mean_it': True + } + return self._run_mon_cmd(cmd_args) + + def out_osd(self, osd_ids: List[int]) -> bool: + """ Sets `down` flag to OSDs """ + cmd_args = { + 'prefix': 'osd out', + 'ids': osd_ids, + } + return self._run_mon_cmd(cmd_args) + + def _run_mon_cmd(self, cmd_args: dict) -> bool: + """ + Generic command to run mon_command and evaluate/log the results + """ + ret, out, err = self.mgr.mon_command(cmd_args) + if ret != 0: + self.mgr.log.debug(f"ran {cmd_args} with mon_command") + self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") + return False + self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") + return True diff --git a/src/pybind/mgr/tox.ini b/src/pybind/mgr/tox.ini index 22803ab3eba..ee20bbf51ce 100644 --- a/src/pybind/mgr/tox.ini +++ b/src/pybind/mgr/tox.ini @@ -14,7 +14,6 @@ deps = mypy commands = mypy --config-file=../../mypy.ini \ cephadm/module.py \ - cephadm/_utils.py \ mgr_module.py \ dashboard/module.py \ mgr_util.py \ -- 2.39.5