from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from cephadm import CephadmOrchestrator
-from cephadm.upgrade import CephadmUpgrade, UpgradeState
+from cephadm.upgrade import (
+ CephadmUpgrade,
+ UpgradeState,
+ parse_ok_to_upgrade_mon_json,
+ request_osd_ok_to_upgrade_report,
+)
from cephadm.ssh import HostConnectionError
from cephadm.utils import ContainerInspectInfo
from orchestrator import OrchestratorError, DaemonDescription
) == 'Stopped upgrade to image_id'
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_start_hosts_mutually_exclusive_with_bucket(cephadm_module: CephadmOrchestrator):
+ """upgrade_start must raise OrchestratorError when host_placement is combined with bucket_type/bucket_name."""
+ with with_host(cephadm_module, 'test'):
+ with with_host(cephadm_module, 'test2'):
+ with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), status_running=True):
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade_start(
+ 'image_id', None,
+ daemon_types=['osd'],
+ host_placement='test',
+ bucket_type='rack',
+ bucket_name='rack-a',
+ )
+ # The exact user-facing message is part of the contract checked here.
+ assert str(err.value) == '--hosts cannot be combined with --crush_bucket_type or --crush_bucket_name'
+
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
def test_upgrade_start_offline_hosts(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
assert CephadmUpgrade(cephadm_module).upgrade_state is None
+def test_upgrade_state_crush_roundtrip():
+    """CRUSH bucket type and name must survive a to_json/from_json round trip."""
+    original = UpgradeState(
+        'target', 'pid', crush_bucket_type='rack', crush_bucket_name='rack1')
+    restored = UpgradeState.from_json(original.to_json())
+    assert restored
+    bucket = (restored.crush_bucket_type, restored.crush_bucket_name)
+    assert bucket == ('rack', 'rack1')
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_osd_only(cephadm_module: CephadmOrchestrator):
+ """With daemon_types=['osd'] and a CRUSH bucket set, ``which`` carries the bucket-scope suffix."""
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'target', 'pid',
+ target_digests=['digest1'],
+ daemon_types=['osd'],
+ crush_bucket_type='rack',
+ crush_bucket_name='rack1',
+ )
+ # _get_upgrade_info is stubbed so only the ``which`` string is under test.
+ with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+ status = wait(cephadm_module, cephadm_module.upgrade_status())
+ assert status.which == 'Upgrading daemons of type(s) osd (OSDs in bucket scope)'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_osd_only_uppercase(cephadm_module: CephadmOrchestrator):
+ """An upper-case 'OSD' daemon type still gets the bucket-scope suffix and is printed as given."""
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'target', 'pid',
+ target_digests=['digest1'],
+ daemon_types=['OSD'],
+ crush_bucket_type='rack',
+ crush_bucket_name='rack1',
+ )
+ # _get_upgrade_info is stubbed so only the ``which`` string is under test.
+ with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+ status = wait(cephadm_module, cephadm_module.upgrade_status())
+ assert status.which == 'Upgrading daemons of type(s) OSD (OSDs in bucket scope)'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_mixed_daemon_types(cephadm_module: CephadmOrchestrator):
+ """When 'osd' is one of several daemon types, the bucket-scope suffix follows the joined type list."""
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'target', 'pid',
+ target_digests=['digest1'],
+ daemon_types=['mon', 'osd'],
+ crush_bucket_type='rack',
+ crush_bucket_name='rack1',
+ )
+ # _get_upgrade_info is stubbed so only the ``which`` string is under test.
+ with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+ status = wait(cephadm_module, cephadm_module.upgrade_status())
+ assert status.which == (
+ 'Upgrading daemons of type(s) mon,osd (OSDs in bucket scope)')
+
+
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_full_cluster_with_crush_bucket(cephadm_module: CephadmOrchestrator):
+ """Without daemon_types/hosts/services filters, a CRUSH bucket yields the full-cluster bucket-scope message."""
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'target', 'pid',
+ target_digests=['digest1'],
+ crush_bucket_type='rack',
+ crush_bucket_name='rack1',
+ )
+ # _get_upgrade_info is stubbed so only the ``which`` string is under test.
+ with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+ status = wait(cephadm_module, cephadm_module.upgrade_status())
+ assert status.which == (
+ 'Upgrading all daemon types on all hosts (OSDs only in bucket scope)')
+
+
+def test_parse_ok_to_upgrade_mon_json_nested_and_flat():
+    """Both the nested and the flat mon report shapes expose ``ok_to_upgrade``."""
+    payloads = (
+        '{"ok_to_upgrade": {"ok_to_upgrade": true, "all_osds_upgraded": false}}',
+        '{"ok_to_upgrade": true}',
+    )
+    for payload in payloads:
+        report = parse_ok_to_upgrade_mon_json(payload)
+        assert report['ok_to_upgrade'] is True
+
+
+def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator):
+    """The helper sends one ``osd ok-to-upgrade`` mon command and returns the nested report."""
+    mon_reply = (0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', '')
+    cephadm_module.check_mon_command = mock.MagicMock(return_value=mon_reply)
+
+    report = request_osd_ok_to_upgrade_report(
+        cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3)
+
+    assert report['ok_to_upgrade'] is True
+    cephadm_module.check_mon_command.assert_called_once()
+    # First positional argument of the single call is the command dict.
+    sent = cephadm_module.check_mon_command.call_args[0][0]
+    expected_fields = {
+        'prefix': 'osd ok-to-upgrade',
+        'crush_bucket': 'mybucket',
+        'ceph_version': '20.1.0-144.el9cp',
+        'max': 3,
+    }
+    for key, value in expected_fields.items():
+        assert sent[key] == value
+
+
+def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator):
+    """A 'rack' bucket with an OSD-only upgrade passes validation without raising."""
+    validate = cephadm_module.upgrade._validate_failure_domain_upgrade_options
+    validate(
+        crush_bucket_type='rack',
+        crush_bucket_name='rack-a',
+        daemon_types=['osd'],
+    )
+
+
+def test_validate_failure_domain_upgrade_options_chassis_ok(cephadm_module: CephadmOrchestrator):
+    """A 'chassis' bucket with an OSD-only upgrade passes validation without raising."""
+    validate = cephadm_module.upgrade._validate_failure_domain_upgrade_options
+    validate(
+        crush_bucket_type='chassis',
+        crush_bucket_name='c1',
+        daemon_types=['osd'],
+    )
+
+
+def test_validate_failure_domain_upgrade_options_host_ok(cephadm_module: CephadmOrchestrator):
+    """A 'host' bucket with an OSD-only upgrade passes validation without raising."""
+    validate = cephadm_module.upgrade._validate_failure_domain_upgrade_options
+    validate(
+        crush_bucket_type='host',
+        crush_bucket_name='host1',
+        daemon_types=['osd'],
+    )
+
+
+def test_validate_failure_domain_upgrade_options_invalid_type(cephadm_module: CephadmOrchestrator):
+ """A bucket type outside chassis/host/rack ('root' here) is rejected with the supported-types message."""
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'root', 'default', ['osd'])
+ # The message lists the supported types sorted and echoes the given type via repr.
+ assert str(err.value) == (
+ "Supported bucket types for OSD upgrade are: chassis, host, rack (specified: 'root')")
+
+
+def test_validate_failure_domain_upgrade_options_pairing(cephadm_module: CephadmOrchestrator):
+ """Giving only the bucket type or only the bucket name is rejected with the same pairing message."""
+ both_msg = 'Both --crush_bucket_type and --crush_bucket_name must be specified together'
+ # Type without name.
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', None, ['osd'])
+ assert str(err.value) == both_msg
+ # Name without type.
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ None, 'rack-a', ['osd'])
+ assert str(err.value) == both_msg
+
+
+def test_validate_failure_domain_upgrade_options_comma_in_name(cephadm_module: CephadmOrchestrator):
+ """A bucket name containing a comma ('a,b') is rejected as not being a single token."""
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'a,b', ['osd'])
+ assert str(err.value) == (
+ 'Invalid --crush_bucket_name: use a single name token without commas')
+
+
+def test_validate_failure_domain_upgrade_options_multi_token_name(cephadm_module: CephadmOrchestrator):
+ """A bucket name with embedded whitespace ('rack-a rack-b') is rejected as not being a single token."""
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'rack-a rack-b', ['osd'])
+ assert str(err.value) == (
+ 'Invalid --crush_bucket_name: use a single name token without commas')
+
+
+def test_validate_failure_domain_upgrade_options_daemon_types(cephadm_module: CephadmOrchestrator):
+ """Bucket options are accepted when daemon_types is None or includes 'osd', and rejected otherwise."""
+ # Accepted: no daemon type filter at all.
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'rack-a', None)
+ # Accepted: 'osd' listed together with other types, in any position.
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'rack-a', ['osd', 'mds'])
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'rack-a', ['mgr', 'mon', 'osd'])
+ osd_msg = 'Bucket parameters for OSD upgrade require --daemon-types to include "osd"'
+ # Rejected: a daemon type filter that leaves out 'osd'.
+ with pytest.raises(OrchestratorError) as err:
+ cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+ 'rack', 'rack-a', ['mgr', 'mon'])
+ assert str(err.value) == osd_msg
+
+
+def test_validate_failure_domain_upgrade_options_name_not_consulting_crush_map(
+        cephadm_module: CephadmOrchestrator):
+    """A bucket name such as 'not-in-map' passes: CRUSH map membership is not checked here."""
+    validate = cephadm_module.upgrade._validate_failure_domain_upgrade_options
+    validate(
+        crush_bucket_type='rack',
+        crush_bucket_name='not-in-map',
+        daemon_types=['osd'],
+    )
+
+
+@mock.patch('cephadm.serve.CephadmServe._run_cephadm', _run_cephadm('{}'))
def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'host1'):
with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''):
+import errno
import json
import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set
from cephadm.services.service_registry import service_registry
import orchestrator
logger = logging.getLogger(__name__)
+# Underlying mon ok-to-upgrade command supports the following
+# OSD upgrade failure domain types relevant for parallelization.
+CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS = frozenset({'rack', 'chassis', 'host'})
+
# from ceph_fs.h
CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
CEPH_MDSMAP_NOT_JOINABLE = (1 << 0)
return digest
+def parse_ok_to_upgrade_mon_json(out: str) -> dict:
+    """
+    Parse mon JSON from ``osd ok-to-upgrade``.
+
+    The monitor nests the report under a top-level ``ok_to_upgrade`` object;
+    when that key holds a dict, the nested dict is returned. Otherwise (for
+    example a flat report where ``ok_to_upgrade`` is a boolean) the whole
+    top-level object is returned.
+
+    Raises ``ValueError`` when *out* is not valid JSON
+    (``json.JSONDecodeError`` is a ``ValueError`` subclass) or when the
+    top-level JSON value is not an object.
+    """
+    parsed = json.loads(out)
+    if not isinstance(parsed, dict):
+        # A list/scalar/null payload has no ``.get``; report the malformed
+        # response explicitly instead of failing with an AttributeError.
+        raise ValueError(
+            'osd ok-to-upgrade: expected a JSON object, got '
+            f'{type(parsed).__name__}')
+    nested_ok_report = parsed.get('ok_to_upgrade')
+    if isinstance(nested_ok_report, dict):
+        return nested_ok_report
+    return parsed
+
+
+def request_osd_ok_to_upgrade_report(
+    mgr: "CephadmOrchestrator",
+    crush_bucket: str,
+    ceph_version_short: str,
+    max_osds: int,
+) -> dict:
+    """
+    Issue ``osd ok-to-upgrade`` through *mgr* and return the parsed report.
+
+    *ceph_version_short* has to be the **inspected** short token read from the
+    target image's ``ceph version …`` line, i.e. what ``_do_upgrade`` stores in
+    ``upgrade_state.target_version`` after its first pull. The CLI
+    ``--ceph-version`` value may be a different string (e.g. tag ``18.2.1`` vs
+    build suffix).
+
+    *max_osds* is forwarded to the monitor as the ``max`` argument. The main
+    fields of the report are written to the debug log.
+    """
+    request: Dict[str, Any] = dict(
+        prefix='osd ok-to-upgrade',
+        crush_bucket=crush_bucket,
+        ceph_version=ceph_version_short,
+        max=max_osds,
+    )
+    _rc, payload, _err = mgr.check_mon_command(request)
+    report = parse_ok_to_upgrade_mon_json(payload)
+
+    # Report keys in the order the log format below expects them.
+    logged_fields = (
+        'ok_to_upgrade',
+        'all_osds_upgraded',
+        'osds_ok_to_upgrade',
+        'osds_in_crush_bucket',
+        'osds_upgraded',
+        'bad_no_version',
+    )
+    logger.debug(
+        'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r '
+        'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s '
+        'osds_in_crush_bucket=%s osds_upgraded=%s bad_no_version=%s',
+        max_osds,
+        crush_bucket,
+        ceph_version_short,
+        *[report.get(field) for field in logged_fields],
+    )
+    return report
+
+
class UpgradeState:
def __init__(self,
target_name: str,
services: Optional[List[str]] = None,
total_count: Optional[int] = None,
remaining_count: Optional[int] = None,
+ crush_bucket_type: Optional[str] = None,
+ crush_bucket_name: Optional[str] = None,
):
self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
self.services = services
self.total_count = total_count
self.remaining_count = remaining_count
+ self.crush_bucket_type = crush_bucket_type
+ self.crush_bucket_name = crush_bucket_name
def to_json(self) -> dict:
return {
'services': self.services,
'total_count': self.total_count,
'remaining_count': self.remaining_count,
+ 'crush_bucket_type': self.crush_bucket_type,
+ 'crush_bucket_name': self.crush_bucket_name,
}
@classmethod
'UPGRADE_REDEPLOY_DAEMON',
'UPGRADE_BAD_TARGET_VERSION',
'UPGRADE_EXCEPTION',
- 'UPGRADE_OFFLINE_HOST'
+ 'UPGRADE_OFFLINE_HOST',
+ 'UPGRADE_INVALID_CRUSH_BUCKET',
]
def __init__(self, mgr: "CephadmOrchestrator"):
else:
self.upgrade_state = None
self.upgrade_info_str: str = ''
+ # Set during _to_upgrade when last osd ok-to-upgrade call reported all bucket OSDs
+ # on target version (no batch ids); used for logging and to skip repeat mon RPCs.
+ self._ok_to_upgrade_all_osds_upgraded: bool = False
+ # osd.<id> names under the upgrade CRUSH bucket from the last osd ok-to-upgrade
+ # report (``osds_in_crush_bucket``). Used so ok-to-stop ``known`` (cluster-wide)
+ # cannot schedule OSDs outside the bucket.
+ self._ok_to_upgrade_osds_in_crush_bucket: Optional[Set[str]] = None
@property
def target_image(self) -> str:
# FIXME: we assume the first digest is the best one to use
return self.upgrade_state.target_digests[0]
+    def _upgrade_status_osd_bucket_scope_active(self) -> bool:
+        """Report whether the current upgrade state carries a CRUSH bucket name."""
+        state = self.upgrade_state
+        if not state:
+            return False
+        return bool(state.crush_bucket_name)
+
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
r = orchestrator.UpgradeStatusSpec()
if self.upgrade_state:
r.progress, r.services_complete = self._get_upgrade_info()
r.is_paused = self.upgrade_state.paused
+ osd_bucket_scope = self._upgrade_status_osd_bucket_scope_active()
if self.upgrade_state.daemon_types is not None:
- which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
+ daemon_types = self.upgrade_state.daemon_types
+ which_str = f'Upgrading daemons of type(s) {",".join(daemon_types)}'
+ types_norm = [(dt or '').strip().lower() for dt in daemon_types]
+ if osd_bucket_scope and 'osd' in types_norm:
+ which_str += ' (OSDs in bucket scope)'
if self.upgrade_state.hosts is not None:
which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
elif self.upgrade_state.services is not None:
which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
elif self.upgrade_state.hosts is not None:
which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
+ elif osd_bucket_scope:
+ which_str = 'Upgrading all daemon types on all hosts (OSDs only in bucket scope)'
else:
which_str = 'Upgrading all daemon types on all hosts'
if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
r["tags"] = sorted(ls)
return r
+    def _validate_failure_domain_upgrade_options(
+        self,
+        crush_bucket_type: Optional[str],
+        crush_bucket_name: Optional[str],
+        daemon_types: Optional[List[str]],
+    ) -> None:
+        """
+        Check the CRUSH bucket options of an upgrade request for well-formedness.
+
+        Only the arguments themselves are inspected. Whether the bucket exists
+        is left to the monitor's ``osd ok-to-upgrade`` command, so the check is
+        not duplicated across layers.
+
+        Raises OrchestratorError, checked in this order, when:
+        - the type or the name is missing or blank,
+        - the type (compared case-insensitively, surrounding whitespace
+          ignored) is not in CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS,
+        - *daemon_types* is given but has no ``osd`` entry,
+        - the name contains a comma or more than one whitespace-separated token.
+        """
+        normalized_type = crush_bucket_type.strip().lower() if crush_bucket_type else ''
+        normalized_name = crush_bucket_name.strip() if crush_bucket_name else ''
+
+        if not normalized_type or not normalized_name:
+            raise OrchestratorError(
+                'Both --crush_bucket_type and --crush_bucket_name must be specified together')
+
+        if normalized_type not in CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS:
+            supported = ', '.join(sorted(CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS))
+            # The message echoes the type exactly as the caller passed it.
+            raise OrchestratorError(
+                f'Supported bucket types for OSD upgrade are: {supported} (specified: {crush_bucket_type!r})')
+
+        # No daemon type filter means every type, OSDs included.
+        if daemon_types is not None:
+            for requested in daemon_types:
+                if (requested or '').strip().lower() == 'osd':
+                    break
+            else:
+                raise OrchestratorError(
+                    'Bucket parameters for OSD upgrade require --daemon-types to include "osd"')
+
+        # The name is non-empty at this point, so a token count other than one
+        # can only mean embedded whitespace.
+        if ',' in normalized_name or len(normalized_name.split()) != 1:
+            raise OrchestratorError(
+                'Invalid --crush_bucket_name: use a single name token without commas')
+
+    @staticmethod
+    def is_mon_error_for_invalid_bucket(err: MonCommandFailed) -> bool:
+        """
+        Tell whether *err* from ``osd ok-to-upgrade`` reports retval -ENOENT.
+
+        CRUSH bucket errors typically surface as ENOENT: unknown bucket name
+        or no OSDs under that bucket.
+
+        The retval is matched as a whole number. A plain substring test for
+        ``retval: -2`` would also accept ``retval: -22`` (EINVAL) and every
+        other code that starts with the same digit, so unrelated mon failures
+        would be reported as an invalid bucket.
+        """
+        import re  # function-scope import keeps this fix self-contained
+
+        # (?!\d) rejects a longer number that merely starts with ENOENT's digits.
+        pattern = rf'retval: {-errno.ENOENT}(?!\d)'
+        return re.search(pattern, str(err)) is not None
+
def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
- hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
+ hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None,
+ bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str:
fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
'orchestrator', 'fail_fs', False))
if self.mgr.mode != 'root':
else:
raise OrchestratorError('must specify either image or version')
+ # Validate the failure domain upgrade options
+ # if the user has provided either or both of the
+ # --crush_bucket_type and --crush_bucket_name arguments.
+ if bucket_type is not None or bucket_name is not None:
+ self._validate_failure_domain_upgrade_options(
+ bucket_type, bucket_name, daemon_types)
+
if daemon_types is not None or services is not None or hosts is not None:
self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
services=services,
total_count=limit,
remaining_count=limit,
+ crush_bucket_type=bucket_type,
+ crush_bucket_name=bucket_name,
)
self._update_upgrade_progress(0.0)
self._save_upgrade_state()
target_image = self.target_image
self.mgr.log.info('Upgrade: Stopped')
self.upgrade_state = None
+ self._ok_to_upgrade_osds_in_crush_bucket = None
self._save_upgrade_state()
self._clear_upgrade_health_checks()
self.mgr.event.set()
tries -= 1
return False
+    def _upgrade_uses_ok_to_upgrade_for_osds(self) -> bool:
+        """
+        Whether OSDs of the current upgrade are batched via ``osd ok-to-upgrade``.
+
+        True only while an upgrade state exists and it holds both a CRUSH
+        bucket name and a CRUSH bucket type (persisted by ``upgrade_start``
+        after validation).
+
+        In that mode OSDs still needing the target image come from the mon's
+        ``osd ok-to-upgrade`` batches. If the mon reports ``all_osds_upgraded``
+        while mgr still lists ``(daemon, redeploy_only=False)`` rows (same Ceph
+        version string, different container digest/name), ``_to_upgrade``
+        falls back to ``_wait_for_ok_to_stop`` per daemon, as on the non-bucket
+        path. Redeploy-only rows also go through ok-to-stop.
+        """
+        state = self.upgrade_state
+        return bool(
+            state
+            and state.crush_bucket_name
+            and state.crush_bucket_type
+        )
+
+    def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None:
+        """
+        Store the ``osd.<id>`` names listed under ``osds_in_crush_bucket``.
+
+        When the report has no list under that key, the previously cached
+        value is left as it is.
+        """
+        osd_ids = report.get('osds_in_crush_bucket')
+        if not isinstance(osd_ids, list):
+            return
+        self._ok_to_upgrade_osds_in_crush_bucket = set(
+            map('osd.{}'.format, osd_ids))
+
+    def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool:
+        """
+        Decide whether daemon *d* may be upgraded under the CRUSH bucket scope.
+
+        Everything is allowed when the upgrade does not use
+        ``osd ok-to-upgrade``, and non-OSD daemons are always allowed. An OSD
+        is allowed only if its name is in the cached set of bucket OSDs; with
+        no cached set (or an empty one) it is refused.
+        """
+        bucket_scoped_osd = (
+            self._upgrade_uses_ok_to_upgrade_for_osds()
+            and d.daemon_type == 'osd'
+        )
+        if not bucket_scoped_osd:
+            return True
+
+        bucket_osds = self._ok_to_upgrade_osds_in_crush_bucket
+        return bool(bucket_osds) and d.name() in bucket_osds
+
+ def _wait_for_ok_to_upgrade_osd_batch(
+ self,
+ known_ok_to_upgrade: List[str],
+ ) -> bool:
+ """
+ Query ``osd ok-to-upgrade`` once and append approved daemon names
+ (``osd.<id>``) to *known_ok_to_upgrade*.
+
+ *max* is ``mgr.max_parallel_osd_upgrades``, matching ``osd ok-to-stop``.
+
+ The monitor's ``ceph_version`` argument is *upgrade_state.target_version*:
+ the short token taken from the target image after inspect in ``_do_upgrade``
+ (not the raw CLI ``--ceph-version`` string).
+
+ Returns True when a batch was appended to *known_ok_to_upgrade*, or when
+ the mon reports all bucket OSDs upgraded (nothing appended; the
+ ``_ok_to_upgrade_all_osds_upgraded`` flag is set instead). Returns False
+ when the bucket name or target version is missing, the upgrade is gone
+ or paused, the mon command fails with the invalid-bucket error (the
+ upgrade is failed with UPGRADE_INVALID_CRUSH_BUCKET), or no batch was
+ obtained within the retry budget.
+ """
+ assert self.upgrade_state is not None
+ bucket_name = self.upgrade_state.crush_bucket_name
+ ceph_version_short = self.upgrade_state.target_version
+ if not bucket_name or not ceph_version_short:
+ logger.error(
+ 'Upgrade: CRUSH bucket OSD upgrade missing bucket name or '
+ 'target_version (inspected short from image); cannot call '
+ 'osd ok-to-upgrade (no ok-to-stop fallback for this mode)')
+ return False
+
+ max_parallel = self.mgr.max_parallel_osd_upgrades
+ # Up to 4 attempts; each unsuccessful one sleeps 15 seconds before retrying.
+ remaining_tries = 4
+ while remaining_tries > 0:
+ # Re-checked on every attempt: the state may be cleared or paused meanwhile.
+ if not self.upgrade_state or self.upgrade_state.paused:
+ return False
+
+ try:
+ report = request_osd_ok_to_upgrade_report(
+ self.mgr,
+ bucket_name,
+ ceph_version_short,
+ max_osds=max_parallel,
+ )
+ except MonCommandFailed as err:
+ # Invalid-bucket errors are not retried: fail the upgrade right away.
+ if self.is_mon_error_for_invalid_bucket(err):
+ st = self.upgrade_state
+ btype = st.crush_bucket_type if st else None
+ logger.error('Upgrade: osd ok-to-upgrade failed (CRUSH bucket): %s', err)
+ self._fail_upgrade('UPGRADE_INVALID_CRUSH_BUCKET', {
+ 'severity': 'error',
+ 'summary': 'Upgrade: invalid failure domain for OSD upgrade',
+ 'count': 1,
+ 'detail': [
+ str(err),
+ f'Invalid failure domain for OSD upgrade: --crush_bucket_type={btype!r} '
+ f'--crush_bucket_name={bucket_name!r}',
+ ],
+ })
+ return False
+ # Any other mon failure is treated as "not ready yet" and retried.
+ logger.info('Upgrade: osd ok-to-upgrade not ready: %s', err)
+ time.sleep(15)
+ remaining_tries -= 1
+ continue
+
+ self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report)
+
+ # Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``.
+ # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids.
+ raw_osds = report.get('osds_ok_to_upgrade')
+ if isinstance(raw_osds, list):
+ approved_names = [f'osd.{osd_id}' for osd_id in raw_osds]
+ else:
+ approved_names = []
+ bad_no_version = report.get('bad_no_version') or []
+ # Nothing approved, mon says everything is upgraded, and no OSD is
+ # listed under bad_no_version: record that and succeed with no batch.
+ if (
+ not approved_names
+ and report.get('all_osds_upgraded') is True
+ and not bad_no_version
+ ):
+ self._ok_to_upgrade_all_osds_upgraded = True
+ logger.info(
+ 'Upgrade: osd ok-to-upgrade reports all OSDs under crush_bucket=%r '
+ 'on ceph_version=%r',
+ bucket_name,
+ ceph_version_short,
+ )
+ return True
+
+ # Empty batch without the all-upgraded signal: wait and ask again.
+ if not approved_names:
+ logger.info(
+ 'Upgrade: osd ok-to-upgrade returned no OSDs for '
+ 'crush_bucket=%r ceph_version=%r (wrong or missing CRUSH '
+ 'bucket name is a common cause; also check bucket has OSDs '
+ 'not yet on the target version). Report: %s',
+ bucket_name,
+ ceph_version_short,
+ report,
+ )
+ time.sleep(15)
+ remaining_tries -= 1
+ continue
+
+ # gets reset for each batch of OSDs
+ self._ok_to_upgrade_all_osds_upgraded = False
+
+ known_ok_to_upgrade.extend(approved_names)
+ logger.info(
+ 'Upgrade: osd ok-to-upgrade bucket=%r max=%s approved %s',
+ bucket_name,
+ max_parallel,
+ approved_names,
+ )
+ return True
+
+ # Retry budget exhausted without an approved batch.
+ return False
+
def _clear_upgrade_health_checks(self) -> None:
for k in self.UPGRADE_ERRORS:
if k in self.mgr.health_checks:
continue
if correct_image:
+ # Matches target (digests or image name per use_repo_digest) but not
+ # deployed_by target digests; redeploy only.
logger.debug('daemon %s.%s not deployed by correct version' % (
d.daemon_type, d.daemon_id))
need_upgrade_deployer.append((d, True))
else:
+ # Does not match target digests or target image name
logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
d.daemon_type, d.daemon_id,
d.container_image_name, d.container_image_digests, d.version))
return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
+ # Returns (ok, to_upgrade). ok is False when a required ok-to-stop or
+ # ok-to-upgrade check did not pass; to_upgrade holds the
+ # (daemon, redeploy_only) entries selected for upgrade in this pass.
def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
to_upgrade: List[Tuple[DaemonDescription, bool]] = []
known_ok_to_stop: List[str] = []
+ known_ok_to_upgrade: List[str] = []
+ self._ok_to_upgrade_all_osds_upgraded = False
for d_entry in need_upgrade:
d = d_entry[0]
assert d.daemon_type is not None
'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
continue
- if known_ok_to_stop:
- if d.name() in known_ok_to_stop:
- logger.info(f'Upgrade: {d.name()} is also safe to restart')
+ # d_entry[1] True means that the daemon is already on the target image and
+ # just needs redeployment. Use ok-to-stop for PG safety.
+ # Without this branch, redeploy-only bucket OSDs never appear in the osd
+ # ok-to-upgrade batch and are skipped (continue), so they never reach
+ # to_upgrade and the upgrade can stall while they remain in need_upgrade.
+ if (
+ d.daemon_type == 'osd'
+ and self._upgrade_uses_ok_to_upgrade_for_osds()
+ and d_entry[1]
+ ):
+ if not self.is_osd_upgrade_valid_for_failure_domain(d):
+ continue
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+ logger.info(f'Upgrade: {d.name()} is safe to redeploy')
+ to_upgrade.append(d_entry)
+ if not known_ok_to_stop:
+ break
+ continue
+
+ if known_ok_to_stop or known_ok_to_upgrade:
+ if (
+ (d.name() in known_ok_to_stop or d.name() in known_ok_to_upgrade)
+ and self.is_osd_upgrade_valid_for_failure_domain(d)
+ ):
+ logger.info(f'Upgrade: {d.name()} is safe to restart')
to_upgrade.append(d_entry)
continue
if d.daemon_type == 'osd':
- # NOTE: known_ok_to_stop is an output argument for
- # _wait_for_ok_to_stop
- if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
- return False, to_upgrade
+ if self._upgrade_uses_ok_to_upgrade_for_osds():
+ # Refresh ok-to-upgrade batch when we do not have one yet.
+ if not known_ok_to_upgrade and not self._ok_to_upgrade_all_osds_upgraded:
+ if not self._wait_for_ok_to_upgrade_osd_batch(known_ok_to_upgrade):
+ return False, to_upgrade
+
+ if d.name() not in known_ok_to_upgrade:
+ # Mon ok-to-upgrade state (all upgraded) can disagree with cephadm's
+ # view of this OSD (still wrong image in need_upgrade);
+ # Handle with ok-to-stop for PG safety.
+ if (
+ self._ok_to_upgrade_all_osds_upgraded
+ and not known_ok_to_upgrade
+ and not d_entry[1]
+ ):
+ if not self.is_osd_upgrade_valid_for_failure_domain(d):
+ continue
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
+ # Add to to_upgrade list for redeployment.
+ to_upgrade.append(d_entry)
+ if not known_ok_to_stop:
+ break
+ continue
+ else:
+ # NOTE: known_ok_to_stop is an output argument for
+ # _wait_for_ok_to_stop
+ if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+ return False, to_upgrade
if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
and not self._wait_for_ok_to_stop(d, known_ok_to_stop):
return False, to_upgrade
+ if (
+ d.daemon_type == 'osd'
+ and self._upgrade_uses_ok_to_upgrade_for_osds()
+ and not self.is_osd_upgrade_valid_for_failure_domain(d)
+ ):
+ continue
+
to_upgrade.append(d_entry)
- # if we don't have a list of others to consider, stop now
+ # ok-to-stop did not add peer names to known_ok_to_stop.
+ # For osd/mds/mon we then stop scanning need_upgrade this pass.
+ # This helps:
+ # 1. Limit how many core daemons get queued in a single pass without
+ # a mon-supplied peer batch in known_ok_to_stop.
+ # 2. Yield between batches of core daemons to allow the mon to catch up.
if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+ # osd ok-to-upgrade batch is not empty, so keep looping to
+ # add more OSDs to the batch
+ if d.daemon_type == 'osd' and self._upgrade_uses_ok_to_upgrade_for_osds() and (len(known_ok_to_upgrade) > 0):
+ continue # do not break
break
+
return True, to_upgrade
def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
self.mgr.remote('progress', 'complete',
self.upgrade_state.progress_id)
self.upgrade_state = None
+ self._ok_to_upgrade_osds_in_crush_bucket = None
self._save_upgrade_state()
def _do_upgrade(self):
})
return
self.upgrade_state.target_id = target_id
- # extract the version portion of 'ceph version {version} ({sha1})'
+ # Short token from ``ceph version …`` inside the target image; this
+ # is what the monitor expects for ``osd ok-to-upgrade`` ceph_version
+ # (may differ from CLI --ceph-version, e.g. full build id vs tag).
self.upgrade_state.target_version = target_version.split(' ')[2]
self.upgrade_state.target_digests = target_digests
self._save_upgrade_state()
logger.debug('Upgrade: Checking %s daemons' % daemon_type)
daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
+ # need_upgrade_self: True if the daemon is active mgr itself and needs to be upgraded
+ # need_upgrade: List of daemons that need to be upgraded
+ # need_upgrade_deployer: List of daemons that need to be redeployed
+ # done: Number of daemons that have been upgraded
need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
daemons_of_type, target_digests, target_image)
upgraded_daemon_count += done