From: Ashwin M. Joshi Date: Thu, 23 Apr 2026 11:19:06 +0000 (+0530) Subject: mgr: ok-to-upgrade review comments for dataclass etc X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=97dd1d9a31aa715f19bc3b44d16e18988faf4b4b;p=ceph.git mgr: ok-to-upgrade review comments for dataclass etc Fixes: https://tracker.ceph.com/issues/75603 Signed-off-by: Ashwin M. Joshi --- diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py index 4b3e91590682..dbe37e53df6a 100644 --- a/src/pybind/mgr/cephadm/tests/test_upgrade.py +++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py @@ -1,4 +1,5 @@ import json +import logging from unittest import mock import pytest @@ -7,6 +8,7 @@ from ceph.deployment.service_spec import PlacementSpec, ServiceSpec from cephadm import CephadmOrchestrator from cephadm.upgrade import ( CephadmUpgrade, + OkToUpgradeMonReport, UpgradeState, parse_ok_to_upgrade_mon_json, request_osd_ok_to_upgrade_report, @@ -294,12 +296,80 @@ def test_parse_ok_to_upgrade_mon_json_nested_and_flat(): assert d['ok_to_upgrade'] is True +def test_ok_to_upgrade_mon_report_from_parsed_body(): + rep = OkToUpgradeMonReport.from_parsed_body({ + 'ok_to_upgrade': True, + 'all_osds_upgraded': False, + 'osds_ok_to_upgrade': [0, 1], + 'osds_in_crush_bucket': [2, 3], + 'osds_upgraded': [4], + 'bad_no_version': [5], + }) + assert rep.ok_to_upgrade is True + assert rep.all_osds_upgraded is False + assert rep.osds_ok_to_upgrade == [0, 1] + assert rep.osds_in_crush_bucket == [2, 3] + assert rep.osds_upgraded == [4] + assert rep.bad_no_version == [5] + assert rep.mon_resp_as_dict()['bad_no_version'] == [5] + + +def test_ok_to_upgrade_mon_report_non_list_osd_array_becomes_empty(): + rep = OkToUpgradeMonReport.from_parsed_body({ + 'ok_to_upgrade': True, + 'all_osds_upgraded': False, + 'osds_ok_to_upgrade': 'not-a-list', + }) + assert rep.osds_ok_to_upgrade == [] + + +def test_ok_to_upgrade_mon_report_matches_mgr_json_formatter_shape(): + """ + Shape produced by upgrade_osd_report::dump + JSONFormatter (DaemonServer): + array sections are bare int lists, not objects per element. + """ + mon_stdout = ( + '{"ok_to_upgrade":{' + '"ok_to_upgrade":true,' + '"all_osds_upgraded":false,' + '"osds_in_crush_bucket":[10,11],' + '"osds_ok_to_upgrade":[10],' + '"osds_upgraded":[],' + '"bad_no_version":[]' + '}}' + ) + body = parse_ok_to_upgrade_mon_json(mon_stdout) + rep = OkToUpgradeMonReport.from_parsed_body(body) + assert rep.ok_to_upgrade is True + assert rep.all_osds_upgraded is False + assert rep.osds_in_crush_bucket == [10, 11] + assert rep.osds_ok_to_upgrade == [10] + assert rep.osds_upgraded == [] + assert rep.bad_no_version == [] + + +def test_ok_to_upgrade_mon_report_from_parsed_body_rejects_non_mapping(): + with pytest.raises(ValueError, match='expected JSON object'): + OkToUpgradeMonReport.from_parsed_body([]) + + +def test_ok_to_upgrade_mon_report_warns_on_non_boolean_flags(caplog): + caplog.set_level(logging.WARNING) + rep = OkToUpgradeMonReport.from_parsed_body({ + 'ok_to_upgrade': 'unexpected-string', + 'all_osds_upgraded': False, + }) + assert rep.ok_to_upgrade is None + assert rep.all_osds_upgraded is False + assert any('expected boolean' in r.message for r in caplog.records) + + def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator): cephadm_module.check_mon_command = mock.MagicMock( return_value=(0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', '')) rep = request_osd_ok_to_upgrade_report( cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3) - assert rep['ok_to_upgrade'] is True + assert rep.ok_to_upgrade is True cephadm_module.check_mon_command.assert_called_once() cmd = cephadm_module.check_mon_command.call_args[0][0] assert cmd['prefix'] == 'osd ok-to-upgrade' @@ -308,6 +378,93 @@ def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator): assert cmd['max'] == 3 +def test_parse_ok_to_upgrade_mon_json_invalid_raises(): + with pytest.raises(json.JSONDecodeError): + parse_ok_to_upgrade_mon_json('not-json{') + + +def test_request_osd_ok_to_upgrade_report_invalid_json(cephadm_module: CephadmOrchestrator): + cephadm_module.check_mon_command = mock.MagicMock( + return_value=(0, 'not-json{', '')) + with pytest.raises(json.JSONDecodeError): + request_osd_ok_to_upgrade_report( + cephadm_module, 'mybucket', '20.1.0', max_osds=3) + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_wait_for_ok_to_upgrade_osd_batch_json_decode_pauses(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'img', + 'pid', + target_version='20.1.0', + crush_bucket_type='host', + crush_bucket_name='host1', + ) + with mock.patch( + 'cephadm.upgrade.request_osd_ok_to_upgrade_report', + side_effect=json.JSONDecodeError('msg', 'doc', 0), + ): + ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([]) + assert ok is False + assert cephadm_module.upgrade.upgrade_state.paused is True + assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks + assert 'invalid JSON' in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary'] + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_wait_for_ok_to_upgrade_osd_batch_value_error_pauses(cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'img', + 'pid', + target_version='20.1.0', + crush_bucket_type='host', + crush_bucket_name='host1', + ) + with mock.patch( + 'cephadm.upgrade.request_osd_ok_to_upgrade_report', + side_effect=ValueError( + "osd ok-to-upgrade: expected JSON object after unwrap, got "), + ): + ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([]) + assert ok is False + assert cephadm_module.upgrade.upgrade_state.paused is True + assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks + assert ( + 'unexpected JSON shape' + in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary'] + ) + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +def test_wait_for_ok_to_upgrade_osd_batch_bad_no_version_pauses( + cephadm_module: CephadmOrchestrator): + cephadm_module.upgrade.upgrade_state = UpgradeState( + 'img', + 'pid', + target_version='20.1.0', + crush_bucket_type='host', + crush_bucket_name='host1', + ) + bad_rep = OkToUpgradeMonReport( + ok_to_upgrade=True, + all_osds_upgraded=False, + osds_ok_to_upgrade=[], + osds_in_crush_bucket=[0, 1], + osds_upgraded=[], + bad_no_version=[99], + ) + with mock.patch( + 'cephadm.upgrade.request_osd_ok_to_upgrade_report', + return_value=bad_rep, + ): + ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([]) + assert ok is False + assert cephadm_module.upgrade.upgrade_state.paused is True + assert 'UPGRADE_OSD_NO_VERSION' in cephadm_module.health_checks + detail = cephadm_module.health_checks['UPGRADE_OSD_NO_VERSION']['detail'][0] + assert 'osd.99' in detail + + def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator): cephadm_module.upgrade._validate_failure_domain_upgrade_options( 'rack', 'rack-a', ['osd']) diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index e85da50213eb..15773447df98 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -3,7 +3,8 @@ import json import logging import time import uuid -from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set +from dataclasses import dataclass, field, asdict +from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Mapping, cast, Set from cephadm.services.service_registry import service_registry import orchestrator @@ -59,11 +60,71 @@ def normalize_image_digest(digest: str, default_registry: str) -> str: return digest +def _get_boolean_values_from_mon_json(value: Any) -> Optional[bool]: + """Handle only JSON booleans for ``ok_to_upgrade`` / ``all_osds_upgraded``.""" + if isinstance(value, bool): + return value + if value is not None: + logger.warning('osd ok-to-upgrade: expected boolean, got %s: %r', type(value), value) + return None + + +def _get_osd_ids_from_mon_json(value: Any) -> List[int]: + """ + OSD id list fields from ``osd ok-to-upgrade`` inner JSON. + + Per-element types are not checked: the monitor's ``upgrade_osd_report::dump`` + path emits bare integer arrays for these keys, and the mgr treats that as the + contract for this command. + """ + if isinstance(value, list): + return list(value) + return [] + + +@dataclass(frozen=True) +class OkToUpgradeMonReport: + """ + Normalized view of the inner mon JSON for osd ok-to-upgrade after + parse_ok_to_upgrade_mon_json unwraps the top-level ok_to_upgrade object. + + Field names match the keys in the inner report object from the mon JSON. + (ok_to_upgrade, all_osds_upgraded, osds_in_crush_bucket, + osds_ok_to_upgrade, osds_upgraded, bad_no_version). + """ + + ok_to_upgrade: Optional[bool] + all_osds_upgraded: Optional[bool] + osds_ok_to_upgrade: List[int] = field(default_factory=list) + osds_in_crush_bucket: List[int] = field(default_factory=list) + osds_upgraded: List[int] = field(default_factory=list) + bad_no_version: List[int] = field(default_factory=list) + + @classmethod + def from_parsed_body(cls, body: Any) -> 'OkToUpgradeMonReport': + if not isinstance(body, Mapping): + raise ValueError( + f'osd ok-to-upgrade: expected JSON object after unwrap, got {type(body)!r}') + b = dict(body) + return cls( + ok_to_upgrade=_get_boolean_values_from_mon_json(b.get('ok_to_upgrade')), + all_osds_upgraded=_get_boolean_values_from_mon_json(b.get('all_osds_upgraded')), + osds_ok_to_upgrade=_get_osd_ids_from_mon_json(b.get('osds_ok_to_upgrade')), + osds_in_crush_bucket=_get_osd_ids_from_mon_json(b.get('osds_in_crush_bucket')), + osds_upgraded=_get_osd_ids_from_mon_json(b.get('osds_upgraded')), + bad_no_version=_get_osd_ids_from_mon_json(b.get('bad_no_version')), + ) + + def mon_resp_as_dict(self) -> Dict[str, Any]: + return asdict(self) + + def parse_ok_to_upgrade_mon_json(out: str) -> dict: """ - Parse mon JSON from ``osd ok-to-upgrade``. The monitor nests the - report under a top-level ``ok_to_upgrade`` object. - If the report is not nested, return the entire JSON object. + Parse mon JSON from ``osd ok-to-upgrade``. + osd ok-to-upgrade implementation in DaemonServer.cc dumps the response + as a top-level boolean object with the inner report object as the key-values + """ parsed = json.loads(out) nested_ok_report = parsed.get('ok_to_upgrade') @@ -77,7 +138,7 @@ def request_osd_ok_to_upgrade_report( crush_bucket: str, ceph_version_short: str, max_osds: int, -) -> dict: +) -> OkToUpgradeMonReport: """ Send ``osd ok-to-upgrade`` to the monitor. @@ -93,7 +154,8 @@ def request_osd_ok_to_upgrade_report( 'max': max_osds, } _return_code, mon_out, _stderr = mgr.check_mon_command(cmd) - report = parse_ok_to_upgrade_mon_json(mon_out) + body = parse_ok_to_upgrade_mon_json(mon_out) + report = OkToUpgradeMonReport.from_parsed_body(body) logger.debug( 'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r ' 'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s ' @@ -101,12 +163,12 @@ def request_osd_ok_to_upgrade_report( max_osds, crush_bucket, ceph_version_short, - report.get('ok_to_upgrade'), - report.get('all_osds_upgraded'), - report.get('osds_ok_to_upgrade'), - report.get('osds_in_crush_bucket'), - report.get('osds_upgraded'), - report.get('bad_no_version'), + report.ok_to_upgrade, + report.all_osds_upgraded, + report.osds_ok_to_upgrade, + report.osds_in_crush_bucket, + report.osds_upgraded, + report.bad_no_version, ) return report @@ -192,6 +254,7 @@ class CephadmUpgrade: 'UPGRADE_EXCEPTION', 'UPGRADE_OFFLINE_HOST', 'UPGRADE_INVALID_CRUSH_BUCKET', + 'UPGRADE_OSD_NO_VERSION', ] def __init__(self, mgr: "CephadmOrchestrator"): @@ -724,12 +787,12 @@ class CephadmUpgrade: return True - def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None: - raw = report.get('osds_in_crush_bucket') - if isinstance(raw, list): - self._ok_to_upgrade_osds_in_crush_bucket = { - f'osd.{osd_id}' for osd_id in raw - } + def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report( + self, report: OkToUpgradeMonReport) -> None: + ids = report.osds_in_crush_bucket + self._ok_to_upgrade_osds_in_crush_bucket = { + f'osd.{osd_id}' for osd_id in ids + } def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool: # If not using ok-to-upgrade for OSDs, any OSD is valid. @@ -781,6 +844,25 @@ class CephadmUpgrade: ceph_version_short, max_osds=max_parallel, ) + except json.JSONDecodeError as err: + logger.error('Upgrade: osd ok-to-upgrade JSON parse failed: %s', err) + self._fail_upgrade('UPGRADE_EXCEPTION', { + 'severity': 'error', + 'summary': 'Upgrade: osd ok-to-upgrade returned invalid JSON', + 'count': 1, + 'detail': [str(err)], + }) + return False + except ValueError as err: + logger.error( + 'Upgrade: osd ok-to-upgrade unexpected response shape: %s', err) + self._fail_upgrade('UPGRADE_EXCEPTION', { + 'severity': 'error', + 'summary': 'Upgrade: osd ok-to-upgrade returned unexpected JSON shape', + 'count': 1, + 'detail': [str(err)], + }) + return False except MonCommandFailed as err: if self.is_mon_error_for_invalid_bucket(err): st = self.upgrade_state @@ -804,18 +886,30 @@ class CephadmUpgrade: self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report) + if report.bad_no_version: + osd_names = ', '.join(f'osd.{i}' for i in report.bad_no_version) + self._fail_upgrade('UPGRADE_OSD_NO_VERSION', { + 'severity': 'error', + 'summary': ( + 'Upgrade: osd ok-to-upgrade reported OSDs without a detectable ' + 'Ceph version' + ), + 'count': len(report.bad_no_version), + 'detail': [ + f'The monitor cannot compare these OSDs to the target version ' + f'({ceph_version_short!r}): {osd_names}. ' + 'Resolve daemon or version reporting on those OSDs before ' + 'continuing the upgrade.', + ], + }) + return False + # Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``. - # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids. - raw_osds = report.get('osds_ok_to_upgrade') - if isinstance(raw_osds, list): - approved_names = [f'osd.{osd_id}' for osd_id in raw_osds] - else: - approved_names = [] - bad_no_version = report.get('bad_no_version') or [] + approved_names = [f'osd.{osd_id}' for osd_id in report.osds_ok_to_upgrade] if ( not approved_names - and report.get('all_osds_upgraded') is True - and not bad_no_version + and report.all_osds_upgraded is True + and not report.bad_no_version ): self._ok_to_upgrade_all_osds_upgraded = True logger.info( @@ -834,7 +928,7 @@ class CephadmUpgrade: 'not yet on the target version). Report: %s', bucket_name, ceph_version_short, - report, + report.mon_resp_as_dict(), ) time.sleep(15) remaining_tries -= 1