import json
+import logging
from unittest import mock
import pytest
from cephadm import CephadmOrchestrator
from cephadm.upgrade import (
CephadmUpgrade,
+ OkToUpgradeMonReport,
UpgradeState,
parse_ok_to_upgrade_mon_json,
request_osd_ok_to_upgrade_report,
assert d['ok_to_upgrade'] is True
+def test_ok_to_upgrade_mon_report_from_parsed_body():
+ rep = OkToUpgradeMonReport.from_parsed_body({
+ 'ok_to_upgrade': True,
+ 'all_osds_upgraded': False,
+ 'osds_ok_to_upgrade': [0, 1],
+ 'osds_in_crush_bucket': [2, 3],
+ 'osds_upgraded': [4],
+ 'bad_no_version': [5],
+ })
+ assert rep.ok_to_upgrade is True
+ assert rep.all_osds_upgraded is False
+ assert rep.osds_ok_to_upgrade == [0, 1]
+ assert rep.osds_in_crush_bucket == [2, 3]
+ assert rep.osds_upgraded == [4]
+ assert rep.bad_no_version == [5]
+ assert rep.mon_resp_as_dict()['bad_no_version'] == [5]
+
+
+def test_ok_to_upgrade_mon_report_non_list_osd_array_becomes_empty():
+ rep = OkToUpgradeMonReport.from_parsed_body({
+ 'ok_to_upgrade': True,
+ 'all_osds_upgraded': False,
+ 'osds_ok_to_upgrade': 'not-a-list',
+ })
+ assert rep.osds_ok_to_upgrade == []
+
+
+def test_ok_to_upgrade_mon_report_matches_mgr_json_formatter_shape():
+ """
+ Shape produced by upgrade_osd_report::dump + JSONFormatter (DaemonServer):
+ array sections are bare int lists, not objects per element.
+ """
+ mon_stdout = (
+ '{"ok_to_upgrade":{'
+ '"ok_to_upgrade":true,'
+ '"all_osds_upgraded":false,'
+ '"osds_in_crush_bucket":[10,11],'
+ '"osds_ok_to_upgrade":[10],'
+ '"osds_upgraded":[],'
+ '"bad_no_version":[]'
+ '}}'
+ )
+ body = parse_ok_to_upgrade_mon_json(mon_stdout)
+ rep = OkToUpgradeMonReport.from_parsed_body(body)
+ assert rep.ok_to_upgrade is True
+ assert rep.all_osds_upgraded is False
+ assert rep.osds_in_crush_bucket == [10, 11]
+ assert rep.osds_ok_to_upgrade == [10]
+ assert rep.osds_upgraded == []
+ assert rep.bad_no_version == []
+
+
+def test_ok_to_upgrade_mon_report_from_parsed_body_rejects_non_mapping():
+ with pytest.raises(ValueError, match='expected JSON object'):
+ OkToUpgradeMonReport.from_parsed_body([])
+
+
+def test_ok_to_upgrade_mon_report_warns_on_non_boolean_flags(caplog):
+ caplog.set_level(logging.WARNING)
+ rep = OkToUpgradeMonReport.from_parsed_body({
+ 'ok_to_upgrade': 'unexpected-string',
+ 'all_osds_upgraded': False,
+ })
+ assert rep.ok_to_upgrade is None
+ assert rep.all_osds_upgraded is False
+ assert any('expected boolean' in r.message for r in caplog.records)
+
+
def test_request_osd_ok_to_upgrade_report(cephadm_module: CephadmOrchestrator):
cephadm_module.check_mon_command = mock.MagicMock(
return_value=(0, '{"ok_to_upgrade": {"ok_to_upgrade": true}}', ''))
rep = request_osd_ok_to_upgrade_report(
cephadm_module, 'mybucket', '20.1.0-144.el9cp', max_osds=3)
- assert rep['ok_to_upgrade'] is True
+ assert rep.ok_to_upgrade is True
cephadm_module.check_mon_command.assert_called_once()
cmd = cephadm_module.check_mon_command.call_args[0][0]
assert cmd['prefix'] == 'osd ok-to-upgrade'
assert cmd['max'] == 3
+def test_parse_ok_to_upgrade_mon_json_invalid_raises():
+ with pytest.raises(json.JSONDecodeError):
+ parse_ok_to_upgrade_mon_json('not-json{')
+
+
+def test_request_osd_ok_to_upgrade_report_invalid_json(cephadm_module: CephadmOrchestrator):
+ cephadm_module.check_mon_command = mock.MagicMock(
+ return_value=(0, 'not-json{', ''))
+ with pytest.raises(json.JSONDecodeError):
+ request_osd_ok_to_upgrade_report(
+ cephadm_module, 'mybucket', '20.1.0', max_osds=3)
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_json_decode_pauses(cephadm_module: CephadmOrchestrator):
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'img',
+ 'pid',
+ target_version='20.1.0',
+ crush_bucket_type='host',
+ crush_bucket_name='host1',
+ )
+ with mock.patch(
+ 'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+ side_effect=json.JSONDecodeError('msg', 'doc', 0),
+ ):
+ ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+ assert ok is False
+ assert cephadm_module.upgrade.upgrade_state.paused is True
+ assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks
+ assert 'invalid JSON' in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary']
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_value_error_pauses(cephadm_module: CephadmOrchestrator):
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'img',
+ 'pid',
+ target_version='20.1.0',
+ crush_bucket_type='host',
+ crush_bucket_name='host1',
+ )
+ with mock.patch(
+ 'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+ side_effect=ValueError(
+ "osd ok-to-upgrade: expected JSON object after unwrap, got <class 'list'>"),
+ ):
+ ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+ assert ok is False
+ assert cephadm_module.upgrade.upgrade_state.paused is True
+ assert 'UPGRADE_EXCEPTION' in cephadm_module.health_checks
+ assert (
+ 'unexpected JSON shape'
+ in cephadm_module.health_checks['UPGRADE_EXCEPTION']['summary']
+ )
+
+
+@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
+def test_wait_for_ok_to_upgrade_osd_batch_bad_no_version_pauses(
+ cephadm_module: CephadmOrchestrator):
+ cephadm_module.upgrade.upgrade_state = UpgradeState(
+ 'img',
+ 'pid',
+ target_version='20.1.0',
+ crush_bucket_type='host',
+ crush_bucket_name='host1',
+ )
+ bad_rep = OkToUpgradeMonReport(
+ ok_to_upgrade=True,
+ all_osds_upgraded=False,
+ osds_ok_to_upgrade=[],
+ osds_in_crush_bucket=[0, 1],
+ osds_upgraded=[],
+ bad_no_version=[99],
+ )
+ with mock.patch(
+ 'cephadm.upgrade.request_osd_ok_to_upgrade_report',
+ return_value=bad_rep,
+ ):
+ ok = cephadm_module.upgrade._wait_for_ok_to_upgrade_osd_batch([])
+ assert ok is False
+ assert cephadm_module.upgrade.upgrade_state.paused is True
+ assert 'UPGRADE_OSD_NO_VERSION' in cephadm_module.health_checks
+ detail = cephadm_module.health_checks['UPGRADE_OSD_NO_VERSION']['detail'][0]
+ assert 'osd.99' in detail
+
+
def test_validate_failure_domain_upgrade_options_ok(cephadm_module: CephadmOrchestrator):
cephadm_module.upgrade._validate_failure_domain_upgrade_options(
'rack', 'rack-a', ['osd'])
import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, cast, Set
+from dataclasses import dataclass, field, asdict
+from typing import TYPE_CHECKING, Optional, Dict, List, Tuple, Any, Mapping, cast, Set
from cephadm.services.service_registry import service_registry
import orchestrator
return digest
+def _get_boolean_values_from_mon_json(value: Any) -> Optional[bool]:
+ """Handle only JSON booleans for ``ok_to_upgrade`` / ``all_osds_upgraded``."""
+ if isinstance(value, bool):
+ return value
+ if value is not None:
+ logger.warning('osd ok-to-upgrade: expected boolean, got %s: %r', type(value), value)
+ return None
+
+
+def _get_osd_ids_from_mon_json(value: Any) -> List[int]:
+ """
+ OSD id list fields from ``osd ok-to-upgrade`` inner JSON.
+
+ Per-element types are not checked: the monitor's ``upgrade_osd_report::dump``
+ path emits bare integer arrays for these keys, and the mgr treats that as the
+ contract for this command.
+ """
+ if isinstance(value, list):
+ return list(value)
+ return []
+
+
+@dataclass(frozen=True)
+class OkToUpgradeMonReport:
+ """
+ Normalized view of the inner mon JSON for osd ok-to-upgrade after
+ parse_ok_to_upgrade_mon_json unwraps the top-level ok_to_upgrade object.
+
+ Field names match the keys in the inner report object from the mon JSON.
+ (ok_to_upgrade, all_osds_upgraded, osds_in_crush_bucket,
+ osds_ok_to_upgrade, osds_upgraded, bad_no_version).
+ """
+
+ ok_to_upgrade: Optional[bool]
+ all_osds_upgraded: Optional[bool]
+ osds_ok_to_upgrade: List[int] = field(default_factory=list)
+ osds_in_crush_bucket: List[int] = field(default_factory=list)
+ osds_upgraded: List[int] = field(default_factory=list)
+ bad_no_version: List[int] = field(default_factory=list)
+
+ @classmethod
+ def from_parsed_body(cls, body: Any) -> 'OkToUpgradeMonReport':
+ if not isinstance(body, Mapping):
+ raise ValueError(
+ f'osd ok-to-upgrade: expected JSON object after unwrap, got {type(body)!r}')
+ b = dict(body)
+ return cls(
+ ok_to_upgrade=_get_boolean_values_from_mon_json(b.get('ok_to_upgrade')),
+ all_osds_upgraded=_get_boolean_values_from_mon_json(b.get('all_osds_upgraded')),
+ osds_ok_to_upgrade=_get_osd_ids_from_mon_json(b.get('osds_ok_to_upgrade')),
+ osds_in_crush_bucket=_get_osd_ids_from_mon_json(b.get('osds_in_crush_bucket')),
+ osds_upgraded=_get_osd_ids_from_mon_json(b.get('osds_upgraded')),
+ bad_no_version=_get_osd_ids_from_mon_json(b.get('bad_no_version')),
+ )
+
+ def mon_resp_as_dict(self) -> Dict[str, Any]:
+ return asdict(self)
+
+
def parse_ok_to_upgrade_mon_json(out: str) -> dict:
"""
- Parse mon JSON from ``osd ok-to-upgrade``. The monitor nests the
- report under a top-level ``ok_to_upgrade`` object.
- If the report is not nested, return the entire JSON object.
+ Parse mon JSON from ``osd ok-to-upgrade``.
+ osd ok-to-upgrade implementation in DaemonServer.cc dumps the response
+ as a top-level boolean object with the inner report object as the key-values
+
"""
parsed = json.loads(out)
nested_ok_report = parsed.get('ok_to_upgrade')
crush_bucket: str,
ceph_version_short: str,
max_osds: int,
-) -> dict:
+) -> OkToUpgradeMonReport:
"""
Send ``osd ok-to-upgrade`` to the monitor.
'max': max_osds,
}
_return_code, mon_out, _stderr = mgr.check_mon_command(cmd)
- report = parse_ok_to_upgrade_mon_json(mon_out)
+ body = parse_ok_to_upgrade_mon_json(mon_out)
+ report = OkToUpgradeMonReport.from_parsed_body(body)
logger.debug(
'Upgrade: osd ok-to-upgrade mon response: requested max=%s crush_bucket=%r '
'ceph_version=%r ok_to_upgrade=%s all_osds_upgraded=%s osds_ok_to_upgrade=%s '
max_osds,
crush_bucket,
ceph_version_short,
- report.get('ok_to_upgrade'),
- report.get('all_osds_upgraded'),
- report.get('osds_ok_to_upgrade'),
- report.get('osds_in_crush_bucket'),
- report.get('osds_upgraded'),
- report.get('bad_no_version'),
+ report.ok_to_upgrade,
+ report.all_osds_upgraded,
+ report.osds_ok_to_upgrade,
+ report.osds_in_crush_bucket,
+ report.osds_upgraded,
+ report.bad_no_version,
)
return report
'UPGRADE_EXCEPTION',
'UPGRADE_OFFLINE_HOST',
'UPGRADE_INVALID_CRUSH_BUCKET',
+ 'UPGRADE_OSD_NO_VERSION',
]
def __init__(self, mgr: "CephadmOrchestrator"):
return True
- def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(self, report: dict) -> None:
- raw = report.get('osds_in_crush_bucket')
- if isinstance(raw, list):
- self._ok_to_upgrade_osds_in_crush_bucket = {
- f'osd.{osd_id}' for osd_id in raw
- }
+ def _cache_osds_in_crush_bucket_from_ok_to_upgrade_report(
+ self, report: OkToUpgradeMonReport) -> None:
+ ids = report.osds_in_crush_bucket
+ self._ok_to_upgrade_osds_in_crush_bucket = {
+ f'osd.{osd_id}' for osd_id in ids
+ }
def is_osd_upgrade_valid_for_failure_domain(self, d: DaemonDescription) -> bool:
# If not using ok-to-upgrade for OSDs, any OSD is valid.
ceph_version_short,
max_osds=max_parallel,
)
+ except json.JSONDecodeError as err:
+ logger.error('Upgrade: osd ok-to-upgrade JSON parse failed: %s', err)
+ self._fail_upgrade('UPGRADE_EXCEPTION', {
+ 'severity': 'error',
+ 'summary': 'Upgrade: osd ok-to-upgrade returned invalid JSON',
+ 'count': 1,
+ 'detail': [str(err)],
+ })
+ return False
+ except ValueError as err:
+ logger.error(
+ 'Upgrade: osd ok-to-upgrade unexpected response shape: %s', err)
+ self._fail_upgrade('UPGRADE_EXCEPTION', {
+ 'severity': 'error',
+ 'summary': 'Upgrade: osd ok-to-upgrade returned unexpected JSON shape',
+ 'count': 1,
+ 'detail': [str(err)],
+ })
+ return False
except MonCommandFailed as err:
if self.is_mon_error_for_invalid_bucket(err):
st = self.upgrade_state
self._cache_osds_in_crush_bucket_from_ok_to_upgrade_report(report)
+ if report.bad_no_version:
+ osd_names = ', '.join(f'osd.{i}' for i in report.bad_no_version)
+ self._fail_upgrade('UPGRADE_OSD_NO_VERSION', {
+ 'severity': 'error',
+ 'summary': (
+ 'Upgrade: osd ok-to-upgrade reported OSDs without a detectable '
+ 'Ceph version'
+ ),
+ 'count': len(report.bad_no_version),
+ 'detail': [
+ f'The monitor cannot compare these OSDs to the target version '
+ f'({ceph_version_short!r}): {osd_names}. '
+ 'Resolve daemon or version reporting on those OSDs before '
+ 'continuing the upgrade.',
+ ],
+ })
+ return False
+
# Detailed mon fields logged in ``request_osd_ok_to_upgrade_report``.
- # Same JSON shape as ``osd ok-to-stop`` → ``osds``: array of OSD ids.
- raw_osds = report.get('osds_ok_to_upgrade')
- if isinstance(raw_osds, list):
- approved_names = [f'osd.{osd_id}' for osd_id in raw_osds]
- else:
- approved_names = []
- bad_no_version = report.get('bad_no_version') or []
+ approved_names = [f'osd.{osd_id}' for osd_id in report.osds_ok_to_upgrade]
if (
not approved_names
- and report.get('all_osds_upgraded') is True
- and not bad_no_version
+ and report.all_osds_upgraded is True
+ and not report.bad_no_version
):
self._ok_to_upgrade_all_osds_upgraded = True
logger.info(
'not yet on the target version). Report: %s',
bucket_name,
ceph_version_short,
- report,
+ report.mon_resp_as_dict(),
)
time.sleep(15)
remaining_tries -= 1