import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast
import orchestrator
from cephadm.registry import Registry
# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
+CEPH_MDSMAP_NOT_JOINABLE = (1 << 0)
def normalize_image_digest(digest: str, default_registry: str) -> str:
target_version: Optional[str] = None,
error: Optional[str] = None,
paused: Optional[bool] = None,
+ fail_fs: bool = False,
fs_original_max_mds: Optional[Dict[str, int]] = None,
fs_original_allow_standby_replay: Optional[Dict[str, bool]] = None,
daemon_types: Optional[List[str]] = None,
self.fs_original_max_mds: Optional[Dict[str, int]] = fs_original_max_mds
self.fs_original_allow_standby_replay: Optional[Dict[str,
bool]] = fs_original_allow_standby_replay
+ self.fail_fs = fail_fs
self.daemon_types = daemon_types
self.hosts = hosts
self.services = services
'target_id': self.target_id,
'target_digests': self.target_digests,
'target_version': self.target_version,
+ 'fail_fs': self.fail_fs,
'fs_original_max_mds': self.fs_original_max_mds,
'fs_original_allow_standby_replay': self.fs_original_allow_standby_replay,
'error': self.error,
def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
+ fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
+ 'orchestrator', 'fail_fs', False))
if self.mgr.mode != 'root':
raise OrchestratorError('upgrade is not supported in %s mode' % (
self.mgr.mode))
self.upgrade_state = UpgradeState(
target_name=target_name,
progress_id=str(uuid.uuid4()),
+ fail_fs=fail_fs_value,
daemon_types=daemon_types,
hosts=hosts,
services=services,
# scale down this filesystem?
if mdsmap["max_mds"] > 1:
- self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
- fs_name
- ))
- if fscid not in self.upgrade_state.fs_original_max_mds:
- self.upgrade_state.fs_original_max_mds[fscid] = mdsmap['max_mds']
- self._save_upgrade_state()
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'fs set',
- 'fs_name': fs_name,
- 'var': 'max_mds',
- 'val': '1',
- })
- continue_upgrade = False
- continue
+ if self.upgrade_state.fail_fs:
+ if not (mdsmap['flags'] & CEPH_MDSMAP_NOT_JOINABLE) and \
+ len(mdsmap['up']) > 0:
+ self.mgr.log.info(f'Upgrade: failing fs {fs_name} for '
+ f'rapid multi-rank mds upgrade')
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'fs fail',
+ 'fs_name': fs_name
+ })
+ if ret != 0:
+ continue_upgrade = False
+ continue
+ else:
+ self.mgr.log.info('Upgrade: Scaling down filesystem %s' % (
+ fs_name
+ ))
+ if fscid not in self.upgrade_state.fs_original_max_mds:
+ self.upgrade_state.fs_original_max_mds[fscid] = \
+ mdsmap['max_mds']
+ self._save_upgrade_state()
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'max_mds',
+ 'val': '1',
+ })
+ continue_upgrade = False
+ continue
- if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
- self.mgr.log.info(
- 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (fs_name))
- time.sleep(10)
- continue_upgrade = False
- continue
+ if not self.upgrade_state.fail_fs:
+ if not (mdsmap['in'] == [0] and len(mdsmap['up']) <= 1):
+ self.mgr.log.info(
+ 'Upgrade: Waiting for fs %s to scale down to reach 1 MDS' % (
+ fs_name))
+ time.sleep(10)
+ continue_upgrade = False
+ continue
if len(mdsmap['up']) == 0:
self.mgr.log.warning(
return False, to_upgrade
if d.daemon_type == 'mds' and self._enough_mds_for_ok_to_stop(d):
- if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ # when fail_fs is set to true, all MDS daemons will be moved to
+ # up:standby state, so Cephadm won't be able to upgrade due to
+ # this check and and will warn with "It is NOT safe to stop
+ # mds.<daemon_name> at this time: one or more filesystems is
+ # currently degraded", therefore we bypass this check for that
+ # case.
+ assert self.upgrade_state is not None
+ if not self.upgrade_state.fail_fs \
+ and not self._wait_for_ok_to_stop(d, known_ok_to_stop):
return False, to_upgrade
to_upgrade.append(d_entry)
def _complete_mds_upgrade(self) -> None:
assert self.upgrade_state is not None
- if self.upgrade_state.fs_original_max_mds:
+ if self.upgrade_state.fail_fs:
+ for fs in self.mgr.get("fs_map")['filesystems']:
+ fs_name = fs['mdsmap']['fs_name']
+ self.mgr.log.info('Upgrade: Setting filesystem '
+ f'{fs_name} Joinable')
+ try:
+ ret, _, err = self.mgr.check_mon_command({
+ 'prefix': 'fs set',
+ 'fs_name': fs_name,
+ 'var': 'joinable',
+ 'val': 'true',
+ })
+ except Exception as e:
+ logger.error("Failed to set fs joinable "
+ f"true due to {e}")
+ raise OrchestratorError("Failed to set"
+ "fs joinable true"
+ f"due to {e}")
+ elif self.upgrade_state.fs_original_max_mds:
for fs in self.mgr.get("fs_map")['filesystems']:
fscid = fs["id"]
fs_name = fs['mdsmap']['fs_name']
desc='Orchestrator backend',
enum_allowed=['cephadm', 'rook', 'test_orchestrator'],
runtime=True,
- )
+ ),
+ Option(
+ 'fail_fs',
+ type='bool',
+ default=False,
+ desc='Fail filesystem for rapid multi-rank mds upgrade'
+ ),
]
NATIVE_OPTIONS = [] # type: List[dict]
def _select_orchestrator(self) -> str:
return cast(str, self.get_module_option("orchestrator"))
+ def _get_fail_fs_value(self) -> bool:
+ return bool(self.get_module_option("fail_fs"))
+
@_cli_write_command('orch host add')
def _add_host(self,
hostname: str,
self._set_backend('')
assert self._select_orchestrator() is None
self._set_backend(old_orch)
+ old_fs_fail_value = self._get_fail_fs_value()
+ self.set_module_option("fail_fs", True)
+ assert self._get_fail_fs_value() is True
+ self.set_module_option("fail_fs", False)
+ assert self._get_fail_fs_value() is False
+ self.set_module_option("fail_fs", old_fs_fail_value)
e1 = self.remote('selftest', 'remote_from_orchestrator_cli_self_test', "ZeroDivisionError")
try: