From: Ashwin M. Joshi Date: Tue, 7 Apr 2026 10:05:10 +0000 (+0530) Subject: mgr: Bucket scoped OSD upgrades using ok-to-upgrade X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9cb9e9ff7e77ab965591fc8bb7b94557b9eddc67;p=ceph.git mgr: Bucket scoped OSD upgrades using ok-to-upgrade Fixes: https://tracker.ceph.com/issues/75603 Signed-off-by: Ashwin M. Joshi Conflicts: src/pybind/mgr/orchestrator/module.py --- diff --git a/doc/cephadm/upgrade.rst b/doc/cephadm/upgrade.rst index 003e0ea15339..78e101d5bf7d 100644 --- a/doc/cephadm/upgrade.rst +++ b/doc/cephadm/upgrade.rst @@ -100,6 +100,44 @@ For example, to upgrade to v16.2.6, run the following command: ceph orch upgrade start --image quay.io/ceph/ceph:v16.2.6 +CRUSH bucket scoped OSD upgrades (``osd ok-to-upgrade``) +======================================================== + +For **OSD-only** upgrades you can limit which failure domain cephadm works +through and ask the monitor which OSDs under a given CRUSH bucket may safely +move to the target **Ceph short version** (the same string as +``ceph_version_short`` in OSD metadata, e.g. ``20.3.0-3803-g63ca1ffb5a2`` or +``20.1.0-144.el9cp``). + +Requirements: + +* For OSD-only upgrades, pass both ``--crush_bucket_type`` and ``--crush_bucket_name``. + Supported types today are ``host``, ``rack``, and ``chassis``. +* The monitor's ``osd ok-to-upgrade`` expects the target **short** Ceph version + (same shape as ``ceph_version_short`` in ``ceph osd metadata``). Cephadm does + **not** fall back to ``osd ok-to-stop`` for bucket-scoped OSD runs. If the mon + returns no OSDs (e.g. unknown bucket name), cephadm logs details and retries. +* If bucket parameters are not provided, cephadm will fall back to ``osd ok-to-stop`` + for OSD upgrades. +* Bucket scope applies only to OSDs. Other daemon types (mon, mgr, mds) are + upgraded cluster-wide without bucket constraints. + +Example: + +.. prompt:: bash # + + ceph orch upgrade start --image quay.ceph.io/ceph-ci/ceph:recent-git-branch-name + --daemon-types osd \\ + --crush_bucket_type rack --crush_bucket_name rack-a + +For each failure domain batch, cephadm calls ``ceph osd ok-to-upgrade`` with the +specified failure domain name, the target short version, and ``max`` set to +:confval:`mgr/cephadm/max_parallel_osd_upgrades` + +Note that it is not recommended to change the CRUSH bucket type or name after +the upgrade has started as it may cause the upgrade to fail. + + Monitoring the Upgrade ====================== diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 77630d4d50a9..aff629fcde1e 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -4162,7 +4162,8 @@ Then run the following: @handle_orch_error def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None, - services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: + services: Optional[List[str]] = None, limit: Optional[int] = None, + bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str: if self.inventory.get_host_with_state("maintenance"): raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state") if self.offline_hosts: @@ -4192,12 +4193,17 @@ Then run the following: raise OrchestratorError( f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts') + if hosts and (bucket_type is not None or bucket_name is not None): + raise OrchestratorError( + '--hosts cannot be combined with --crush_bucket_type or --crush_bucket_name') + if limit is not None: if limit < 1: raise OrchestratorError( f'Upgrade aborted - --limit arg must be a positive integer, not {limit}') - return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit) + return self.upgrade.upgrade_start( + image, version, daemon_types, hosts, services, limit, bucket_type, bucket_name) @handle_orch_error def upgrade_pause(self) -> str: diff --git a/src/pybind/mgr/cephadm/tests/test_remote_executables.py b/src/pybind/mgr/cephadm/tests/test_remote_executables.py index 433dc916d129..26b02e9de96b 100644 --- a/src/pybind/mgr/cephadm/tests/test_remote_executables.py +++ b/src/pybind/mgr/cephadm/tests/test_remote_executables.py @@ -104,6 +104,8 @@ def _names(node): return [f""] if isinstance(node, ast.BinOp): return [f""] if ( isinstance(node, ast.Add) or isinstance(node, ast.Sub) diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py index 3b5c305b5f0f..4b3e91590682 100644 --- a/src/pybind/mgr/cephadm/tests/test_upgrade.py +++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py @@ -5,7 +5,12 @@ import pytest from ceph.deployment.service_spec import PlacementSpec, ServiceSpec from cephadm import CephadmOrchestrator -from cephadm.upgrade import CephadmUpgrade, UpgradeState +from cephadm.upgrade import ( + CephadmUpgrade, + UpgradeState, + parse_ok_to_upgrade_mon_json, + request_osd_ok_to_upgrade_report, +) from cephadm.ssh import HostConnectionError from cephadm.utils import ContainerInspectInfo from orchestrator import OrchestratorError, DaemonDescription @@ -36,6 +41,22 @@ def test_upgrade_start(cephadm_module: CephadmOrchestrator): ) == 'Stopped upgrade to image_id' +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_start_hosts_mutually_exclusive_with_bucket(cephadm_module: CephadmOrchestrator): + with with_host(cephadm_module, 'test'): + with with_host(cephadm_module, 'test2'): + with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), status_running=True): + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade_start( + 'image_id', None, + daemon_types=['osd'], + host_placement='test', + bucket_type='rack', + bucket_name='rack-a', + ) + assert str(err.value) == '--hosts cannot be combined with --crush_bucket_type or --crush_bucket_name' + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_upgrade_start_offline_hosts(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'test'): @@ -198,7 +219,168 @@ def test_upgrade_state_null(cephadm_module: CephadmOrchestrator): assert CephadmUpgrade(cephadm_module).upgrade_state is None +def test_upgrade_state_crush_roundtrip(): + u = UpgradeState( + 'target', 'pid', crush_bucket_type='rack', crush_bucket_name='rack1') + restored = UpgradeState.from_json(u.to_json()) + assert restored + assert restored.crush_bucket_type == 'rack' + assert restored.crush_bucket_name == 'rack1' + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_status_which_crush_osd_only(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'target', 'pid', + target_digests=['digest1'], + daemon_types=['osd'], + crush_bucket_type='rack', + crush_bucket_name='rack1', + ) + with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])): + status = wait(cephadm_module, cephadm_module.upgrade_status()) + assert status.which == 'Upgrading daemons of type(s) osd (OSDs in bucket scope)' + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_status_which_crush_osd_only_uppercase(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'target', 'pid', + target_digests=['digest1'], + daemon_types=['OSD'], + crush_bucket_type='rack', + crush_bucket_name='rack1', + ) + with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])): + status = wait(cephadm_module, cephadm_module.upgrade_status()) + assert status.which == 'Upgrading daemons of type(s) OSD (OSDs in bucket scope)' + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_status_which_crush_mixed_daemon_types(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'target', 'pid', + target_digests=['digest1'], + daemon_types=['mon', 'osd'], + crush_bucket_type='rack', + crush_bucket_name='rack1', + ) + with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])): + status = wait(cephadm_module, cephadm_module.upgrade_status()) + assert status.which == ( + 'Upgrading daemons of type(s) mon,osd (OSDs in bucket scope)') + + @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_upgrade_status_which_full_cluster_with_crush_bucket(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'target', 'pid', + target_digests=['digest1'], + crush_bucket_type='rack', + crush_bucket_name='rack1', + ) + with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])): + status = wait(cephadm_module, cephadm_module.upgrade_status()) + assert status.which == ( + 'Upgrading all daemon types on all hosts (OSDs only in bucket scope)') + + +def test_parse_ok_to_upgrade_mon_json_nested_and_flat(): + nested = '{"ok_to_upgrade": {"ok_to_upgrade": true, "all_osds_upgraded": false}}' + inner = parse_ok_to_upgrade_mon_json(nested) + assert inner['ok_to_upgrade'] is True + flat = '{"ok_to_upgrade": true}' + d = parse_ok_to_upgrade_mon_json(flat) + assert d['ok_to_upgrade'] is True + + +def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator): + cephadm_module.check_mon_command = mock.MagicMock( + return_value=(0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', '')) + rep = request_osd_ok_to_upgrade_report( + cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3) + assert rep['ok_to_upgrade'] is True + cephadm_module.check_mon_command.assert_called_once() + cmd = cephadm_module.check_mon_command.call_args[0][0] + assert cmd['prefix'] == 'osd ok-to-upgrade' + assert cmd['crush_bucket'] == 'mybucket' + assert cmd['ceph_version'] == '20.1.0-144.el9cp' + assert cmd['max'] == 3 + + +def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a', ['osd']) + + +def test_validate_failure_domain_upgrade_options_chassis_ok(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'chassis', 'c1', ['osd']) + + +def test_validate_failure_domain_upgrade_options_host_ok(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'host', 'host1', ['osd']) + + +def test_validate_failure_domain_upgrade_options_invalid_type(cephadm_module: CephadmOrchestrator): + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'root', 'default', ['osd']) + assert str(err.value) == ( + "Supported bucket types for OSD upgrade are: chassis, host, rack (specified: 'root')") + + +def test_validate_failure_domain_upgrade_options_pairing(cephadm_module: CephadmOrchestrator): + both_msg = 'Both --crush_bucket_type and --crush_bucket_name must be specified together' + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', None, ['osd']) + assert str(err.value) == both_msg + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + None, 'rack-a', ['osd']) + assert str(err.value) == both_msg + + +def test_validate_failure_domain_upgrade_options_comma_in_name(cephadm_module: CephadmOrchestrator): + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'a,b', ['osd']) + assert str(err.value) == ( + 'Invalid --crush_bucket_name: use a single name token without commas') + + +def test_validate_failure_domain_upgrade_options_multi_token_name(cephadm_module: CephadmOrchestrator): + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a rack-b', ['osd']) + assert str(err.value) == ( + 'Invalid --crush_bucket_name: use a single name token without commas') + + +def test_validate_failure_domain_upgrade_options_daemon_types(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a', None) + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a', ['osd', 'mds']) + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a', ['mgr', 'mon', 'osd']) + osd_msg = 'Bucket parameters for OSD upgrade require --daemon-types to include "osd"' + with pytest.raises(OrchestratorError) as err: + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'rack-a', ['mgr', 'mon']) + assert str(err.value) == osd_msg + + +def test_validate_failure_domain_upgrade_options_name_not_consulting_crush_map( + cephadm_module: CephadmOrchestrator): + """Name existence in the CRUSH map is not validated here.""" + cephadm_module.upgrade._validate_failure_domain_upgrade_options( + 'rack', 'not-in-map', ['osd']) + + +@mock.patch('cephadm.serve.CephadmServe._run_cephadm', _run_cephadm('{}')) def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1'): with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''): diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 26396f7f93d7..e85da50213eb 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -1,8 +1,9 @@ +import errno import json import logging import time import uuid -from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast +from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set from cephadm.services.service_registry import service_registry import orchestrator @@ -22,6 +23,10 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# Underlying mon ok-to-upgrade command supports the following +# OSD upgrade failure domain types relevant for parallelization. +CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS = frozenset({'rack', 'chassis', 'host'}) + # from ceph_fs.h CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5) CEPH_MDSMAP_NOT_JOINABLE = (1 << 0) @@ -54,6 +59,58 @@ def normalize_image_digest(digest: str, default_registry: str) -> str: return digest +def parse_ok_to_upgrade_mon_json(out: str) -> dict: + """ + Parse mon JSON from ``osd ok-to-upgrade``. The monitor nests the + report under a top-level ``ok_to_upgrade`` object. + If the report is not nested, return the entire JSON object. + """ + parsed = json.loads(out) + nested_ok_report = parsed.get('ok_to_upgrade') + if isinstance(nested_ok_report, dict): + return nested_ok_report + return parsed + + +def request_osd_ok_to_upgrade_report( + mgr: "CephadmOrchestrator", + crush_bucket: str, + ceph_version_short: str, + max_osds: int, +) -> dict: + """ + Send ``osd ok-to-upgrade`` to the monitor. + + *ceph_version_short* must be the **inspected** short token from the target + image's ``ceph version …`` line (same as ``upgrade_state.target_version`` + after the first pull in ``_do_upgrade``). It is not necessarily the same + string as CLI ``--ceph-version`` (e.g. tag ``18.2.1`` vs build suffix). + """ + cmd: Dict[str, Any] = { + 'prefix': 'osd ok-to-upgrade', + 'crush_bucket': crush_bucket, + 'ceph_version': ceph_version_short, + 'max': max_osds, + } + _return_code, mon_out, _stderr = mgr.check_mon_command(cmd) + report = parse_ok_to_upgrade_mon_json(mon_out) + logger.debug( + 'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r ' + 'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s ' + 'osds_in_crush_bucket=%s osds_upgraded=%s bad_no_version=%s', + max_osds, + crush_bucket, + ceph_version_short, + report.get('ok_to_upgrade'), + report.get('all_osds_upgraded'), + report.get('osds_ok_to_upgrade'), + report.get('osds_in_crush_bucket'), + report.get('osds_upgraded'), + report.get('bad_no_version'), + ) + return report + + class UpgradeState: def __init__(self, target_name: str, @@ -71,6 +128,8 @@ class UpgradeState: services: Optional[List[str]] = None, total_count: Optional[int] = None, remaining_count: Optional[int] = None, + crush_bucket_type: Optional[str] = None, + crush_bucket_name: Optional[str] = None, ): self._target_name: str = target_name # Use CephadmUpgrade.target_image instead. self.progress_id: str = progress_id @@ -88,6 +147,8 @@ class UpgradeState: self.services = services self.total_count = total_count self.remaining_count = remaining_count + self.crush_bucket_type = crush_bucket_type + self.crush_bucket_name = crush_bucket_name def to_json(self) -> dict: return { @@ -106,6 +167,8 @@ class UpgradeState: 'services': self.services, 'total_count': self.total_count, 'remaining_count': self.remaining_count, + 'crush_bucket_type': self.crush_bucket_type, + 'crush_bucket_name': self.crush_bucket_name, } @classmethod @@ -127,7 +190,8 @@ class CephadmUpgrade: 'UPGRADE_REDEPLOY_DAEMON', 'UPGRADE_BAD_TARGET_VERSION', 'UPGRADE_EXCEPTION', - 'UPGRADE_OFFLINE_HOST' + 'UPGRADE_OFFLINE_HOST', + 'UPGRADE_INVALID_CRUSH_BUCKET', ] def __init__(self, mgr: "CephadmOrchestrator"): @@ -139,6 +203,13 @@ class CephadmUpgrade: else: self.upgrade_state = None self.upgrade_info_str: str = '' + # Set during _to_upgrade when last osd ok-to-upgrade call reported all bucket OSDs + # on target version (no batch ids); used for logging and to skip repeat mon RPCs. + self._ok_to_upgrade_all_osds_upgraded: bool = False + # osd. names under the upgrade CRUSH bucket from the last osd ok-to-upgrade + # report (``osds_in_crush_bucket``). Used so ok-to-stop ``known`` (cluster-wide) + # cannot schedule OSDs outside the bucket. + self._ok_to_upgrade_osds_in_crush_bucket: Optional[Set[str]] = None @property def target_image(self) -> str: @@ -151,6 +222,11 @@ class CephadmUpgrade: # FIXME: we assume the first digest is the best one to use return self.upgrade_state.target_digests[0] + def _upgrade_status_osd_bucket_scope_active(self) -> bool: + """True when upgrade state selects OSD bucket scope""" + st = self.upgrade_state + return bool(st and st.crush_bucket_name) + def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: r = orchestrator.UpgradeStatusSpec() if self.upgrade_state: @@ -159,8 +235,13 @@ class CephadmUpgrade: r.progress, r.services_complete = self._get_upgrade_info() r.is_paused = self.upgrade_state.paused + osd_bucket_scope = self._upgrade_status_osd_bucket_scope_active() if self.upgrade_state.daemon_types is not None: - which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}' + daemon_types = self.upgrade_state.daemon_types + which_str = f'Upgrading daemons of type(s) {",".join(daemon_types)}' + types_norm = [(dt or '').strip().lower() for dt in daemon_types] + if osd_bucket_scope and 'osd' in types_norm: + which_str += ' (OSDs in bucket scope)' if self.upgrade_state.hosts is not None: which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}' elif self.upgrade_state.services is not None: @@ -169,6 +250,8 @@ class CephadmUpgrade: which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}' elif self.upgrade_state.hosts is not None: which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}' + elif osd_bucket_scope: + which_str = 'Upgrading all daemon types on all hosts (OSDs only in bucket scope)' else: which_str = 'Upgrading all daemon types on all hosts' if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None: @@ -310,8 +393,58 @@ class CephadmUpgrade: r["tags"] = sorted(ls) return r + def _validate_failure_domain_upgrade_options( + self, + crush_bucket_type: Optional[str], + crush_bucket_name: Optional[str], + daemon_types: Optional[List[str]], + ) -> None: + # Validates syntax only. Bucket existence validated by monitor's + # osd ok-to-upgrade to avoid duplicate validation across layers. + bucket_type = (crush_bucket_type or '').strip().lower() + bucket_name = (crush_bucket_name or '').strip() + + both_crush_args = bool(bucket_type) and bool(bucket_name) + if not both_crush_args: + raise OrchestratorError( + 'Both --crush_bucket_type and --crush_bucket_name must be specified together') + + if bucket_type not in CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS: + allowed = ', '.join(sorted(CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS)) + raise OrchestratorError( + f'Supported bucket types for OSD upgrade are: {allowed} (specified: {crush_bucket_type!r})') + + osd_in_upgrade_scope = ( + daemon_types is None + or any( + (dt or '').strip().lower() == 'osd' + for dt in daemon_types + ) + ) + if not osd_in_upgrade_scope: + raise OrchestratorError( + 'Bucket parameters for OSD upgrade require --daemon-types to include "osd"') + + single_bucket_name = ( + ',' not in bucket_name + and len(bucket_name.split()) == 1 + and bool(bucket_name) + ) + if not single_bucket_name: + raise OrchestratorError( + 'Invalid --crush_bucket_name: use a single name token without commas') + + @staticmethod + def is_mon_error_for_invalid_bucket(err: MonCommandFailed) -> bool: + """ + CRUSH bucket errors from ``osd ok-to-upgrade`` mon command. + Typically ENOENT: unknown bucket name or no OSDs under that bucket. + """ + return f'retval: {-errno.ENOENT}' in str(err) + def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, - hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str: + hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None, + bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str: fail_fs_value = cast(bool, self.mgr.get_module_option_ex( 'orchestrator', 'fail_fs', False)) if self.mgr.mode != 'root': @@ -327,6 +460,13 @@ class CephadmUpgrade: else: raise OrchestratorError('must specify either image or version') + # Validate the failure domain upgrade options + # if the user has provided either or both of the + # --crush_bucket_type and --crush_bucket_name arguments. + if bucket_type is not None or bucket_name is not None: + self._validate_failure_domain_upgrade_options( + bucket_type, bucket_name, daemon_types) + if daemon_types is not None or services is not None or hosts is not None: self._validate_upgrade_filters(target_name, daemon_types, hosts, services) @@ -357,6 +497,8 @@ class CephadmUpgrade: services=services, total_count=limit, remaining_count=limit, + crush_bucket_type=bucket_type, + crush_bucket_name=bucket_name, ) self._update_upgrade_progress(0.0) self._save_upgrade_state() @@ -490,6 +632,7 @@ class CephadmUpgrade: target_image = self.target_image self.mgr.log.info('Upgrade: Stopped') self.upgrade_state = None + self._ok_to_upgrade_osds_in_crush_bucket = None self._save_upgrade_state() self._clear_upgrade_health_checks() self.mgr.event.set() @@ -563,6 +706,154 @@ class CephadmUpgrade: tries -= 1 return False + def _upgrade_uses_ok_to_upgrade_for_osds(self) -> bool: + """ + When ``upgrade_start`` persisted CRUSH bucket scope (bucket type/name + and upgrade includes OSDs, validated there), OSDs that still need the target + image are batched via ``osd ok-to-upgrade``. If the mon reports + ``all_osds_upgraded`` but mgr still has ``(daemon, redeploy_only=False)`` + rows (same Ceph version string, different container digest/name), + ``_to_upgrade`` falls back to ``_wait_for_ok_to_stop`` per daemon like the + non-bucket path. Redeploy-only rows are also handled with ok-to-stop. + """ + if not self.upgrade_state: + return False + state = self.upgrade_state + if not state.crush_bucket_name or not state.crush_bucket_type: + return False + + return True + + def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None: + raw = report.get('osds_in_crush_bucket') + if isinstance(raw, list): + self._ok_to_upgrade_osds_in_crush_bucket = { + f'osd.{osd_id}' for osd_id in raw + } + + def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool: + # If not using ok-to-upgrade for OSDs, any OSD is valid. + if not self._upgrade_uses_ok_to_upgrade_for_osds(): + return True + + if d.daemon_type != 'osd': + return True + # If not in the CRUSH bucket, it is not valid. + bset = self._ok_to_upgrade_osds_in_crush_bucket + if not bset: + return False + return d.name() in bset + + def _wait_for_ok_to_upgrade_osd_batch( + self, + known_ok_to_upgrade: List[str], + ) -> bool: + """ + Query ``osd ok-to-upgrade`` once and append approved daemon names + (``osd.``) to *known_ok_to_upgrade*. + + *max* is ``mgr.max_parallel_osd_upgrades``, matching ``osd ok-to-stop``. + + The monitor's ``ceph_version`` argument is *upgrade_state.target_version*: + the short token taken from the target image after inspect in ``_do_upgrade`` + (not the raw CLI ``--ceph-version`` string). + """ + assert self.upgrade_state is not None + bucket_name = self.upgrade_state.crush_bucket_name + ceph_version_short = self.upgrade_state.target_version + if not bucket_name or not ceph_version_short: + logger.error( + 'Upgrade: CRUSH bucket OSD upgrade missing bucket name or ' + 'target_version (inspected short from image); cannot call ' + 'osd ok-to-upgrade (no ok-to-stop fallback for this mode)') + return False + + max_parallel = self.mgr.max_parallel_osd_upgrades + remaining_tries = 4 + while remaining_tries > 0: + if not self.upgrade_state or self.upgrade_state.paused: + return False + + try: + report = request_osd_ok_to_upgrade_report( + self.mgr, + bucket_name, + ceph_version_short, + max_osds=max_parallel, + ) + except MonCommandFailed as err: + if self.is_mon_error_for_invalid_bucket(err): + st = self.upgrade_state + btype = st.crush_bucket_type if st else None + logger.error('Upgrade: osd ok-to-upgrade failed (CRUSH bucket): %s', err) + self._fail_upgrade('UPGRADE_INVALID_CRUSH_BUCKET', { + 'severity': 'error', + 'summary': 'Upgrade: invalid failure domain for OSD upgrade', + 'count': 1, + 'detail': [ + str(err), + f'Invalid failure domain for OSD upgrade: --crush_bucket_type={btype!r} ' + f'--crush_bucket_name={bucket_name!r}', + ], + }) + return False + logger.info('Upgrade: osd ok-to-upgrade not ready: %s', err) + time.sleep(15) + remaining_tries -= 1 + continue + + self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report) + + # Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``. + # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids. + raw_osds = report.get('osds_ok_to_upgrade') + if isinstance(raw_osds, list): + approved_names = [f'osd.{osd_id}' for osd_id in raw_osds] + else: + approved_names = [] + bad_no_version = report.get('bad_no_version') or [] + if ( + not approved_names + and report.get('all_osds_upgraded') is True + and not bad_no_version + ): + self._ok_to_upgrade_all_osds_upgraded = True + logger.info( + 'Upgrade: osd ok-to-upgrade reports all OSDs under crush_bucket=%r ' + 'on ceph_version=%r', + bucket_name, + ceph_version_short, + ) + return True + + if not approved_names: + logger.info( + 'Upgrade: osd ok-to-upgrade returned no OSDs for ' + 'crush_bucket=%r ceph_version=%r (wrong or missing CRUSH ' + 'bucket name is a common cause; also check bucket has OSDs ' + 'not yet on the target version). Report: %s', + bucket_name, + ceph_version_short, + report, + ) + time.sleep(15) + remaining_tries -= 1 + continue + + # gets reset for each batch of OSDs + self._ok_to_upgrade_all_osds_upgraded = False + + known_ok_to_upgrade.extend(approved_names) + logger.info( + 'Upgrade: osd ok-to-upgrade bucket=%r max=%s approved %s', + bucket_name, + max_parallel, + approved_names, + ) + return True + + return False + def _clear_upgrade_health_checks(self) -> None: for k in self.UPGRADE_ERRORS: if k in self.mgr.health_checks: @@ -806,10 +1097,13 @@ class CephadmUpgrade: continue if correct_image: + # Matches target (digests or image name per use_repo_digest) but not + # deployed_by target digests; redeploy only. logger.debug('daemon %s.%s not deployed by correct version' % ( d.daemon_type, d.daemon_id)) need_upgrade_deployer.append((d, True)) else: + # Does not match target digests or target image name logger.debug('daemon %s.%s not correct (%s, %s, %s)' % ( d.daemon_type, d.daemon_id, d.container_image_name, d.container_image_digests, d.version)) @@ -817,9 +1111,13 @@ class CephadmUpgrade: return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done) + # return True if the upgrade is safe to proceed, False otherwise + # to_upgrade is a list of daemons that need to be upgraded def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]: to_upgrade: List[Tuple[DaemonDescription, bool]] = [] known_ok_to_stop: List[str] = [] + known_ok_to_upgrade: List[str] = [] + self._ok_to_upgrade_all_osds_upgraded = False for d_entry in need_upgrade: d = d_entry[0] assert d.daemon_type is not None @@ -832,17 +1130,65 @@ class CephadmUpgrade: 'daemon %s has unknown container_image_id but has correct image name' % (d.name())) continue - if known_ok_to_stop: - if d.name() in known_ok_to_stop: - logger.info(f'Upgrade: {d.name()} is also safe to restart') + # d_entry[1] True means that the daemon is already on the target image and + # just needs redeployment. Use ok-to-stop for PG safety. + # Without this branch, redeploy-only bucket OSDs never appear in the osd + # ok-to-upgrade batch and are skipped (continue), so they never reach + # to_upgrade and the upgrade can stall while they remain in need_upgrade. + if ( + d.daemon_type == 'osd' + and self._upgrade_uses_ok_to_upgrade_for_osds() + and d_entry[1] + ): + if not self.is_osd_upgrade_valid_for_failure_domain(d): + continue + if not self._wait_for_ok_to_stop(d, known_ok_to_stop): + return False, to_upgrade + logger.info(f'Upgrade: {d.name()} is safe to redeploy') + to_upgrade.append(d_entry) + if not known_ok_to_stop: + break + continue + + if known_ok_to_stop or known_ok_to_upgrade: + if ( + (d.name() in known_ok_to_stop or d.name() in known_ok_to_upgrade) + and self.is_osd_upgrade_valid_for_failure_domain(d) + ): + logger.info(f'Upgrade: {d.name()} is safe to restart') to_upgrade.append(d_entry) continue if d.daemon_type == 'osd': - # NOTE: known_ok_to_stop is an output argument for - # _wait_for_ok_to_stop - if not self._wait_for_ok_to_stop(d, known_ok_to_stop): - return False, to_upgrade + if self._upgrade_uses_ok_to_upgrade_for_osds(): + # Refresh ok-to-upgrade batch when we do not have one yet. + if not known_ok_to_upgrade and not self._ok_to_upgrade_all_osds_upgraded: + if not self._wait_for_ok_to_upgrade_osd_batch(known_ok_to_upgrade): + return False, to_upgrade + + if d.name() not in known_ok_to_upgrade: + # Mon ok-to-upgrade state (all upgraded) can disagree with cephadm's + # view of this OSD (still wrong image in need_upgrade); + # Handle with ok-to-stop for PG safety. + if ( + self._ok_to_upgrade_all_osds_upgraded + and not known_ok_to_upgrade + and not d_entry[1] + ): + if not self.is_osd_upgrade_valid_for_failure_domain(d): + continue + if not self._wait_for_ok_to_stop(d, known_ok_to_stop): + return False, to_upgrade + # Add to to_upgrade list for redeployment. + to_upgrade.append(d_entry) + if not known_ok_to_stop: + break + continue + else: + # NOTE: known_ok_to_stop is an output argument for + # _wait_for_ok_to_stop + if not self._wait_for_ok_to_stop(d, known_ok_to_stop): + return False, to_upgrade if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop(): if not self._wait_for_ok_to_stop(d, known_ok_to_stop): @@ -860,11 +1206,28 @@ class CephadmUpgrade: and not self._wait_for_ok_to_stop(d, known_ok_to_stop): return False, to_upgrade + if ( + d.daemon_type == 'osd' + and self._upgrade_uses_ok_to_upgrade_for_osds() + and not self.is_osd_upgrade_valid_for_failure_domain(d) + ): + continue + to_upgrade.append(d_entry) - # if we don't have a list of others to consider, stop now + # ok-to-stop did not add peer names to known_ok_to_stop. + # For osd/mds/mon we then stop scanning need_upgrade this pass. + # This helps: + # 1. Limit how many core daemons get queued in a single pass without + # a mon-supplied peer batch in known_ok_to_stop. + # 2. Yield between batches of core daemons to allow the mon to catch up. if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop: + # osd ok-to-upgrade batch is not empty, so keep looping to + # add more OSDs to the batch + if d.daemon_type == 'osd' and self._upgrade_uses_ok_to_upgrade_for_osds() and (len(known_ok_to_upgrade) > 0): + continue # do not break break + return True, to_upgrade def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None: @@ -1091,6 +1454,7 @@ class CephadmUpgrade: self.mgr.remote('progress', 'complete', self.upgrade_state.progress_id) self.upgrade_state = None + self._ok_to_upgrade_osds_in_crush_bucket = None self._save_upgrade_state() def _do_upgrade(self): @@ -1142,7 +1506,9 @@ class CephadmUpgrade: }) return self.upgrade_state.target_id = target_id - # extract the version portion of 'ceph version {version} ({sha1})' + # Short token from ``ceph version …`` inside the target image; this + # is what the monitor expects for ``osd ok-to-upgrade`` ceph_version + # (may differ from CLI --ceph-version, e.g. full build id vs tag). self.upgrade_state.target_version = target_version.split(' ')[2] self.upgrade_state.target_digests = target_digests self._save_upgrade_state() @@ -1210,6 +1576,10 @@ class CephadmUpgrade: logger.debug('Upgrade: Checking %s daemons' % daemon_type) daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type] + # need_upgrade_self: True if the daemon is active mgr itself and needs to be upgraded + # need_upgrade: List of daemons that need to be upgraded + # need_upgrade_deployer: List of daemons that need to be redeployed + # done: Number of daemons that have been upgraded need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade( daemons_of_type, target_digests, target_image) upgraded_daemon_count += done diff --git a/src/pybind/mgr/dashboard/services/orchestrator.py b/src/pybind/mgr/dashboard/services/orchestrator.py index e2495a44444b..51c36cd4db17 100644 --- a/src/pybind/mgr/dashboard/services/orchestrator.py +++ b/src/pybind/mgr/dashboard/services/orchestrator.py @@ -182,9 +182,11 @@ class UpgradeManager(ResourceManager): @wait_api_result def start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None, services: Optional[List[str]] = None, - limit: Optional[int] = None) -> str: - return self.api.upgrade_start(image, version, daemon_types, host_placement, services, - limit) + limit: Optional[int] = None, bucket_type: Optional[str] = None, + bucket_name: Optional[str] = None) -> str: + return self.api.upgrade_start( + image, version, daemon_types, host_placement, services, limit, + bucket_type, bucket_name) @wait_api_result def pause(self) -> str: diff --git a/src/pybind/mgr/orchestrator/_interface.py b/src/pybind/mgr/orchestrator/_interface.py index 136fde595ac0..f0f90966110b 100644 --- a/src/pybind/mgr/orchestrator/_interface.py +++ b/src/pybind/mgr/orchestrator/_interface.py @@ -949,7 +949,8 @@ class Orchestrator(object): raise NotImplementedError() def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]], - hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]: + hosts: Optional[str], services: Optional[List[str]], limit: Optional[int], + bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> OrchResult[str]: raise NotImplementedError() def upgrade_pause(self) -> OrchResult[str]: diff --git a/src/pybind/mgr/orchestrator/module.py b/src/pybind/mgr/orchestrator/module.py index 56d94fe24ab3..3cdd21bbaca4 100644 --- a/src/pybind/mgr/orchestrator/module.py +++ b/src/pybind/mgr/orchestrator/module.py @@ -2586,13 +2586,17 @@ Usage: hosts: Optional[str] = None, services: Optional[str] = None, limit: Optional[int] = None, - ceph_version: Optional[str] = None) -> HandleCommandResult: + ceph_version: Optional[str] = None, + crush_bucket_type: Optional[str] = None, + crush_bucket_name: Optional[str] = None) -> HandleCommandResult: """Initiate upgrade""" self._upgrade_check_image_name(image, ceph_version) + # Split comma-separated lists and trim whitespace so "mon, crash" and "mon,crash" are equivalent. dtypes = [d.strip() for d in daemon_types.split(',')] if daemon_types is not None else None service_names = [s.strip() for s in services.split(',')] if services is not None else None - completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit) + completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit, + crush_bucket_type, crush_bucket_name) raise_if_exception(completion) return HandleCommandResult(stdout=completion.result_str())