+++ /dev/null
-import json
-try:
- from typing import List
-except ImportError:
- pass
-
-from orchestrator import OrchestratorError
-
-
-class RemoveUtil(object):
- def __init__(self, mgr):
- self.mgr = mgr
-
- def drain_osd(self, osd_id: str) -> bool:
- """
- Uses `osd_support` module to schedule a drain operation of an OSD
- """
- cmd_args = {
- 'prefix': 'osd drain',
- 'osd_ids': [int(osd_id)]
- }
- return self._run_mon_cmd(cmd_args)
-
- def get_pg_count(self, osd_id: str) -> int:
- """ Queries for PG count of an OSD """
- self.mgr.log.debug("Querying for drain status")
- ret, out, err = self.mgr.mon_command({
- 'prefix': 'osd drain status',
- })
- if ret != 0:
- self.mgr.log.error(f"Calling osd drain status failed with {err}")
- raise OrchestratorError("Could not query `osd drain status`")
- out = json.loads(out)
- for o in out:
- if str(o.get('osd_id', '')) == str(osd_id):
- return int(o.get('pgs', -1))
- return -1
-
- def is_empty(self, osd_id: str) -> bool:
- """ Checks if an OSD is empty """
- return self.get_pg_count(osd_id) == 0
-
- def ok_to_destroy(self, osd_ids: List[int]) -> bool:
- """ Queries the safe-to-destroy flag for OSDs """
- cmd_args = {'prefix': 'osd safe-to-destroy',
- 'ids': osd_ids}
- return self._run_mon_cmd(cmd_args)
-
- def destroy_osd(self, osd_id: int) -> bool:
- """ Destroys an OSD (forcefully) """
- cmd_args = {'prefix': 'osd destroy-actual',
- 'id': int(osd_id),
- 'yes_i_really_mean_it': True}
- return self._run_mon_cmd(cmd_args)
-
- def down_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `out` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd down',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
- def purge_osd(self, osd_id: int) -> bool:
- """ Purges an OSD from the cluster (forcefully) """
- cmd_args = {
- 'prefix': 'osd purge-actual',
- 'id': int(osd_id),
- 'yes_i_really_mean_it': True
- }
- return self._run_mon_cmd(cmd_args)
-
- def out_osd(self, osd_ids: List[int]) -> bool:
- """ Sets `down` flag to OSDs """
- cmd_args = {
- 'prefix': 'osd out',
- 'ids': osd_ids,
- }
- return self._run_mon_cmd(cmd_args)
-
- def _run_mon_cmd(self, cmd_args: dict) -> bool:
- """
- Generic command to run mon_command and evaluate/log the results
- """
- ret, out, err = self.mgr.mon_command(cmd_args)
- if ret != 0:
- self.mgr.log.debug(f"ran {cmd_args} with mon_command")
- self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
- return False
- self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
- return True
import string
try:
- from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator
+ from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple, Iterator, Set
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False # just for type checking
CLICommandMeta, ServiceSpec
from . import remotes
-from ._utils import RemoveUtil
+from .osd import RemoveUtil, OSDRemoval
try:
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
-class OSDRemoval(NamedTuple):
- osd_id: int
- replace: bool
- force: bool
- nodename: str
- fullname: str
- started_at: datetime.datetime
-
- # needed due to changing 'started_at' attr
- def __eq__(self, other):
- return self.osd_id == other.osd_id
-
- def __hash__(self):
- return hash(self.osd_id)
# high-level TODO:
# - bring over some of the protections from ceph-deploy that guard against
self.cache = HostCache(self)
self.cache.load()
- self.to_remove_osds: set = set()
- self.osd_removal_report: dict = dict()
self.rm_util = RemoveUtil(self)
self.spec_store = SpecStore(self)
self._check_for_strays()
- self._remove_osds_bg()
-
if self._apply_all_services():
continue # did something, refresh
"""
daemons = self.cache.get_daemons_by_service('osd')
- found = set()
+ found: Set[OSDRemoval] = set()
for daemon in daemons:
if daemon.daemon_id not in osd_ids:
continue
daemon.hostname, daemon.name(),
datetime.datetime.utcnow()))
- not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
+ not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
if not_found:
raise OrchestratorError('Unable to find OSD: %s' % not_found)
for osd in found:
- self.to_remove_osds.add(osd)
+ self.rm_util.to_remove_osds.add(osd)
# trigger the serve loop to initiate the removal
self._kick_serve_loop()
return trivial_result(f"Scheduled OSD(s) for removal")
- def _remove_osds_bg(self) -> None:
- """
- Performs actions in the _serve() loop to remove an OSD
- when criteria is met.
- """
- self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
- self.osd_removal_report = self._generate_osd_removal_status()
- remove_osds: set = self.to_remove_osds.copy()
- for osd in remove_osds:
- if not osd.force:
- self.rm_util.drain_osd(osd.osd_id)
- # skip criteria
- if not self.rm_util.is_empty(osd.osd_id):
- self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
- continue
-
- if not self.rm_util.ok_to_destroy([osd.osd_id]):
- self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
- continue
-
- # abort criteria
- if not self.rm_util.down_osd([osd.osd_id]):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'")
-
- if osd.replace:
- if not self.rm_util.destroy_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>")
- else:
- if not self.rm_util.purge_osd(osd.osd_id):
- # also remove it from the remove_osd list and set a health_check warning?
- raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
-
- completion = self._remove_daemons([(osd.fullname, osd.nodename, True)])
- completion.add_progress('Removing OSDs', self)
- completion.update_progress = True
- if completion:
- while not completion.has_result:
- self.process([completion])
- if completion.needs_result:
- time.sleep(1)
- else:
- break
- if completion.exception is not None:
- self.log.error(str(completion.exception))
- else:
- raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon")
-
- self.log.info(f"Successfully removed removed OSD <{osd.osd_id}> on {osd.nodename}")
- self.log.debug(f"Removing {osd.osd_id} from the queue.")
- self.to_remove_osds.remove(osd)
-
- def _generate_osd_removal_status(self) -> Dict[Any, object]:
- """
- Generate a OSD report that can be printed to the CLI
- """
- self.log.debug("Assembling report for osd rm status")
- report = {}
- for osd in self.to_remove_osds:
- pg_count = self.rm_util.get_pg_count(osd.osd_id)
- report[osd] = pg_count if pg_count != -1 else 'n/a'
- self.log.debug(f"Reporting: {report}")
- return report
-
def remove_osds_status(self) -> orchestrator.Completion:
"""
The CLI call to retrieve an osd removal report
"""
- return trivial_result(self.osd_removal_report)
+ return trivial_result(self.rm_util.osd_removal_report)
def list_specs(self) -> orchestrator.Completion:
"""
--- /dev/null
+import datetime
+import json
+import logging
+import time
+
+from typing import List, NamedTuple, Dict, Any, Set
+
+import orchestrator
+from orchestrator import OrchestratorError
+
+logger = logging.getLogger(__name__)
+
+
+class OSDRemoval(NamedTuple):
+ osd_id: int
+ replace: bool
+ force: bool
+ nodename: str
+ fullname: str
+ started_at: datetime.datetime
+
+ # compare/hash by osd_id only, since 'started_at' differs between otherwise identical entries
+ def __eq__(self, other):
+ return self.osd_id == other.osd_id
+
+ def __hash__(self):
+ return hash(self.osd_id)
+
+
+class RemoveUtil(object):
+ def __init__(self, mgr):
+ self.mgr = mgr
+ self.to_remove_osds: Set[OSDRemoval] = set()
+ self.osd_removal_report: dict = dict()
+ self.log = logger
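+ # alias: the removal loop moved over from module.py keeps referring to self.rm_util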
+ self.rm_util = self
+
+ def _remove_osds_bg(self) -> None:
+ """
+ Performs actions in the _serve() loop to remove an OSD
+ when the removal criteria are met.
+ """
+ self.log.debug(
+ f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+ self.osd_removal_report = self._generate_osd_removal_status()
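+ # work on a copy so finished OSDs can be dropped from the live set while iterating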
+ remove_osds: set = self.to_remove_osds.copy()
+ for osd in remove_osds:
+ if not osd.force:
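+ # a non-forced removal drains the OSD first so its PGs can migrate away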
+ self.rm_util.drain_osd(osd.osd_id)
+ # skip criteria
+ if not self.rm_util.is_empty(osd.osd_id):
+ self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+ continue
+
+ if not self.rm_util.ok_to_destroy([osd.osd_id]):
+ self.log.info(
+ f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+ continue
+
+ # abort criteria
+ if not self.rm_util.down_osd([osd.osd_id]):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not set OSD <{osd.osd_id}> to 'down'")
+
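+ # 'replace' keeps the OSD id reusable via destroy; otherwise the OSD is purged entirely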
+ if osd.replace:
+ if not self.rm_util.destroy_osd(osd.osd_id):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(
+ f"Could not destroy OSD <{osd.osd_id}>")
+ else:
+ if not self.rm_util.purge_osd(osd.osd_id):
+ # also remove it from the remove_osd list and set a health_check warning?
+ raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
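+ # finally remove the OSD daemon itself and wait for the completion to resolve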
+ completion = self.mgr._remove_daemons([(osd.fullname, osd.nodename, True)])
+ completion.add_progress('Removing OSDs', self.mgr)
+ completion.update_progress = True
+ if completion:
+ while not completion.has_result:
+ self.mgr.process([completion])
+ if completion.needs_result:
+ time.sleep(1)
+ else:
+ break
+ if completion.exception is not None:
+ self.log.error(str(completion.exception))
+ else:
+ raise orchestrator.OrchestratorError(
+ "Did not receive a completion from _remove_daemon")
+
+ self.log.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
+ self.log.debug(f"Removing {osd.osd_id} from the queue.")
+ self.to_remove_osds.remove(osd)
+
+ def _generate_osd_removal_status(self) -> Dict[Any, object]:
+ """
+ Generate an OSD removal report that can be printed to the CLI
+ """
+ self.log.debug("Assembling report for osd rm status")
+ report = {}
+ for osd in self.to_remove_osds:
+ pg_count = self.get_pg_count(str(osd.osd_id))
+ report[osd] = pg_count if pg_count != -1 else 'n/a'
+ self.log.debug(f"Reporting: {report}")
+ return report
+
+ def drain_osd(self, osd_id: str) -> bool:
+ """
+ Uses the `osd_support` module to schedule a drain operation for an OSD
+ """
+ cmd_args = {
+ 'prefix': 'osd drain',
+ 'osd_ids': [int(osd_id)]
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def get_pg_count(self, osd_id: str) -> int:
+ """ Queries the PG count of an OSD (returns -1 if unknown) """
+ self.mgr.log.debug("Querying for drain status")
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'osd drain status',
+ })
+ if ret != 0:
+ self.mgr.log.error(f"Calling osd drain status failed with {err}")
+ raise OrchestratorError("Could not query `osd drain status`")
+ out = json.loads(out)
+ for o in out:
+ if str(o.get('osd_id', '')) == str(osd_id):
+ return int(o.get('pgs', -1))
+ return -1
+
+ def is_empty(self, osd_id: str) -> bool:
+ """ Checks if an OSD is empty """
+ return self.get_pg_count(osd_id) == 0
+
+ def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+ """ Queries the safe-to-destroy flag for OSDs """
+ cmd_args = {'prefix': 'osd safe-to-destroy',
+ 'ids': osd_ids}
+ return self._run_mon_cmd(cmd_args)
+
+ def destroy_osd(self, osd_id: int) -> bool:
+ """ Destroys an OSD (forcefully) """
+ cmd_args = {'prefix': 'osd destroy-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True}
+ return self._run_mon_cmd(cmd_args)
+
+ def down_osd(self, osd_ids: List[int]) -> bool:
+ """ Sets the `down` flag on OSDs """
+ cmd_args = {
+ 'prefix': 'osd down',
+ 'ids': osd_ids,
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def purge_osd(self, osd_id: int) -> bool:
+ """ Purges an OSD from the cluster (forcefully) """
+ cmd_args = {
+ 'prefix': 'osd purge-actual',
+ 'id': int(osd_id),
+ 'yes_i_really_mean_it': True
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def out_osd(self, osd_ids: List[int]) -> bool:
+ """ Sets the `out` flag on OSDs """
+ cmd_args = {
+ 'prefix': 'osd out',
+ 'ids': osd_ids,
+ }
+ return self._run_mon_cmd(cmd_args)
+
+ def _run_mon_cmd(self, cmd_args: dict) -> bool:
+ """
+ Generic helper to run a mon_command and evaluate/log the result
+ """
+ ret, out, err = self.mgr.mon_command(cmd_args)
+ if ret != 0:
+ self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+ self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+ return False
+ self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+ return True
mypy
commands = mypy --config-file=../../mypy.ini \
cephadm/module.py \
- cephadm/_utils.py \
+ cephadm/osd.py \
mgr_module.py \
dashboard/module.py \
mgr_util.py \