From da4b361e63c1563a7d13deaceeb25205b162aafe Mon Sep 17 00:00:00 2001 From: "Ashwin M. Joshi" Date: Wed, 25 Feb 2026 11:28:39 +0530 Subject: [PATCH] mgr: Control PG autoscaler during upgrades with pg_autoscale_during_upgrade Fixes: https://tracker.ceph.com/issues/69477 Signed-off-by: Ashwin M. Joshi Conflicts: src/pybind/mgr/cephadm/tests/test_upgrade.py src/pybind/mgr/cephadm/upgrade.py --- doc/cephadm/upgrade.rst | 17 ++ src/pybind/mgr/cephadm/module.py | 10 + src/pybind/mgr/cephadm/tests/test_upgrade.py | 186 +++++++++++++++++++ src/pybind/mgr/cephadm/upgrade.py | 139 ++++++++++++++ 4 files changed, 352 insertions(+) diff --git a/doc/cephadm/upgrade.rst b/doc/cephadm/upgrade.rst index 823f1623dbea..5d7d7e44b4a7 100644 --- a/doc/cephadm/upgrade.rst +++ b/doc/cephadm/upgrade.rst @@ -43,6 +43,23 @@ The automated upgrade process follows Ceph best practices. For example: If the new release changes the above target value, there may be splitting or merging of PGs when unsetting after the upgrade. + Cephadm will automatically pause and resume the PG autoscaler activity + during upgrade unless opted-in by setting: + + .. prompt:: bash # + + ceph config set mgr mgr/cephadm/pg_autoscale_during_upgrade true + + To view the current value: + + .. prompt:: bash # + + ceph config get mgr mgr/cephadm/pg_autoscale_during_upgrade + + If autoscaling was already off before the upgrade, cephadm does not change + it unless you have set ``pg_autoscale_during_upgrade`` to ``true`` (opt-in + to turn autoscaling on for the duration of the upgrade). + Starting the Upgrade ==================== diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index 720e88343f1f..51b3ce36eed4 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -411,6 +411,15 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): default=16, desc='Maximum number of OSD daemons upgraded in parallel.' ), + Option( + 'pg_autoscale_during_upgrade', + type='bool', + default=False, + desc='Opt-in to keep PG autoscaling enabled during OSD upgrades. ' + 'When False (default), cephadm disables pool autoscaling (sets noautoscale) ' + 'before OSD upgrades and restores it on completion/stop/failure. ' + 'Set to true to keep autoscaling on during upgrade.' + ), Option( 'service_discovery_port', type='int', @@ -578,6 +587,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): self.default_registry = '' self.autotune_memory_target_ratio = 0.0 self.autotune_interval = 0 + self.pg_autoscale_during_upgrade = False self.ssh_user: Optional[str] = None self._ssh_options: Optional[str] = None self.tkey = NamedTemporaryFile() diff --git a/src/pybind/mgr/cephadm/tests/test_upgrade.py b/src/pybind/mgr/cephadm/tests/test_upgrade.py index 7344d26e4bd6..98ca31c0600a 100644 --- a/src/pybind/mgr/cephadm/tests/test_upgrade.py +++ b/src/pybind/mgr/cephadm/tests/test_upgrade.py @@ -562,6 +562,192 @@ def test_validate_failure_domain_upgrade_options_name_not_consulting_crush_map( @mock.patch('cephadm.serve.CephadmServe._run_cephadm', _run_cephadm('{}')) +@pytest.mark.parametrize( + "prior_autoscale,pg_autoscale_during_upgrade,autoscale_during_upgrade", + [ + # Decision table: prior_autoscale, pg_autoscale_during_upgrade -> autoscale_during_upgrade + (False, False, False), # prior off, no opt-in -> autoscale off during upgrade + (False, True, True), # prior off, opt-in -> autoscale on during upgrade + (True, False, False), # prior on, no opt-in -> autoscale off during upgrade + (True, True, True), # prior on, opt-in -> autoscale on during upgrade + ], +) +def test_pg_autoscale_decision_table( + prior_autoscale, + pg_autoscale_during_upgrade, + autoscale_during_upgrade, + cephadm_module: CephadmOrchestrator, +): + """Test PG autoscaling decision table at upgrade start. + Verifies that prior_autoscale and pg_autoscale_during_upgrade produce the + expected autoscale_during_upgrade decision, and that _set_noautoscale is + called only when autoscale_during_upgrade is False. + """ + expect_set_noautoscale = not autoscale_during_upgrade + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service( + cephadm_module, + ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*', count=2)), + status_running=True, + ): + cephadm_module.pg_autoscale_during_upgrade = pg_autoscale_during_upgrade + + with mock.patch.object( + cephadm_module.upgrade, + '_is_upgrade_autoscaling_allowed', + return_value=prior_autoscale, + ), mock.patch.object( + cephadm_module.upgrade, + '_set_noautoscale', + return_value=True, + ) as mock_set_noautoscale: + result = wait( + cephadm_module, + cephadm_module.upgrade_start('image_id', None), + ) + assert result == 'Initiating upgrade to image_id' + + # Decision: autoscale_during_upgrade=False -> set noautoscale + if expect_set_noautoscale: + mock_set_noautoscale.assert_called_once() + assert cephadm_module.upgrade.upgrade_state.noautoscale_set is True + else: + mock_set_noautoscale.assert_not_called() + assert getattr( + cephadm_module.upgrade.upgrade_state, + 'noautoscale_set', + False, + ) is False + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.serve.CephadmServe._get_container_image_info") +@pytest.mark.parametrize( + "daemon_types,expect_set_noautoscale", + [ + (['mon', 'mgr'], False), # excludes OSDs -> noautoscale not set + (['mon', 'mgr', 'osd'], True), # includes OSDs, prior_autoscale=False -> noautoscale set + ], +) +def test_pg_autoscale_skipped_when_upgrade_excludes_osds( + _get_container_image_info, cephadm_module: CephadmOrchestrator, + daemon_types, expect_set_noautoscale +): + """When upgrade excludes OSDs, _set_noautoscale should not be called. + When it includes OSDs and prior_autoscale=False, _set_noautoscale should be called. + """ + _get_container_image_info.side_effect = async_side_effect( + ('img_id', 'ceph version 18.2.0 (hash)', ['digest']) + ) + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service( + cephadm_module, + ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*', count=2)), + status_running=True, + ): + cephadm_module.pg_autoscale_during_upgrade = False + + with mock.patch.object( + cephadm_module.upgrade, + '_is_upgrade_autoscaling_allowed', + return_value=False, + ), mock.patch.object( + cephadm_module.upgrade, + '_set_noautoscale', + return_value=True, + ) as mock_set_noautoscale: + result = wait( + cephadm_module, + cephadm_module.upgrade_start( + 'image_id', None, + daemon_types=daemon_types, + ), + ) + assert result == 'Initiating upgrade to image_id' + if expect_set_noautoscale: + mock_set_noautoscale.assert_called_once() + else: + mock_set_noautoscale.assert_not_called() + + +@pytest.mark.parametrize( + "prior_autoscale,opt_in,autoscale_after", + [ + # Decision table: prior_autoscale, opt-in -> autoscale_after + (False, False, False), # Case 1: prior off, no opt-in -> autoscale off after + (False, True, False), # Case 2: prior off, opt-in -> autoscale off after (revert) + (True, False, True), # Case 3: prior on, no opt-in -> autoscale on after (revert) + (True, True, True), # Case 4: prior on, opt-in -> autoscale on after + ], +) +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) +@mock.patch("cephadm.serve.CephadmServe._get_container_image_info") +@mock.patch("cephadm.CephadmOrchestrator.check_mon_command") +def test_pg_autoscale_revert_after_upgrade( + check_mon_command, + _get_container_image_info, + prior_autoscale, + opt_in, + autoscale_after, + cephadm_module: CephadmOrchestrator, +): + """Test autoscale_after per decision table: prior_autoscale, opt-in -> autoscale_after. + Cases 1,3: we set noautoscale during upgrade, so we restore to prior on stop. + Cases 2,4: we never set noautoscale, so no restore; cluster stays as prior. + """ + _get_container_image_info.side_effect = async_side_effect( + ('img_id', 'ceph version 18.2.0 (hash)', ['digest']) + ) + check_mon_command.return_value = (0, '', '') + + with with_host(cephadm_module, 'host1'): + with with_host(cephadm_module, 'host2'): + with with_service( + cephadm_module, + ServiceSpec('mgr', placement=PlacementSpec(host_pattern='*', count=2)), + status_running=True, + ): + cephadm_module.pg_autoscale_during_upgrade = opt_in + + with mock.patch.object( + cephadm_module.upgrade, + '_is_upgrade_autoscaling_allowed', + return_value=prior_autoscale, + ): + wait( + cephadm_module, + cephadm_module.upgrade_start( + 'image_id', None, + daemon_types=['mon', 'mgr', 'osd'], + ), + ) + + # upgrade_stop triggers _unset_noautoscale when noautoscale_set + check_mon_command.reset_mock() + wait(cephadm_module, cephadm_module.upgrade_stop()) + + # Verify autoscale_after: restore path (cases 1,3) vs no restore (cases 2,4) + config_calls = [ + c for c in check_mon_command.call_args_list + if isinstance(c[0][0], dict) + and c[0][0].get('name') == 'osd_pool_default_pg_autoscale_mode' + ] + if not opt_in: + # Cases 1,3: we set noautoscale, so we restore + assert len(config_calls) >= 1 + expected_value = 'on' if autoscale_after else 'off' + assert any( + c[0][0].get('value') == expected_value + for c in config_calls + ) + else: + # Cases 2,4: we never set noautoscale, so no restore calls + assert len(config_calls) == 0 + + +@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) def test_not_enough_mgrs(cephadm_module: CephadmOrchestrator): with with_host(cephadm_module, 'host1'): with with_service(cephadm_module, ServiceSpec('mgr', placement=PlacementSpec(count=1)), CephadmOrchestrator.apply_mgr, ''): diff --git a/src/pybind/mgr/cephadm/upgrade.py b/src/pybind/mgr/cephadm/upgrade.py index 0960f7936436..55dc7d1560ad 100644 --- a/src/pybind/mgr/cephadm/upgrade.py +++ b/src/pybind/mgr/cephadm/upgrade.py @@ -202,7 +202,10 @@ class UpgradeState: remaining_count: Optional[int] = None, crush_bucket_type: Optional[str] = None, crush_bucket_name: Optional[str] = None, + noautoscale_set: Optional[bool] = False, + prior_autoscale: Optional[bool] = True, ): + self._target_name: str = target_name # Use CephadmUpgrade.target_image instead. self.progress_id: str = progress_id self.target_id: Optional[str] = target_id @@ -221,6 +224,8 @@ class UpgradeState: self.remaining_count = remaining_count self.crush_bucket_type = crush_bucket_type self.crush_bucket_name = crush_bucket_name + self.noautoscale_set = noautoscale_set + self.prior_autoscale = prior_autoscale def to_json(self) -> dict: return { @@ -241,6 +246,8 @@ class UpgradeState: 'remaining_count': self.remaining_count, 'crush_bucket_type': self.crush_bucket_type, 'crush_bucket_name': self.crush_bucket_name, + 'noautoscale_set': self.noautoscale_set, + 'prior_autoscale': self.prior_autoscale, } @classmethod @@ -513,6 +520,37 @@ class CephadmUpgrade: """ return f'retval: {-errno.ENOENT}' in str(err) + def _hosts_include_osds(self, hosts: List[str]) -> bool: + """Return True if any OSD daemon is on one of the given hosts.""" + osds = self.mgr.cache.get_daemons_by_type('osd') + hosts_set = set(hosts) + for d in osds: + if d.hostname in hosts_set: + return True + return False + + def _services_include_osds(self, services: List[str]) -> bool: + for s in services: + if s in self.mgr.spec_store: + spec = self.mgr.spec_store[s].spec + if (spec is not None + and 'osd' in orchestrator.service_to_daemon_types(spec.service_type)): + return True + return False + + def _upgrade_includes_osds(self, daemon_types: Optional[List[str]], + hosts: Optional[List[str]], + services: Optional[List[str]]) -> bool: + """Return True if this upgrade will include OSD daemons.""" + if daemon_types is not None: + return 'osd' in daemon_types + if services is not None: + return self._services_include_osds(services) + if hosts is not None: + return self._hosts_include_osds(hosts) + # No filter = full upgrade, includes OSDs + return True + def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, hosts: Optional[List[str]] = None, services: Optional[List[str]] = None, limit: Optional[int] = None, bucket_type: Optional[str] = None, bucket_name: Optional[str] = None) -> str: @@ -571,6 +609,25 @@ class CephadmUpgrade: crush_bucket_type=bucket_type, crush_bucket_name=bucket_name, ) + # One-time PG autoscaling decision when upgrade includes OSDs + if self._upgrade_includes_osds(daemon_types, hosts, services): + # prior_autoscale: current OSD noautoscale status from osd_map flags (before we touch it) + prior_autoscale = self._is_upgrade_autoscaling_allowed() + opt_in = bool(self.mgr.pg_autoscale_during_upgrade) + # Only opt-in keeps autoscaling on during upgrade; otherwise turn off and restore after + autoscale_during_upgrade = opt_in + logger.info( + 'Upgrade: PG autoscaling prior=%s, mgr/cephadm/pg_autoscale_during_upgrade=%s, ' + 'autoscaling during upgrade=%s', + prior_autoscale, opt_in, autoscale_during_upgrade + ) + if not autoscale_during_upgrade: + # If not opted-in disable the autoscale during upgrade and + # restore the state after upgrade complete/stops + if self._set_noautoscale(): + self.upgrade_state.noautoscale_set = True + # Store prior state from current OSD noautoscale status for restore + self.upgrade_state.prior_autoscale = prior_autoscale self._update_upgrade_progress(0.0) self._save_upgrade_state() self._clear_upgrade_health_checks() @@ -699,6 +756,8 @@ class CephadmUpgrade: def upgrade_stop(self) -> str: if not self.upgrade_state: return 'No upgrade in progress' + if getattr(self.upgrade_state, 'noautoscale_set', False): + self._unset_noautoscale() if self.upgrade_state.progress_id: self.mgr.remote('progress', 'complete', self.upgrade_state.progress_id) @@ -958,6 +1017,80 @@ class CephadmUpgrade: return False + def _is_upgrade_autoscaling_allowed(self) -> bool: + """Return True if PG autoscaling is allowed based on current OSD noautoscale status. + Reads osd_map flags; True when noautoscale is not set, False when it is set. + """ + osdmap = self.mgr.get("osd_map") + flags_str = (osdmap.get('flags') or '') if osdmap else '' + return 'noautoscale' not in flags_str + + def _set_noautoscale(self) -> bool: + """Set noautoscale (disable PG autoscaling) before OSD upgrade. Returns True on success.""" + try: + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': 'global', + 'name': 'osd_pool_default_pg_autoscale_mode', + 'value': 'off', + }) + try: + self.mgr.check_mon_command({ + 'prefix': 'osd set', + 'key': 'noautoscale', + }) + except Exception as e: + logger.warning('Upgrade: Failed to set noautoscale: %s', e) + logger.warning( + 'Upgrade: Partial state: osd_pool_default_pg_autoscale_mode set to off ' + 'but osd noautoscale flag not set. Check cluster config.' + ) + return False + logger.info('Upgrade: Set noautoscale (disable PG autoscaling) for OSD upgrade') + return True + except Exception as e: + logger.warning('Upgrade: Failed to set noautoscale: %s', e) + return False + + def _unset_noautoscale(self) -> None: + """Restore PG autoscaling to prior state on upgrade completion/stop/failure. + Retries on failure to improve resilience against transient mon command failures. + """ + prior_autoscale = getattr(self.upgrade_state, 'prior_autoscale', True) + restore_on = prior_autoscale + retry_delays = [2, 5, 10] + for i, sleep_secs in enumerate(retry_delays): + try: + self.mgr.check_mon_command({ + 'prefix': 'config set', + 'who': 'global', + 'name': 'osd_pool_default_pg_autoscale_mode', + 'value': 'on' if restore_on else 'off', + }) + if restore_on: + self.mgr.check_mon_command({ + 'prefix': 'osd unset', + 'key': 'noautoscale', + }) + else: + self.mgr.check_mon_command({ + 'prefix': 'osd set', + 'key': 'noautoscale', + }) + logger.info( + 'Upgrade: Restored PG autoscaling to %s', + 'on' if restore_on else 'off' + ) + return + except Exception as e: + logger.warning( + 'Upgrade: Failed to restore noautoscale (retry in %ds): %s', + sleep_secs, e + ) + if i < len(retry_delays) - 1: + time.sleep(sleep_secs) + logger.warning('Upgrade: Failed to restore noautoscale after retries') + def _clear_upgrade_health_checks(self) -> None: for k in self.UPGRADE_ERRORS: if k in self.mgr.health_checks: @@ -975,6 +1108,9 @@ class CephadmUpgrade: alert['summary'])) self.upgrade_state.error = alert_id + ': ' + alert['summary'] self.upgrade_state.paused = True + # Do not restore PG autoscaling here: upgrade is only paused. Restore + # only on upgrade_stop or _mark_upgrade_complete so that resume + # continues with autoscaling still disabled for OSD upgrades. self._save_upgrade_state() self.mgr.health_checks[alert_id] = alert self.mgr.set_health_checks(self.mgr.health_checks) @@ -1553,6 +1689,9 @@ class CephadmUpgrade: if not self.upgrade_state: logger.debug('_mark_upgrade_complete upgrade already marked complete, exiting') return + if getattr(self.upgrade_state, 'noautoscale_set', False): + self._unset_noautoscale() + self.upgrade_state.noautoscale_set = False logger.info('Upgrade: Complete!') if self.upgrade_state.progress_id: self.mgr.remote('progress', 'complete', -- 2.47.3