]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: Bucket scoped OSD upgrades using ok-to-upgrade
authorAshwin M. Joshi <ashjosh1@in.ibm.com>
Tue, 7 Apr 2026 10:05:10 +0000 (15:35 +0530)
committerAshwin M. Joshi <ashjosh1@in.ibm.com>
Wed, 29 Apr 2026 06:57:18 +0000 (12:27 +0530)
Fixes: https://tracker.ceph.com/issues/75603
Signed-off-by: Ashwin M. Joshi <ashjosh1@in.ibm.com>
 Conflicts:
src/pybind/mgr/orchestrator/module.py

doc/cephadm/upgrade.rst
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/tests/test_remote_executables.py
src/pybind/mgr/cephadm/tests/test_upgrade.py
src/pybind/mgr/cephadm/upgrade.py
src/pybind/mgr/dashboard/services/orchestrator.py
src/pybind/mgr/orchestrator/_interface.py
src/pybind/mgr/orchestrator/module.py

index 003e0ea15339d52dbd8d725b81aceaede0cf0da8..78e101d5bf7d49b0bb505939b1ddc376924b5a81 100644 (file)
@@ -100,6 +100,44 @@ For example, to upgrade to v16.2.6, run the following command:
        ceph orch upgrade start --image quay.io/ceph/ceph:v16.2.6
 
 
+CRUSH bucket scoped OSD upgrades (``osd ok-to-upgrade``)
+========================================================
+
+For **OSD-only** upgrades you can limit which failure domain cephadm works
+through and ask the monitor which OSDs under a given CRUSH bucket may safely
+move to the target **Ceph short version** (the same string as
+``ceph_version_short`` in OSD metadata, e.g. ``20.3.0-3803-g63ca1ffb5a2`` or
+``20.1.0-144.el9cp``).
+
+Requirements:
+
+* For OSD-only upgrades, pass both ``--crush_bucket_type`` and ``--crush_bucket_name``.
+  Supported types today are ``host``, ``rack``, and ``chassis``.
+* The monitor's ``osd ok-to-upgrade`` expects the target **short** Ceph version
+  (same shape as ``ceph_version_short`` in ``ceph osd metadata``). Cephadm does
+  **not** fall back to ``osd ok-to-stop`` for bucket-scoped OSD runs. If the mon
+  returns no OSDs (e.g. unknown bucket name), cephadm logs details and retries.
+* If bucket parameters are not provided, cephadm will fall back to ``osd ok-to-stop`` 
+  for OSD upgrades.
+* Bucket scope applies only to OSDs. Other daemon types (mon, mgr, mds) are 
+  upgraded cluster-wide without bucket constraints.
+
+Example:
+
+.. prompt:: bash #
+
+  ceph orch upgrade start --image quay.ceph.io/ceph-ci/ceph:recent-git-branch-name
+    --daemon-types osd \\
+    --crush_bucket_type rack --crush_bucket_name rack-a
+
+For each failure domain batch, cephadm calls ``ceph osd ok-to-upgrade`` with the 
+specified failure domain name, the target short version, and ``max`` set to
+:confval:`mgr/cephadm/max_parallel_osd_upgrades`
+
+Note that it is not recommended to change the CRUSH bucket type or name after
+the upgrade has started as it may cause the upgrade to fail.
+
+
 Monitoring the Upgrade
 ======================
 
index 77630d4d50a9cda5e23d1f7d2167c8ac289861de..aff629fcde1eacdbb653a438e726fa78c6c1b881 100644 (file)
@@ -4162,7 +4162,8 @@ Then run the following:
 
     @handle_orch_error
     def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
-                      services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
+                      services: Optional[List[str]] = None, limit: Optional[int] = None,
+                      bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str:
         if self.inventory.get_host_with_state("maintenance"):
             raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
         if self.offline_hosts:
@@ -4192,12 +4193,17 @@ Then run the following:
                 raise OrchestratorError(
                     f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
 
+        if hosts and (bucket_type is not None or bucket_name is not None):
+            raise OrchestratorError(
+                '--hosts cannot be combined with --crush_bucket_type or --crush_bucket_name')
+
         if limit is not None:
             if limit < 1:
                 raise OrchestratorError(
                     f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
 
-        return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
+        return self.upgrade.upgrade_start(
+            image, version, daemon_types, hosts, services, limit, bucket_type, bucket_name)
 
     @handle_orch_error
     def upgrade_pause(self) -> str:
index 433dc916d129d91b30571de0959517fe12e51067..26b02e9de96bd9522afc93d08015b2bc598022e7 100644 (file)
@@ -104,6 +104,8 @@ def _names(node):
         return [f"<Subscript: {node.value}{node.slice}>"]
     if isinstance(node, ast.BinOp):
         return [f"<BinaryOp: {_names(node.left)} {_names(node.op)} {_names(node.right)}"]
+    if isinstance(node, ast.BoolOp):
+        return [f"<BoolOp: {node.op} {[_names(v) for v in node.values]}>"]
     if (
         isinstance(node, ast.Add)
         or isinstance(node, ast.Sub)
index 3b5c305b5f0f9f83ed93f9804e4b7d0a3c8dfb6d..4b3e9159068207ae2016612dec09e3888673e671 100644 (file)
@@ -5,7 +5,12 @@ import pytest
 
 from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
 from cephadm import CephadmOrchestrator
-from cephadm.upgrade import CephadmUpgrade, UpgradeState
+from cephadm.upgrade import (
+    CephadmUpgrade,
+    UpgradeState,
+    parse_ok_to_upgrade_mon_json,
+    request_osd_ok_to_upgrade_report,
+)
 from cephadm.ssh import HostConnectionError
 from cephadm.utils import ContainerInspectInfo
 from orchestrator import OrchestratorError, DaemonDescription
@@ -36,6 +41,22 @@ def test_upgrade_start(cephadm_module: CephadmOrchestrator):
                             ) == 'Stopped upgrade to image_id'
 
 
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_start_hosts_mutually_exclusive_with_bucket(cephadm_module: CephadmOrchestrator):
+    with with_host(cephadm_module, 'test'):
+        with with_host(cephadm_module, 'test2'):
+            with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=2)), status_running=True):
+                with pytest.raises(OrchestratorError) as err:
+                    cephadm_module.upgrade_start(
+                        'image_id', None,
+                        daemon_types=['osd'],
+                        host_placement='test',
+                        bucket_type='rack',
+                        bucket_name='rack-a',
+                    )
+                assert str(err.value) == '--hosts cannot be combined with --crush_bucket_type or --crush_bucket_name'
+
+
 @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
 def test_upgrade_start_offline_hosts(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'test'):
@@ -198,7 +219,168 @@ def test_upgrade_state_null(cephadm_module: CephadmOrchestrator):
     assert CephadmUpgrade(cephadm_module).upgrade_state is None
 
 
+def test_upgrade_state_crush_roundtrip():
+    u = UpgradeState(
+        'target', 'pid', crush_bucket_type='rack', crush_bucket_name='rack1')
+    restored = UpgradeState.from_json(u.to_json())
+    assert restored
+    assert restored.crush_bucket_type == 'rack'
+    assert restored.crush_bucket_name == 'rack1'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_osd_only(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'target', 'pid',
+        target_digests=['digest1'],
+        daemon_types=['osd'],
+        crush_bucket_type='rack',
+        crush_bucket_name='rack1',
+    )
+    with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+        status = wait(cephadm_module, cephadm_module.upgrade_status())
+    assert status.which == 'Upgrading daemons of type(s) osd (OSDs in bucket scope)'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_osd_only_uppercase(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'target', 'pid',
+        target_digests=['digest1'],
+        daemon_types=['OSD'],
+        crush_bucket_type='rack',
+        crush_bucket_name='rack1',
+    )
+    with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+        status = wait(cephadm_module, cephadm_module.upgrade_status())
+    assert status.which == 'Upgrading daemons of type(s) OSD (OSDs in bucket scope)'
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_crush_mixed_daemon_types(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'target', 'pid',
+        target_digests=['digest1'],
+        daemon_types=['mon', 'osd'],
+        crush_bucket_type='rack',
+        crush_bucket_name='rack1',
+    )
+    with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+        status = wait(cephadm_module, cephadm_module.upgrade_status())
+    assert status.which == (
+        'Upgrading daemons of type(s) mon,osd (OSDs in bucket scope)')
+
+
 @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_upgrade_status_which_full_cluster_with_crush_bucket(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'target', 'pid',
+        target_digests=['digest1'],
+        crush_bucket_type='rack',
+        crush_bucket_name='rack1',
+    )
+    with mock.patch.object(cephadm_module.upgrade, '_get_upgrade_info', return_value=('0/0', [])):
+        status = wait(cephadm_module, cephadm_module.upgrade_status())
+    assert status.which == (
+        'Upgrading all daemon types on all hosts (OSDs only in bucket scope)')
+
+
+def test_parse_ok_to_upgrade_mon_json_nested_and_flat():
+    nested = '{"ok_to_upgrade": {"ok_to_upgrade": true, "all_osds_upgraded": false}}'
+    inner = parse_ok_to_upgrade_mon_json(nested)
+    assert inner['ok_to_upgrade'] is True
+    flat = '{"ok_to_upgrade": true}'
+    d = parse_ok_to_upgrade_mon_json(flat)
+    assert d['ok_to_upgrade'] is True
+
+
+def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator):
+    cephadm_module.check_mon_command = mock.MagicMock(
+        return_value=(0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', ''))
+    rep = request_osd_ok_to_upgrade_report(
+        cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3)
+    assert rep['ok_to_upgrade'] is True
+    cephadm_module.check_mon_command.assert_called_once()
+    cmd = cephadm_module.check_mon_command.call_args[0][0]
+    assert cmd['prefix'] == 'osd ok-to-upgrade'
+    assert cmd['crush_bucket'] == 'mybucket'
+    assert cmd['ceph_version'] == '20.1.0-144.el9cp'
+    assert cmd['max'] == 3
+
+
+def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'rack', 'rack-a', ['osd'])
+
+
+def test_validate_failure_domain_upgrade_options_chassis_ok(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'chassis', 'c1', ['osd'])
+
+
+def test_validate_failure_domain_upgrade_options_host_ok(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'host', 'host1', ['osd'])
+
+
+def test_validate_failure_domain_upgrade_options_invalid_type(cephadm_module: CephadmOrchestrator):
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            'root', 'default', ['osd'])
+    assert str(err.value) == (
+        "Supported bucket types for OSD upgrade are: chassis, host, rack (specified: 'root')")
+
+
+def test_validate_failure_domain_upgrade_options_pairing(cephadm_module: CephadmOrchestrator):
+    both_msg = 'Both --crush_bucket_type and --crush_bucket_name must be specified together'
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            'rack', None, ['osd'])
+    assert str(err.value) == both_msg
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            None, 'rack-a', ['osd'])
+    assert str(err.value) == both_msg
+
+
+def test_validate_failure_domain_upgrade_options_comma_in_name(cephadm_module: CephadmOrchestrator):
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            'rack', 'a,b', ['osd'])
+    assert str(err.value) == (
+        'Invalid --crush_bucket_name: use a single name token without commas')
+
+
+def test_validate_failure_domain_upgrade_options_multi_token_name(cephadm_module: CephadmOrchestrator):
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            'rack', 'rack-a rack-b', ['osd'])
+    assert str(err.value) == (
+        'Invalid --crush_bucket_name: use a single name token without commas')
+
+
+def test_validate_failure_domain_upgrade_options_daemon_types(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'rack', 'rack-a', None)
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'rack', 'rack-a', ['osd', 'mds'])
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'rack', 'rack-a', ['mgr', 'mon', 'osd'])
+    osd_msg = 'Bucket parameters for OSD upgrade require --daemon-types to include "osd"'
+    with pytest.raises(OrchestratorError) as err:
+        cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+            'rack', 'rack-a', ['mgr', 'mon'])
+    assert str(err.value) == osd_msg
+
+
+def test_validate_failure_domain_upgrade_options_name_not_consulting_crush_map(
+        cephadm_module: CephadmOrchestrator):
+    """Name existence in the CRUSH map is not validated here."""
+    cephadm_module.upgrade._validate_failure_domain_upgrade_options(
+        'rack', 'not-in-map', ['osd'])
+
+
+@mock.patch('cephadm.serve.CephadmServe._run_cephadm', _run_cephadm('{}'))
 def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator):
     with with_host(cephadm_module, 'host1'):
         with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''):
index 26396f7f93d7e917c9b61355037871c2d76bf4ef..e85da50213ebea67d4d6d3a89d2b0476099a4679 100644 (file)
@@ -1,8 +1,9 @@
+import errno
 import json
 import logging
 import time
 import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set
 from cephadm.services.service_registry import service_registry
 
 import orchestrator
@@ -22,6 +23,10 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# Underlying mon ok-to-upgrade command supports the following
+# OSD upgrade failure domain types relevant for parallelization.
+CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS = frozenset({'rack', 'chassis', 'host'})
+
 # from ceph_fs.h
 CEPH_MDSMAP_ALLOW_STANDBY_REPLAY = (1 << 5)
 CEPH_MDSMAP_NOT_JOINABLE = (1 << 0)
@@ -54,6 +59,58 @@ def normalize_image_digest(digest: str, default_registry: str) -> str:
     return digest
 
 
+def parse_ok_to_upgrade_mon_json(out: str) -> dict:
+    """
+    Parse mon JSON from ``osd ok-to-upgrade``. The monitor nests the
+    report under a top-level ``ok_to_upgrade`` object.
+    If the report is not nested, return the entire JSON object.
+    """
+    parsed = json.loads(out)
+    nested_ok_report = parsed.get('ok_to_upgrade')
+    if isinstance(nested_ok_report, dict):
+        return nested_ok_report
+    return parsed
+
+
+def request_osd_ok_to_upgrade_report(
+    mgr: "CephadmOrchestrator",
+    crush_bucket: str,
+    ceph_version_short: str,
+    max_osds: int,
+) -> dict:
+    """
+    Send ``osd ok-to-upgrade`` to the monitor.
+
+    *ceph_version_short* must be the **inspected** short token from the target
+    image's ``ceph version …`` line (same as ``upgrade_state.target_version``
+    after the first pull in ``_do_upgrade``). It is not necessarily the same
+    string as CLI ``--ceph-version`` (e.g. tag ``18.2.1`` vs build suffix).
+    """
+    cmd: Dict[str, Any] = {
+        'prefix': 'osd ok-to-upgrade',
+        'crush_bucket': crush_bucket,
+        'ceph_version': ceph_version_short,
+        'max': max_osds,
+    }
+    _return_code, mon_out, _stderr = mgr.check_mon_command(cmd)
+    report = parse_ok_to_upgrade_mon_json(mon_out)
+    logger.debug(
+        'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r '
+        'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s '
+        'osds_in_crush_bucket=%s osds_upgraded=%s bad_no_version=%s',
+        max_osds,
+        crush_bucket,
+        ceph_version_short,
+        report.get('ok_to_upgrade'),
+        report.get('all_osds_upgraded'),
+        report.get('osds_ok_to_upgrade'),
+        report.get('osds_in_crush_bucket'),
+        report.get('osds_upgraded'),
+        report.get('bad_no_version'),
+    )
+    return report
+
+
 class UpgradeState:
     def __init__(self,
                  target_name: str,
@@ -71,6 +128,8 @@ class UpgradeState:
                  services: Optional[List[str]] = None,
                  total_count: Optional[int] = None,
                  remaining_count: Optional[int] = None,
+                 crush_bucket_type: Optional[str] = None,
+                 crush_bucket_name: Optional[str] = None,
                  ):
         self._target_name: str = target_name  # Use CephadmUpgrade.target_image instead.
         self.progress_id: str = progress_id
@@ -88,6 +147,8 @@ class UpgradeState:
         self.services = services
         self.total_count = total_count
         self.remaining_count = remaining_count
+        self.crush_bucket_type = crush_bucket_type
+        self.crush_bucket_name = crush_bucket_name
 
     def to_json(self) -> dict:
         return {
@@ -106,6 +167,8 @@ class UpgradeState:
             'services': self.services,
             'total_count': self.total_count,
             'remaining_count': self.remaining_count,
+            'crush_bucket_type': self.crush_bucket_type,
+            'crush_bucket_name': self.crush_bucket_name,
         }
 
     @classmethod
@@ -127,7 +190,8 @@ class CephadmUpgrade:
         'UPGRADE_REDEPLOY_DAEMON',
         'UPGRADE_BAD_TARGET_VERSION',
         'UPGRADE_EXCEPTION',
-        'UPGRADE_OFFLINE_HOST'
+        'UPGRADE_OFFLINE_HOST',
+        'UPGRADE_INVALID_CRUSH_BUCKET',
     ]
 
     def __init__(self, mgr: "CephadmOrchestrator"):
@@ -139,6 +203,13 @@ class CephadmUpgrade:
         else:
             self.upgrade_state = None
         self.upgrade_info_str: str = ''
+        # Set during _to_upgrade when last osd ok-to-upgrade call reported all bucket OSDs
+        # on target version (no batch ids); used for logging and to skip repeat mon RPCs.
+        self._ok_to_upgrade_all_osds_upgraded: bool = False
+        # osd.<id> names under the upgrade CRUSH bucket from the last osd ok-to-upgrade
+        # report (``osds_in_crush_bucket``). Used so ok-to-stop ``known`` (cluster-wide)
+        # cannot schedule OSDs outside the bucket.
+        self._ok_to_upgrade_osds_in_crush_bucket: Optional[Set[str]] = None
 
     @property
     def target_image(self) -> str:
@@ -151,6 +222,11 @@ class CephadmUpgrade:
         # FIXME: we assume the first digest is the best one to use
         return self.upgrade_state.target_digests[0]
 
+    def _upgrade_status_osd_bucket_scope_active(self) -> bool:
+        """True when upgrade state selects OSD bucket scope"""
+        st = self.upgrade_state
+        return bool(st and st.crush_bucket_name)
+
     def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
         r = orchestrator.UpgradeStatusSpec()
         if self.upgrade_state:
@@ -159,8 +235,13 @@ class CephadmUpgrade:
             r.progress, r.services_complete = self._get_upgrade_info()
             r.is_paused = self.upgrade_state.paused
 
+            osd_bucket_scope = self._upgrade_status_osd_bucket_scope_active()
             if self.upgrade_state.daemon_types is not None:
-                which_str = f'Upgrading daemons of type(s) {",".join(self.upgrade_state.daemon_types)}'
+                daemon_types = self.upgrade_state.daemon_types
+                which_str = f'Upgrading daemons of type(s) {",".join(daemon_types)}'
+                types_norm = [(dt or '').strip().lower() for dt in daemon_types]
+                if osd_bucket_scope and 'osd' in types_norm:
+                    which_str += ' (OSDs in bucket scope)'
                 if self.upgrade_state.hosts is not None:
                     which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
             elif self.upgrade_state.services is not None:
@@ -169,6 +250,8 @@ class CephadmUpgrade:
                     which_str += f' on host(s) {",".join(self.upgrade_state.hosts)}'
             elif self.upgrade_state.hosts is not None:
                 which_str = f'Upgrading all daemons on host(s) {",".join(self.upgrade_state.hosts)}'
+            elif osd_bucket_scope:
+                which_str = 'Upgrading all daemon types on all hosts (OSDs only in bucket scope)'
             else:
                 which_str = 'Upgrading all daemon types on all hosts'
             if self.upgrade_state.total_count is not None and self.upgrade_state.remaining_count is not None:
@@ -310,8 +393,58 @@ class CephadmUpgrade:
             r["tags"] = sorted(ls)
         return r
 
+    def _validate_failure_domain_upgrade_options(
+        self,
+        crush_bucket_type: Optional[str],
+        crush_bucket_name: Optional[str],
+        daemon_types: Optional[List[str]],
+    ) -> None:
+        # Validates syntax only. Bucket existence validated by monitor's
+        # osd ok-to-upgrade to avoid duplicate validation across layers.
+        bucket_type = (crush_bucket_type or '').strip().lower()
+        bucket_name = (crush_bucket_name or '').strip()
+
+        both_crush_args = bool(bucket_type) and bool(bucket_name)
+        if not both_crush_args:
+            raise OrchestratorError(
+                'Both --crush_bucket_type and --crush_bucket_name must be specified together')
+
+        if bucket_type not in CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS:
+            allowed = ', '.join(sorted(CEPH_ORCH_VALID_OSD_UPGRADE_CRUSH_BUCKETS))
+            raise OrchestratorError(
+                f'Supported bucket types for OSD upgrade are: {allowed} (specified: {crush_bucket_type!r})')
+
+        osd_in_upgrade_scope = (
+            daemon_types is None
+            or any(
+                (dt or '').strip().lower() == 'osd'
+                for dt in daemon_types
+            )
+        )
+        if not osd_in_upgrade_scope:
+            raise OrchestratorError(
+                'Bucket parameters for OSD upgrade require --daemon-types to include "osd"')
+
+        single_bucket_name = (
+            ',' not in bucket_name
+            and len(bucket_name.split()) == 1
+            and bool(bucket_name)
+        )
+        if not single_bucket_name:
+            raise OrchestratorError(
+                'Invalid --crush_bucket_name: use a single name token without commas')
+
+    @staticmethod
+    def is_mon_error_for_invalid_bucket(err: MonCommandFailed) -> bool:
+        """
+        CRUSH bucket errors from ``osd ok-to-upgrade`` mon command.
+        Typically ENOENT: unknown bucket name or no OSDs under that bucket.
+        """
+        return f'retval: {-errno.ENOENT}' in str(err)
+
     def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
-                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
+                      hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None,
+                      bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str:
         fail_fs_value = cast(bool, self.mgr.get_module_option_ex(
             'orchestrator', 'fail_fs', False))
         if self.mgr.mode != 'root':
@@ -327,6 +460,13 @@ class CephadmUpgrade:
         else:
             raise OrchestratorError('must specify either image or version')
 
+        # Validate the failure domain upgrade options
+        # if the user has provided either or both of the
+        # --crush_bucket_type and --crush_bucket_name arguments.
+        if bucket_type is not None or bucket_name is not None:
+            self._validate_failure_domain_upgrade_options(
+                bucket_type, bucket_name, daemon_types)
+
         if daemon_types is not None or services is not None or hosts is not None:
             self._validate_upgrade_filters(target_name, daemon_types, hosts, services)
 
@@ -357,6 +497,8 @@ class CephadmUpgrade:
             services=services,
             total_count=limit,
             remaining_count=limit,
+            crush_bucket_type=bucket_type,
+            crush_bucket_name=bucket_name,
         )
         self._update_upgrade_progress(0.0)
         self._save_upgrade_state()
@@ -490,6 +632,7 @@ class CephadmUpgrade:
         target_image = self.target_image
         self.mgr.log.info('Upgrade: Stopped')
         self.upgrade_state = None
+        self._ok_to_upgrade_osds_in_crush_bucket = None
         self._save_upgrade_state()
         self._clear_upgrade_health_checks()
         self.mgr.event.set()
@@ -563,6 +706,154 @@ class CephadmUpgrade:
             tries -= 1
         return False
 
+    def _upgrade_uses_ok_to_upgrade_for_osds(self) -> bool:
+        """
+        When ``upgrade_start`` persisted CRUSH bucket scope (bucket type/name
+        and upgrade includes OSDs, validated there), OSDs that still need the target
+        image are batched via ``osd ok-to-upgrade``. If the mon reports
+        ``all_osds_upgraded`` but mgr still has ``(daemon, redeploy_only=False)``
+        rows (same Ceph version string, different container digest/name),
+        ``_to_upgrade`` falls back to ``_wait_for_ok_to_stop`` per daemon like the
+        non-bucket path. Redeploy-only rows are also handled with ok-to-stop.
+        """
+        if not self.upgrade_state:
+            return False
+        state = self.upgrade_state
+        if not state.crush_bucket_name or not state.crush_bucket_type:
+            return False
+
+        return True
+
+    def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None:
+        raw = report.get('osds_in_crush_bucket')
+        if isinstance(raw, list):
+            self._ok_to_upgrade_osds_in_crush_bucket = {
+                f'osd.{osd_id}' for osd_id in raw
+            }
+
+    def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool:
+        # If not using ok-to-upgrade for OSDs, any OSD is valid.
+        if not self._upgrade_uses_ok_to_upgrade_for_osds():
+            return True
+
+        if d.daemon_type != 'osd':
+            return True
+        # If not in the CRUSH bucket, it is not valid.
+        bset = self._ok_to_upgrade_osds_in_crush_bucket
+        if not bset:
+            return False
+        return d.name() in bset
+
+    def _wait_for_ok_to_upgrade_osd_batch(
+            self,
+            known_ok_to_upgrade: List[str],
+    ) -> bool:
+        """
+        Query ``osd ok-to-upgrade`` once and append approved daemon names
+        (``osd.<id>``) to *known_ok_to_upgrade*.
+
+        *max* is ``mgr.max_parallel_osd_upgrades``, matching ``osd ok-to-stop``.
+
+        The monitor's ``ceph_version`` argument is *upgrade_state.target_version*:
+        the short token taken from the target image after inspect in ``_do_upgrade``
+        (not the raw CLI ``--ceph-version`` string).
+        """
+        assert self.upgrade_state is not None
+        bucket_name = self.upgrade_state.crush_bucket_name
+        ceph_version_short = self.upgrade_state.target_version
+        if not bucket_name or not ceph_version_short:
+            logger.error(
+                'Upgrade: CRUSH bucket OSD upgrade missing bucket name or '
+                'target_version (inspected short from image); cannot call '
+                'osd ok-to-upgrade (no ok-to-stop fallback for this mode)')
+            return False
+
+        max_parallel = self.mgr.max_parallel_osd_upgrades
+        remaining_tries = 4
+        while remaining_tries > 0:
+            if not self.upgrade_state or self.upgrade_state.paused:
+                return False
+
+            try:
+                report = request_osd_ok_to_upgrade_report(
+                    self.mgr,
+                    bucket_name,
+                    ceph_version_short,
+                    max_osds=max_parallel,
+                )
+            except MonCommandFailed as err:
+                if self.is_mon_error_for_invalid_bucket(err):
+                    st = self.upgrade_state
+                    btype = st.crush_bucket_type if st else None
+                    logger.error('Upgrade: osd ok-to-upgrade failed (CRUSH bucket): %s', err)
+                    self._fail_upgrade('UPGRADE_INVALID_CRUSH_BUCKET', {
+                        'severity': 'error',
+                        'summary': 'Upgrade: invalid failure domain for OSD upgrade',
+                        'count': 1,
+                        'detail': [
+                            str(err),
+                            f'Invalid failure domain for OSD upgrade: --crush_bucket_type={btype!r} '
+                            f'--crush_bucket_name={bucket_name!r}',
+                        ],
+                    })
+                    return False
+                logger.info('Upgrade: osd ok-to-upgrade not ready: %s', err)
+                time.sleep(15)
+                remaining_tries -= 1
+                continue
+
+            self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report)
+
+            # Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``.
+            # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids.
+            raw_osds = report.get('osds_ok_to_upgrade')
+            if isinstance(raw_osds, list):
+                approved_names = [f'osd.{osd_id}' for osd_id in raw_osds]
+            else:
+                approved_names = []
+            bad_no_version = report.get('bad_no_version') or []
+            if (
+                not approved_names
+                and report.get('all_osds_upgraded') is True
+                and not bad_no_version
+            ):
+                self._ok_to_upgrade_all_osds_upgraded = True
+                logger.info(
+                    'Upgrade: osd ok-to-upgrade reports all OSDs under crush_bucket=%r '
+                    'on ceph_version=%r',
+                    bucket_name,
+                    ceph_version_short,
+                )
+                return True
+
+            if not approved_names:
+                logger.info(
+                    'Upgrade: osd ok-to-upgrade returned no OSDs for '
+                    'crush_bucket=%r ceph_version=%r (wrong or missing CRUSH '
+                    'bucket name is a common cause; also check bucket has OSDs '
+                    'not yet on the target version). Report: %s',
+                    bucket_name,
+                    ceph_version_short,
+                    report,
+                )
+                time.sleep(15)
+                remaining_tries -= 1
+                continue
+
+            # gets reset for each batch of OSDs
+            self._ok_to_upgrade_all_osds_upgraded = False
+
+            known_ok_to_upgrade.extend(approved_names)
+            logger.info(
+                'Upgrade: osd ok-to-upgrade bucket=%r max=%s approved %s',
+                bucket_name,
+                max_parallel,
+                approved_names,
+            )
+            return True
+
+        return False
+
     def _clear_upgrade_health_checks(self) -> None:
         for k in self.UPGRADE_ERRORS:
             if k in self.mgr.health_checks:
@@ -806,10 +1097,13 @@ class CephadmUpgrade:
                 continue
 
             if correct_image:
+                # Matches target (digests or image name per use_repo_digest) but not
+                # deployed_by target digests; redeploy only.
                 logger.debug('daemon %s.%s not deployed by correct version' % (
                     d.daemon_type, d.daemon_id))
                 need_upgrade_deployer.append((d, True))
             else:
+                # Does not match target digests or target image name
                 logger.debug('daemon %s.%s not correct (%s, %s, %s)' % (
                     d.daemon_type, d.daemon_id,
                     d.container_image_name, d.container_image_digests, d.version))
@@ -817,9 +1111,13 @@ class CephadmUpgrade:
 
         return (need_upgrade_self, need_upgrade, need_upgrade_deployer, done)
 
+    # return True if the upgrade is safe to proceed, False otherwise
+    # to_upgrade is a list of daemons that need to be upgraded
     def _to_upgrade(self, need_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str) -> Tuple[bool, List[Tuple[DaemonDescription, bool]]]:
         to_upgrade: List[Tuple[DaemonDescription, bool]] = []
         known_ok_to_stop: List[str] = []
+        known_ok_to_upgrade: List[str] = []
+        self._ok_to_upgrade_all_osds_upgraded = False
         for d_entry in need_upgrade:
             d = d_entry[0]
             assert d.daemon_type is not None
@@ -832,17 +1130,65 @@ class CephadmUpgrade:
                         'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
                     continue
 
-            if known_ok_to_stop:
-                if d.name() in known_ok_to_stop:
-                    logger.info(f'Upgrade: {d.name()} is also safe to restart')
+            # d_entry[1] True means that the daemon is already on the target image and
+            # just needs redeployment. Use ok-to-stop for PG safety.
+            # Without this branch, redeploy-only bucket OSDs never appear in the osd
+            # ok-to-upgrade batch and are skipped (continue), so they never reach
+            # to_upgrade and the upgrade can stall while they remain in need_upgrade.
+            if (
+                d.daemon_type == 'osd'
+                and self._upgrade_uses_ok_to_upgrade_for_osds()
+                and d_entry[1]
+            ):
+                if not self.is_osd_upgrade_valid_for_failure_domain(d):
+                    continue
+                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                    return False, to_upgrade
+                logger.info(f'Upgrade: {d.name()} is safe to redeploy')
+                to_upgrade.append(d_entry)
+                if not known_ok_to_stop:
+                    break
+                continue
+
+            if known_ok_to_stop or known_ok_to_upgrade:
+                if (
+                    (d.name() in known_ok_to_stop or d.name() in known_ok_to_upgrade)
+                    and self.is_osd_upgrade_valid_for_failure_domain(d)
+                ):
+                    logger.info(f'Upgrade: {d.name()} is safe to restart')
                     to_upgrade.append(d_entry)
                 continue
 
             if d.daemon_type == 'osd':
-                # NOTE: known_ok_to_stop is an output argument for
-                # _wait_for_ok_to_stop
-                if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
-                    return False, to_upgrade
+                if self._upgrade_uses_ok_to_upgrade_for_osds():
+                    # Refresh ok-to-upgrade batch when we do not have one yet.
+                    if not known_ok_to_upgrade and not self._ok_to_upgrade_all_osds_upgraded:
+                        if not self._wait_for_ok_to_upgrade_osd_batch(known_ok_to_upgrade):
+                            return False, to_upgrade
+
+                    if d.name() not in known_ok_to_upgrade:
+                        # Mon ok-to-upgrade state (all upgraded) can disagree with cephadm's
+                        # view of this OSD (still wrong image in need_upgrade);
+                        # Handle with ok-to-stop for PG safety.
+                        if (
+                            self._ok_to_upgrade_all_osds_upgraded
+                            and not known_ok_to_upgrade
+                            and not d_entry[1]
+                        ):
+                            if not self.is_osd_upgrade_valid_for_failure_domain(d):
+                                continue
+                            if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                                return False, to_upgrade
+                            # Add to to_upgrade list for redeployment.
+                            to_upgrade.append(d_entry)
+                            if not known_ok_to_stop:
+                                break
+                        continue
+                else:
+                    # NOTE: known_ok_to_stop is an output argument for
+                    # _wait_for_ok_to_stop
+                    if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
+                        return False, to_upgrade
 
             if d.daemon_type == 'mon' and self._enough_mons_for_ok_to_stop():
                 if not self._wait_for_ok_to_stop(d, known_ok_to_stop):
@@ -860,11 +1206,28 @@ class CephadmUpgrade:
                         and not self._wait_for_ok_to_stop(d, known_ok_to_stop):
                     return False, to_upgrade
 
+            if (
+                d.daemon_type == 'osd'
+                and self._upgrade_uses_ok_to_upgrade_for_osds()
+                and not self.is_osd_upgrade_valid_for_failure_domain(d)
+            ):
+                continue
+
             to_upgrade.append(d_entry)
 
-            # if we don't have a list of others to consider, stop now
+            # ok-to-stop did not add peer names to known_ok_to_stop.
+            # For osd/mds/mon we then stop scanning need_upgrade this pass.
+            # This helps:
+            # 1. Limit how many core daemons get queued in a single pass without
+            #    a mon-supplied peer batch in known_ok_to_stop.
+            # 2. Yield between batches of core daemons to allow the mon to catch up.
             if d.daemon_type in ['osd', 'mds', 'mon'] and not known_ok_to_stop:
+                # osd ok-to-upgrade batch is not empty, so keep looping to
+                # add more OSDs to the batch
+                if d.daemon_type == 'osd' and self._upgrade_uses_ok_to_upgrade_for_osds() and (len(known_ok_to_upgrade) > 0):
+                    continue  # do not break
                 break
+
         return True, to_upgrade
 
     def _upgrade_daemons(self, to_upgrade: List[Tuple[DaemonDescription, bool]], target_image: str, target_digests: Optional[List[str]] = None) -> None:
@@ -1091,6 +1454,7 @@ class CephadmUpgrade:
             self.mgr.remote('progress', 'complete',
                             self.upgrade_state.progress_id)
         self.upgrade_state = None
+        self._ok_to_upgrade_osds_in_crush_bucket = None
         self._save_upgrade_state()
 
     def _do_upgrade(self):
@@ -1142,7 +1506,9 @@ class CephadmUpgrade:
                 })
                 return
             self.upgrade_state.target_id = target_id
-            # extract the version portion of 'ceph version {version} ({sha1})'
+            # Short token from ``ceph version …`` inside the target image; this
+            # is what the monitor expects for ``osd ok-to-upgrade`` ceph_version
+            # (may differ from CLI --ceph-version, e.g. full build id vs tag).
             self.upgrade_state.target_version = target_version.split(' ')[2]
             self.upgrade_state.target_digests = target_digests
             self._save_upgrade_state()
@@ -1210,6 +1576,10 @@ class CephadmUpgrade:
             logger.debug('Upgrade: Checking %s daemons' % daemon_type)
             daemons_of_type = [d for d in daemons if d.daemon_type == daemon_type]
 
+            # need_upgrade_self: True if the daemon is active mgr itself and needs to be upgraded
+            # need_upgrade: List of daemons that need to be upgraded
+            # need_upgrade_deployer: List of daemons that need to be redeployed
+            # done: Number of daemons that have been upgraded
             need_upgrade_self, need_upgrade, need_upgrade_deployer, done = self._detect_need_upgrade(
                 daemons_of_type, target_digests, target_image)
             upgraded_daemon_count += done
index e2495a44444bee45e0f6595a937cbd40207baf63..51c36cd4db178ef6518dae1876b5867e35500104 100644 (file)
@@ -182,9 +182,11 @@ class UpgradeManager(ResourceManager):
     @wait_api_result
     def start(self, image: str, version: str, daemon_types: Optional[List[str]] = None,
               host_placement: Optional[str] = None, services: Optional[List[str]] = None,
-              limit: Optional[int] = None) -> str:
-        return self.api.upgrade_start(image, version, daemon_types, host_placement, services,
-                                      limit)
+              limit: Optional[int] = None, bucket_type: Optional[str] = None,
+              bucket_name: Optional[str] = None) -> str:
+        return self.api.upgrade_start(
+            image, version, daemon_types, host_placement, services, limit,
+            bucket_type, bucket_name)
 
     @wait_api_result
     def pause(self) -> str:
index 136fde595ac05abde23e2a7785510544f07577e6..f0f90966110bbe5c1129f3a91dd6be624841d28b 100644 (file)
@@ -949,7 +949,8 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
-                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
+                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int],
+                      bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> OrchResult[str]:
         raise NotImplementedError()
 
     def upgrade_pause(self) -> OrchResult[str]:
index 56d94fe24ab3d7bc763453171036e76826fd36d9..3cdd21bbaca4587245f38b9c059409068f4fec29 100644 (file)
@@ -2586,13 +2586,17 @@ Usage:
                        hosts: Optional[str] = None,
                        services: Optional[str] = None,
                        limit: Optional[int] = None,
-                       ceph_version: Optional[str] = None) -> HandleCommandResult:
+                       ceph_version: Optional[str] = None,
+                       crush_bucket_type: Optional[str] = None,
+                       crush_bucket_name: Optional[str] = None) -> HandleCommandResult:
         """Initiate upgrade"""
         self._upgrade_check_image_name(image, ceph_version)
+
         # Split comma-separated lists and trim whitespace so "mon, crash" and "mon,crash" are equivalent.
         dtypes = [d.strip() for d in daemon_types.split(',')] if daemon_types is not None else None
         service_names = [s.strip() for s in services.split(',')] if services is not None else None
-        completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit)
+        completion = self.upgrade_start(image, ceph_version, dtypes, hosts, service_names, limit,
+                                        crush_bucket_type, crush_bucket_name)
         raise_if_exception(completion)
         return HandleCommandResult(stdout=completion.result_str())