]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: ok-to-upgrade review comments for dataclass etc
authorAshwin M. Joshi <ashjosh1@in.ibm.com>
Thu, 23 Apr 2026 11:19:06 +0000 (16:49 +0530)
committerAshwin M. Joshi <ashjosh1@in.ibm.com>
Wed, 29 Apr 2026 06:57:43 +0000 (12:27 +0530)
Fixes: https://tracker.ceph.com/issues/75603
Signed-off-by: Ashwin M. Joshi <ashjosh1@in.ibm.com>
src/pybind/mgr/cephadm/tests/test_upgrade.py
src/pybind/mgr/cephadm/upgrade.py

index 4b3e9159068207ae2016612dec09e3888673e671..dbe37e53df6ab3acd0eddba5a825a2d08c9351aa 100644 (file)
@@ -1,4 +1,5 @@
 import json
+import logging
 from unittest import mock
 
 import pytest
@@ -7,6 +8,7 @@ from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
 from cephadm import CephadmOrchestrator
 from cephadm.upgrade import (
     CephadmUpgrade,
+    OkToUpgradeMonReport,
     UpgradeState,
     parse_ok_to_upgrade_mon_json,
     request_osd_ok_to_upgrade_report,
@@ -294,12 +296,80 @@ def test_parse_ok_to_upgrade_mon_json_nested_and_flat():
     assert d['ok_to_upgrade'] is True
 
 
+def test_ok_to_upgrade_mon_report_from_parsed_body():
+    rep = OkToUpgradeMonReport.from_parsed_body({
+        'ok_to_upgrade': True,
+        'all_osds_upgraded': False,
+        'osds_ok_to_upgrade': [0, 1],
+        'osds_in_crush_bucket': [2, 3],
+        'osds_upgraded': [4],
+        'bad_no_version': [5],
+    })
+    assert rep.ok_to_upgrade is True
+    assert rep.all_osds_upgraded is False
+    assert rep.osds_ok_to_upgrade == [0, 1]
+    assert rep.osds_in_crush_bucket == [2, 3]
+    assert rep.osds_upgraded == [4]
+    assert rep.bad_no_version == [5]
+    assert rep.mon_resp_as_dict()['bad_no_version'] == [5]
+
+
+def test_ok_to_upgrade_mon_report_non_list_osd_array_becomes_empty():
+    rep = OkToUpgradeMonReport.from_parsed_body({
+        'ok_to_upgrade': True,
+        'all_osds_upgraded': False,
+        'osds_ok_to_upgrade': 'not-a-list',
+    })
+    assert rep.osds_ok_to_upgrade == []
+
+
+def test_ok_to_upgrade_mon_report_matches_mgr_json_formatter_shape():
+    """
+    Shape produced by upgrade_osd_report::dump + JSONFormatter (DaemonServer):
+    array sections are bare int lists, not objects per element.
+    """
+    mon_stdout = (
+        '{"ok_to_upgrade":{'
+        '"ok_to_upgrade":true,'
+        '"all_osds_upgraded":false,'
+        '"osds_in_crush_bucket":[10,11],'
+        '"osds_ok_to_upgrade":[10],'
+        '"osds_upgraded":[],'
+        '"bad_no_version":[]'
+        '}}'
+    )
+    body = parse_ok_to_upgrade_mon_json(mon_stdout)
+    rep = OkToUpgradeMonReport.from_parsed_body(body)
+    assert rep.ok_to_upgrade is True
+    assert rep.all_osds_upgraded is False
+    assert rep.osds_in_crush_bucket == [10, 11]
+    assert rep.osds_ok_to_upgrade == [10]
+    assert rep.osds_upgraded == []
+    assert rep.bad_no_version == []
+
+
+def test_ok_to_upgrade_mon_report_from_parsed_body_rejects_non_mapping():
+    with pytest.raises(ValueError, match='expected JSON object'):
+        OkToUpgradeMonReport.from_parsed_body([])
+
+
+def test_ok_to_upgrade_mon_report_warns_on_non_boolean_flags(caplog):
+    caplog.set_level(logging.WARNING)
+    rep = OkToUpgradeMonReport.from_parsed_body({
+        'ok_to_upgrade': 'unexpected-string',
+        'all_osds_upgraded': False,
+    })
+    assert rep.ok_to_upgrade is None
+    assert rep.all_osds_upgraded is False
+    assert any('expected boolean' in r.message for r in caplog.records)
+
+
 def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator):
     cephadm_module.check_mon_command = mock.MagicMock(
         return_value=(0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', ''))
     rep = request_osd_ok_to_upgrade_report(
         cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3)
-    assert rep['ok_to_upgrade'] is True
+    assert rep.ok_to_upgrade is True
     cephadm_module.check_mon_command.assert_called_once()
     cmd = cephadm_module.check_mon_command.call_args[0][0]
     assert cmd['prefix'] == 'osd ok-to-upgrade'
@@ -308,6 +378,93 @@ def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator):
     assert cmd['max'] == 3
 
 
+def test_parse_ok_to_upgrade_mon_json_invalid_raises():
+    with pytest.raises(json.JSONDecodeError):
+        parse_ok_to_upgrade_mon_json('not-json{')
+
+
+def test_request_osd_ok_to_upgrade_report_invalid_json(cephadm_module: CephadmOrchestrator):
+    cephadm_module.check_mon_command = mock.MagicMock(
+        return_value=(0, 'not-json{', ''))
+    with pytest.raises(json.JSONDecodeError):
+        request_osd_ok_to_upgrade_report(
+            cephadm_module, 'mybucket', '20.1.0', max_osds=3)
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_json_decode_pauses(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'img',
+        'pid',
+        target_version='20.1.0',
+        crush_bucket_type='host',
+        crush_bucket_name='host1',
+    )
+    with mock.patch(
+        'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+        side_effect=json.JSONDecodeError('msg', 'doc', 0),
+    ):
+        ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+    assert ok is False
+    assert cephadm_module.upgrade.upgrade_state.paused is True
+    assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks
+    assert 'invalid JSON' in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary']
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_value_error_pauses(cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'img',
+        'pid',
+        target_version='20.1.0',
+        crush_bucket_type='host',
+        crush_bucket_name='host1',
+    )
+    with mock.patch(
+        'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+        side_effect=ValueError(
+            "osd ok-to-upgrade: expected JSON object after unwrap, got <class 'list'>"),
+    ):
+        ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+    assert ok is False
+    assert cephadm_module.upgrade.upgrade_state.paused is True
+    assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks
+    assert (
+        'unexpected JSON shape'
+        in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary']
+    )
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_bad_no_version_pauses(
+        cephadm_module: CephadmOrchestrator):
+    cephadm_module.upgrade.upgrade_state = UpgradeState(
+        'img',
+        'pid',
+        target_version='20.1.0',
+        crush_bucket_type='host',
+        crush_bucket_name='host1',
+    )
+    bad_rep = OkToUpgradeMonReport(
+        ok_to_upgrade=True,
+        all_osds_upgraded=False,
+        osds_ok_to_upgrade=[],
+        osds_in_crush_bucket=[0, 1],
+        osds_upgraded=[],
+        bad_no_version=[99],
+    )
+    with mock.patch(
+        'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+        return_value=bad_rep,
+    ):
+        ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+    assert ok is False
+    assert cephadm_module.upgrade.upgrade_state.paused is True
+    assert 'UPGRADE_OSD_NO_VERSION' in cephadm_module.health_checks
+    detail = cephadm_module.health_checks['UPGRADE_OSD_NO_VERSION']['detail'][0]
+    assert 'osd.99' in detail
+
+
 def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator):
     cephadm_module.upgrade._validate_failure_domain_upgrade_options(
         'rack', 'rack-a', ['osd'])
index e85da50213ebea67d4d6d3a89d2b0476099a4679..15773447df985fcd1ea6f7bfd39fe1c19f9d6bc6 100644 (file)
@@ -3,7 +3,8 @@ import json
 import logging
 import time
 import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set
+from dataclasses import dataclass, field, asdict
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Mapping, cast, Set
 from cephadm.services.service_registry import service_registry
 
 import orchestrator
@@ -59,11 +60,71 @@ def normalize_image_digest(digest: str, default_registry: str) -> str:
     return digest
 
 
+def _get_boolean_values_from_mon_json(value: Any) -> Optional[bool]:
+    """Handle only JSON booleans for ``ok_to_upgrade`` / ``all_osds_upgraded``."""
+    if isinstance(value, bool):
+        return value
+    if value is not None:
+        logger.warning('osd ok-to-upgrade: expected boolean, got %s: %r', type(value), value)
+    return None
+
+
+def _get_osd_ids_from_mon_json(value: Any) -> List[int]:
+    """
+    OSD id list fields from ``osd ok-to-upgrade`` inner JSON.
+
+    Per-element types are not checked: the monitor's ``upgrade_osd_report::dump``
+    path emits bare integer arrays for these keys, and the mgr treats that as the
+    contract for this command.
+    """
+    if isinstance(value, list):
+        return list(value)
+    return []
+
+
+@dataclass(frozen=True)
+class OkToUpgradeMonReport:
+    """
+    Normalized view of the inner mon JSON for osd ok-to-upgrade after
+    parse_ok_to_upgrade_mon_json unwraps the top-level ok_to_upgrade object.
+
+    Field names match the keys in the inner report object from the mon JSON.
+    (ok_to_upgrade, all_osds_upgraded, osds_in_crush_bucket,
+     osds_ok_to_upgrade, osds_upgraded, bad_no_version).
+    """
+
+    ok_to_upgrade: Optional[bool]
+    all_osds_upgraded: Optional[bool]
+    osds_ok_to_upgrade: List[int] = field(default_factory=list)
+    osds_in_crush_bucket: List[int] = field(default_factory=list)
+    osds_upgraded: List[int] = field(default_factory=list)
+    bad_no_version: List[int] = field(default_factory=list)
+
+    @classmethod
+    def from_parsed_body(cls, body: Any) -> 'OkToUpgradeMonReport':
+        if not isinstance(body, Mapping):
+            raise ValueError(
+                f'osd ok-to-upgrade: expected JSON object after unwrap, got {type(body)!r}')
+        b = dict(body)
+        return cls(
+            ok_to_upgrade=_get_boolean_values_from_mon_json(b.get('ok_to_upgrade')),
+            all_osds_upgraded=_get_boolean_values_from_mon_json(b.get('all_osds_upgraded')),
+            osds_ok_to_upgrade=_get_osd_ids_from_mon_json(b.get('osds_ok_to_upgrade')),
+            osds_in_crush_bucket=_get_osd_ids_from_mon_json(b.get('osds_in_crush_bucket')),
+            osds_upgraded=_get_osd_ids_from_mon_json(b.get('osds_upgraded')),
+            bad_no_version=_get_osd_ids_from_mon_json(b.get('bad_no_version')),
+        )
+
+    def mon_resp_as_dict(self) -> Dict[str, Any]:
+        return asdict(self)
+
+
 def parse_ok_to_upgrade_mon_json(out: str) -> dict:
     """
-    Parse mon JSON from ``osd ok-to-upgrade``. The monitor nests the
-    report under a top-level ``ok_to_upgrade`` object.
-    If the report is not nested, return the entire JSON object.
+    Parse mon JSON from ``osd ok-to-upgrade``.
+    osd ok-to-upgrade implementation in DaemonServer.cc dumps the response
+    as a top-level boolean object with the inner report object as the key-values
+
     """
     parsed = json.loads(out)
     nested_ok_report = parsed.get('ok_to_upgrade')
@@ -77,7 +138,7 @@ def request_osd_ok_to_upgrade_report(
     crush_bucket: str,
     ceph_version_short: str,
     max_osds: int,
-) -> dict:
+) -> OkToUpgradeMonReport:
     """
     Send ``osd ok-to-upgrade`` to the monitor.
 
@@ -93,7 +154,8 @@ def request_osd_ok_to_upgrade_report(
         'max': max_osds,
     }
     _return_code, mon_out, _stderr = mgr.check_mon_command(cmd)
-    report = parse_ok_to_upgrade_mon_json(mon_out)
+    body = parse_ok_to_upgrade_mon_json(mon_out)
+    report = OkToUpgradeMonReport.from_parsed_body(body)
     logger.debug(
         'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r '
         'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s '
@@ -101,12 +163,12 @@ def request_osd_ok_to_upgrade_report(
         max_osds,
         crush_bucket,
         ceph_version_short,
-        report.get('ok_to_upgrade'),
-        report.get('all_osds_upgraded'),
-        report.get('osds_ok_to_upgrade'),
-        report.get('osds_in_crush_bucket'),
-        report.get('osds_upgraded'),
-        report.get('bad_no_version'),
+        report.ok_to_upgrade,
+        report.all_osds_upgraded,
+        report.osds_ok_to_upgrade,
+        report.osds_in_crush_bucket,
+        report.osds_upgraded,
+        report.bad_no_version,
     )
     return report
 
@@ -192,6 +254,7 @@ class CephadmUpgrade:
         'UPGRADE_EXCEPTION',
         'UPGRADE_OFFLINE_HOST',
         'UPGRADE_INVALID_CRUSH_BUCKET',
+        'UPGRADE_OSD_NO_VERSION',
     ]
 
     def __init__(self, mgr: "CephadmOrchestrator"):
@@ -724,12 +787,12 @@ class CephadmUpgrade:
 
         return True
 
-    def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None:
-        raw = report.get('osds_in_crush_bucket')
-        if isinstance(raw, list):
-            self._ok_to_upgrade_osds_in_crush_bucket = {
-                f'osd.{osd_id}' for osd_id in raw
-            }
+    def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(
+            self, report: OkToUpgradeMonReport) -> None:
+        ids = report.osds_in_crush_bucket
+        self._ok_to_upgrade_osds_in_crush_bucket = {
+            f'osd.{osd_id}' for osd_id in ids
+        }
 
     def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool:
         # If not using ok-to-upgrade for OSDs, any OSD is valid.
@@ -781,6 +844,25 @@ class CephadmUpgrade:
                     ceph_version_short,
                     max_osds=max_parallel,
                 )
+            except json.JSONDecodeError as err:
+                logger.error('Upgrade: osd ok-to-upgrade JSON parse failed: %s', err)
+                self._fail_upgrade('UPGRADE_EXCEPTION', {
+                    'severity': 'error',
+                    'summary': 'Upgrade: osd ok-to-upgrade returned invalid JSON',
+                    'count': 1,
+                    'detail': [str(err)],
+                })
+                return False
+            except ValueError as err:
+                logger.error(
+                    'Upgrade: osd ok-to-upgrade unexpected response shape: %s', err)
+                self._fail_upgrade('UPGRADE_EXCEPTION', {
+                    'severity': 'error',
+                    'summary': 'Upgrade: osd ok-to-upgrade returned unexpected JSON shape',
+                    'count': 1,
+                    'detail': [str(err)],
+                })
+                return False
             except MonCommandFailed as err:
                 if self.is_mon_error_for_invalid_bucket(err):
                     st = self.upgrade_state
@@ -804,18 +886,30 @@ class CephadmUpgrade:
 
             self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report)
 
+            if report.bad_no_version:
+                osd_names = ', '.join(f'osd.{i}' for i in report.bad_no_version)
+                self._fail_upgrade('UPGRADE_OSD_NO_VERSION', {
+                    'severity': 'error',
+                    'summary': (
+                        'Upgrade: osd ok-to-upgrade reported OSDs without a detectable '
+                        'Ceph version'
+                    ),
+                    'count': len(report.bad_no_version),
+                    'detail': [
+                        f'The monitor cannot compare these OSDs to the target version '
+                        f'({ceph_version_short!r}): {osd_names}. '
+                        'Resolve daemon or version reporting on those OSDs before '
+                        'continuing the upgrade.',
+                    ],
+                })
+                return False
+
             # Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``.
-            # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids.
-            raw_osds = report.get('osds_ok_to_upgrade')
-            if isinstance(raw_osds, list):
-                approved_names = [f'osd.{osd_id}' for osd_id in raw_osds]
-            else:
-                approved_names = []
-            bad_no_version = report.get('bad_no_version') or []
+            approved_names = [f'osd.{osd_id}' for osd_id in report.osds_ok_to_upgrade]
             if (
                 not approved_names
-                and report.get('all_osds_upgraded') is True
-                and not bad_no_version
+                and report.all_osds_upgraded is True
+                and not report.bad_no_version
             ):
                 self._ok_to_upgrade_all_osds_upgraded = True
                 logger.info(
@@ -834,7 +928,7 @@ class CephadmUpgrade:
                     'not yet on the target version). Report: %s',
                     bucket_name,
                     ceph_version_short,
-                    report,
+                    report.mon_resp_as_dict(),
                 )
                 time.sleep(15)
                 remaining_tries -= 1