verbose_on_failure=False)
call(['systemctl', 'disable', unit_name],
verbose_on_failure=False)
+ if daemon_type == 'osd':
+ CephContainer(
+ image=args.image,
+ entrypoint='/usr/sbin/ceph-volume',
+ args=[
+ 'lvm', 'zap', '--osd-id',
+ str(daemon_id)
+ # not sure if --destroy is useful here
+ ],
+ container_args=['--privileged'],
+ volume_mounts=get_container_mounts(args.fsid, daemon_type, daemon_id)
+ ).run()
data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
call_throws(['rm', '-rf', data_dir])
"devicehealth",
"orchestrator",
"rbd_support",
+ "osd_support",
"volumes",
"pg_autoscaler",
"telemetry",
--- /dev/null
+import json
+try:
+    from typing import List
+except ImportError:
+    pass
+
+from orchestrator import OrchestratorError
+
+
+class RemoveUtil(object):
+    def __init__(self, mgr):
+        self.mgr = mgr
+
+    def drain_osd(self, osd_id: str) -> bool:
+        """
+        Uses `osd_support` module to schedule a drain operation of an OSD
+        """
+        cmd_args = {
+            'prefix': 'osd drain',
+            'osd_ids': [int(osd_id)]
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def get_pg_count(self, osd_id: str) -> int:
+        """ Queries for PG count of an OSD """
+        self.mgr.log.debug("Querying for drain status")
+        ret, out, err = self.mgr.mon_command({
+            'prefix': 'osd drain status',
+        })
+        if ret != 0:
+            self.mgr.log.error(f"Calling osd drain status failed with {err}")
+            raise OrchestratorError("Could not query `osd drain status`")
+        out = json.loads(out)
+        for o in out:
+            if str(o.get('osd_id', '')) == str(osd_id):
+                return int(o.get('pgs', -1))
+        return -1
+
+    def is_empty(self, osd_id: str) -> bool:
+        """ Checks if an OSD is empty """
+        return self.get_pg_count(osd_id) == 0
+
+    def ok_to_destroy(self, osd_ids: List[int]) -> bool:
+        """ Queries the safe-to-destroy flag for OSDs """
+        cmd_args = {'prefix': 'osd safe-to-destroy',
+                    'ids': osd_ids}
+        return self._run_mon_cmd(cmd_args)
+
+    def destroy_osd(self, osd_id: int) -> bool:
+        """ Destroys an OSD (forcefully) """
+        cmd_args = {'prefix': 'osd destroy-actual',
+                    'id': int(osd_id),
+                    'yes_i_really_mean_it': True}
+        return self._run_mon_cmd(cmd_args)
+
+    def down_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets the `down` flag on the given OSDs """
+        cmd_args = {
+            'prefix': 'osd down',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def purge_osd(self, osd_id: int) -> bool:
+        """ Purges an OSD from the cluster (forcefully) """
+        cmd_args = {
+            'prefix': 'osd purge-actual',
+            'id': int(osd_id),
+            'yes_i_really_mean_it': True
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def out_osd(self, osd_ids: List[int]) -> bool:
+        """ Sets the `out` flag on the given OSDs """
+        cmd_args = {
+            'prefix': 'osd out',
+            'ids': osd_ids,
+        }
+        return self._run_mon_cmd(cmd_args)
+
+    def _run_mon_cmd(self, cmd_args: dict) -> bool:
+        """
+        Generic helper to run mon_command and evaluate/log the results
+        """
+        ret, out, err = self.mgr.mon_command(cmd_args)
+        if ret != 0:
+            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
+            self.mgr.log.error(f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
+            return False
+        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
+        return True
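For orientation (not part of the patch), a minimal sketch of how RemoveUtil is expected to behave against a stubbed mgr object; FakeMgr and its canned mon_command replies are hypothetical test scaffolding:

import json
import logging

class FakeMgr:
    # Hypothetical stand-in for the mgr module handle that RemoveUtil receives.
    log = logging.getLogger("fake-mgr")

    def mon_command(self, cmd_args):
        # Pretend the cluster accepted every command; 'osd drain status'
        # reports a single OSD that has finished draining (0 PGs left).
        if cmd_args.get('prefix') == 'osd drain status':
            return 0, json.dumps([{'osd_id': 3, 'pgs': 0}]), ''
        return 0, '', ''

rm_util = RemoveUtil(FakeMgr())
assert rm_util.drain_osd('3')          # sends {'prefix': 'osd drain', 'osd_ids': [3]}
assert rm_util.get_pg_count('3') == 0  # parsed from the 'osd drain status' JSON
assert rm_util.is_empty('3')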
import string
try:
- from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any
+ from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, Any, NamedTuple
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False # just for type checking
CLICommandMeta
from . import remotes
+from ._utils import RemoveUtil
+
try:
import remoto
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
+class OSDRemoval(NamedTuple):
+    osd_id: int
+    replace: bool
+    force: bool
+    nodename: str
+    fullname: str
+    started_at: datetime.datetime
+
+    # needed due to changing 'started_at' attr
+    def __eq__(self, other):
+        return self.osd_id == other.osd_id
+
+    def __hash__(self):
+        return hash(self.osd_id)
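Side note: overriding __eq__/__hash__ keys identity on osd_id alone, so scheduling the same OSD twice keeps a single entry in the to_remove_osds set even though started_at differs. A small illustration with made-up values:

import datetime

a = OSDRemoval(1, False, False, 'host1', 'osd.1', datetime.datetime.utcnow())
b = OSDRemoval(1, True, True, 'host1', 'osd.1', datetime.datetime.utcnow())

# started_at (and the flags) are ignored for identity, so a set dedups the entries
assert a == b
assert len({a, b}) == 1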
# high-level TODO:
# - bring over some of the protections from ceph-deploy that guard against
else:
return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
-
return wrapper
return decorator
self.cache = HostCache(self)
self.cache.load()
+ self.to_remove_osds: set = set()
+ self.osd_removal_report: dict = dict()
+ self.rm_util = RemoveUtil(self)
# ensure the host lists are in sync
for h in self.inventory.keys():
self.log.info("serve starting")
while self.run:
self._check_hosts()
+ self._remove_osds_bg()
# refresh daemons
self.log.debug('refreshing hosts')
raise OrchestratorError('Unable to find daemon(s) %s' % (names))
return self._remove_daemon(args)
- def remove_service(self, service_name):
+ def remove_service(self, service_name, force=False):
args = []
for host, dm in self.cache.daemons.items():
for name, d in dm.items():
if d.matches_service(service_name):
args.append(
- ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname)
+ ('%s.%s' % (d.daemon_type, d.daemon_id), d.hostname, force)
)
if not args:
raise OrchestratorError('Unable to find daemons in %s service' % (
self.event.set()
return trivial_result('Stopped upgrade to %s' % target_name)
+    def remove_osds(self, osd_ids: List[str],
+                    replace: bool = False,
+                    force: bool = False) -> orchestrator.Completion:
+        """
+        Takes a list of OSDs and schedules them for removal.
+        The function that takes care of the actual removal is
+        _remove_osds_bg().
+        """
+
+        daemons = self.cache.get_daemons_by_service('osd')
+        found = set()
+        for daemon in daemons:
+            if daemon.daemon_id not in osd_ids:
+                continue
+            found.add(OSDRemoval(daemon.daemon_id, replace, force,
+                                 daemon.hostname, daemon.name(),
+                                 datetime.datetime.utcnow()))
+
+        not_found: set = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
+        if not_found:
+            raise OrchestratorError('Unable to find OSD: %s' % not_found)
+
+        for osd in found:
+            self.to_remove_osds.add(osd)
+        # trigger the serve loop to initiate the removal
+        self._kick_serve_loop()
+        return trivial_result("Scheduled OSD(s) for removal")
+
+    def _remove_osds_bg(self) -> None:
+        """
+        Performs actions in the _serve() loop to remove an OSD
+        when the criteria are met.
+        """
+        self.log.debug(f"{len(self.to_remove_osds)} OSDs are scheduled for removal: {list(self.to_remove_osds)}")
+        self.osd_removal_report = self._generate_osd_removal_status()
+        remove_osds: set = self.to_remove_osds.copy()
+        for osd in remove_osds:
+            if not osd.force:
+                self.rm_util.drain_osd(osd.osd_id)
+            # skip criteria
+            if not self.rm_util.is_empty(osd.osd_id):
+                self.log.info(f"OSD <{osd.osd_id}> is not empty yet. Waiting a bit more")
+                continue
+
+            if not self.rm_util.ok_to_destroy([osd.osd_id]):
+                self.log.info(f"OSD <{osd.osd_id}> is not safe-to-destroy yet. Waiting a bit more")
+                continue
+
+            # abort criteria
+            if not self.rm_util.down_osd([osd.osd_id]):
+                # also remove it from the remove_osd list and set a health_check warning?
+                raise orchestrator.OrchestratorError(f"Could not set OSD <{osd.osd_id}> to 'down'")
+
+            if osd.replace:
+                if not self.rm_util.destroy_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not destroy OSD <{osd.osd_id}>")
+            else:
+                if not self.rm_util.purge_osd(osd.osd_id):
+                    # also remove it from the remove_osd list and set a health_check warning?
+                    raise orchestrator.OrchestratorError(f"Could not purge OSD <{osd.osd_id}>")
+
+            completion = self._remove_daemon([(osd.fullname, osd.nodename, True)])
+            completion.add_progress('Removing OSDs', self)
+            completion.update_progress = True
+            if completion:
+                while not completion.has_result:
+                    self.process([completion])
+                    if completion.needs_result:
+                        time.sleep(1)
+                    else:
+                        break
+                if completion.exception is not None:
+                    self.log.error(str(completion.exception))
+            else:
+                raise orchestrator.OrchestratorError("Did not receive a completion from _remove_daemon")
+
+            self.log.info(f"Successfully removed OSD <{osd.osd_id}> on {osd.nodename}")
+            self.log.debug(f"Removing {osd.osd_id} from the queue.")
+            self.to_remove_osds.remove(osd)
+
+    def _generate_osd_removal_status(self) -> Dict[Any, object]:
+        """
+        Generate an OSD report that can be printed to the CLI
+        """
+        self.log.debug("Assembling report for osd rm status")
+        report = {}
+        for osd in self.to_remove_osds:
+            pg_count = self.rm_util.get_pg_count(osd.osd_id)
+            report[osd] = pg_count if pg_count != -1 else 'n/a'
+        self.log.debug(f"Reporting: {report}")
+        return report
+
+    def remove_osds_status(self) -> orchestrator.Completion:
+        """
+        The CLI call to retrieve an osd removal report
+        """
+        return trivial_result(self.osd_removal_report)
+
class BaseScheduler(object):
"""
except ImportError:
# just for type checking
pass
-import datetime
import logging
import errno
import json
"""
raise NotImplementedError()
+    def remove_osds(self, osd_ids: List[str],
+                    replace: bool = False,
+                    force: bool = False) -> Completion:
+        """
+        :param osd_ids: list of OSD IDs
+        :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
+        :param force: Forces the OSD removal process without waiting for the data to be drained first.
+            Note that this can only remove OSDs that were successfully
+            created (i.e. got an OSD ID).
+        """
+        raise NotImplementedError()
+
+    def remove_osds_status(self):
+        # type: () -> Completion
+        """
+        Returns a status of the ongoing OSD removal operations.
+        """
+        raise NotImplementedError()
+
def blink_device_light(self, ident_fault, on, locations):
# type: (str, bool, List[DeviceLightLoc]) -> Completion
"""
self._orchestrator_wait([completion])
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
+
+    @_cli_write_command(
+        'orch osd rm',
+        "name=svc_id,type=CephString,n=N "
+        "name=replace,type=CephBool,req=false "
+        "name=force,type=CephBool,req=false",
+        'Remove OSD services')
+    def _osd_rm(self, svc_id: List[str],
+                replace: bool = False,
+                force: bool = False) -> HandleCommandResult:
+        completion = self.remove_osds(svc_id, replace, force)
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        return HandleCommandResult(stdout=completion.result_str())
+
+    @_cli_write_command(
+        'orch osd rm status',
+        desc='status of OSD removal operation')
+    def _osd_rm_status(self) -> HandleCommandResult:
+        completion = self.remove_osds_status()
+        self._orchestrator_wait([completion])
+        raise_if_exception(completion)
+        report = completion.result
+        if len(report) == 0:
+            return HandleCommandResult(stdout="No OSD remove/replace operations reported")
+        table = PrettyTable(
+            ['NAME', 'HOST', 'PGS', 'STARTED_AT'],
+            border=False)
+        table.align = 'l'
+        table.left_padding_width = 0
+        table.right_padding_width = 1
+        # TODO: re-add sorted and sort by pg_count
+        for osd, status in report.items():
+            table.add_row((osd.fullname, osd.nodename, status, osd.started_at))
+
+        return HandleCommandResult(stdout=table.get_string())
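As a quick illustration of what the status table above renders, a standalone sketch with made-up report entries (plain tuples stand in for the OSDRemoval keys and PG counts used by the real handler):

from prettytable import PrettyTable

fake_report = [
    ('osd.3', 'host1', 17, '2020-01-01 12:00:00'),     # still draining
    ('osd.4', 'host2', 'n/a', '2020-01-01 12:05:00'),  # PG count not known yet
]

table = PrettyTable(['NAME', 'HOST', 'PGS', 'STARTED_AT'], border=False)
table.align = 'l'
table.left_padding_width = 0
table.right_padding_width = 1
for row in fake_report:
    table.add_row(row)
print(table.get_string())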
@_cli_write_command(
'orch daemon add mon',
osd_ids: Set[int] = set()
emptying_osds: Set[int] = set()
check_osds: Set[int] = set()
+ empty: Set[int] = set()
def __init__(self, *args, **kwargs):
super(OSDSupport, self).__init__(*args, **kwargs)
if osd_id not in self.emptying_osds:
self.osd_ids.add(osd_id)
self.log.info(f'Found OSD(s) <{self.osd_ids}> in the queue.')
- out = 'Started draining OSDs. Query progress with <ceph drain status>'
+ out = 'Started draining OSDs. Query progress with <ceph osd drain status>'
elif cmd_prefix == 'osd drain status':
# re-initialize it with an empty set on invocation (long running processes)
# assemble a set of emptying osds and to_be_emptied osds
self.check_osds.update(self.emptying_osds)
self.check_osds.update(self.osd_ids)
+ self.check_osds.update(self.empty)
report = list()
for osd_id in self.check_osds:
pgs = self.get_pg_count(osd_id)
report.append(dict(osd_id=osd_id, pgs=pgs))
- out = f"{report}"
+ out = f"{json.dumps(report)}"
elif cmd_prefix == 'osd drain stop':
if not osd_ids:
"""
self.log.info("Starting mgr/osd_support")
while self.run:
- # Do some useful background work here.
self.log.debug(f"Scheduled for draining: <{self.osd_ids}>")
self.log.debug(f"Currently being drained: <{self.emptying_osds}>")
# remove osds that are marked as empty
self.emptying_osds = self.emptying_osds.difference(empty_osds)
+ # move empty osds into the done queue until they disappear from ceph's view
+ # other modules need to know when OSDs are empty
+ for osd in empty_osds:
+ self.log.debug(f"Adding {osd} to list of empty OSDs")
+ self.empty.add(osd)
+
+ # remove from queue if no longer part of ceph cluster
+ self.cleanup()
+
# fixed sleep interval of 10 seconds
sleep_interval = 10
self.log.debug('Sleeping for %d seconds', sleep_interval)
self.event.wait(sleep_interval)
self.event.clear()
+ def cleanup(self):
+ """
+ Remove OSDs that are no longer in the ceph cluster from the
+ 'done' list.
+ :return:
+ """
+ for osd in self.osds_not_in_cluster(list(self.empty)):
+ self.log.info(f"OSD: {osd} is not found in the cluster anymore. Removing")
+ self.empty.remove(osd)
+
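Roughly, the queue transitions that serve() and cleanup() maintain look like this (plain-set sketch, illustrative values only):

# per serve() pass in mgr/osd_support
osd_ids, emptying_osds, empty = {1, 2}, {3}, set()

emptying_osds |= osd_ids        # scheduled OSDs start draining
osd_ids.clear()

empty_osds = {3}                # pretend OSD 3 reports 0 PGs this pass
emptying_osds -= empty_osds     # no longer 'being drained'
empty |= empty_osds             # remembered in the 'done' queue for other modules

not_in_cluster = {3}            # after purge, ceph stops reporting the OSD
empty -= not_in_cluster         # cleanup() drops it from the 'done' queue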
def shutdown(self):
"""
This method is called by the mgr when the module needs to shut
osd_nodes = osd_df.get('nodes', [])
for osd_node in osd_nodes:
if osd_node.get('id', None) == int(osd_id):
- return osd_node.get('pgs')
- errmsg = f"Could not find <pgs> field for osd_id: {osd_id} in osd_df data"
- self.log.error(errmsg)
- raise RuntimeError(errmsg)
+ return osd_node.get('pgs', -1)
+ return -1
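To illustrate the new fallback (a standalone sketch; the sample payload only mimics the 'nodes' list that osd_df() returns):

sample_osd_df = {'nodes': [{'id': 3, 'pgs': 17}, {'id': 4}]}

def pg_count(osd_df: dict, osd_id: int) -> int:
    for osd_node in osd_df.get('nodes', []):
        if osd_node.get('id', None) == int(osd_id):
            return osd_node.get('pgs', -1)
    # unknown OSDs (or a missing 'pgs' field) now yield -1 instead of raising
    return -1

assert pg_count(sample_osd_df, 3) == 17
assert pg_count(sample_osd_df, 4) == -1   # present, but no 'pgs' field
assert pg_count(sample_osd_df, 5) == -1   # not in the cluster view at all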
def get_osd_weight(self, osd_id: int) -> float:
osd_df = self.osd_df()
mypy
commands = mypy --config-file=../../mypy.ini \
cephadm/module.py \
+ cephadm/_utils.py \
mgr_module.py \
dashboard/module.py \
mgr_util.py \