import json
from unittest import mock
+import pytest
+
from ceph.deployment.service_spec import ServiceSpec
from cephadm import CephadmOrchestrator
from .fixtures import _run_cephadm, wait, cephadm_module, with_host, with_service
@mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm('{}'))
-def test_upgrade_run(cephadm_module: CephadmOrchestrator):
+@pytest.mark.parametrize("use_repo_digest",
+ [
+ False,
+ True
+ ])
+def test_upgrade_run(use_repo_digest, cephadm_module: CephadmOrchestrator):
with with_host(cephadm_module, 'test'):
cephadm_module.set_container_image('global', 'from_image')
+ if use_repo_digest:
+ cephadm_module.use_repo_digest = True
with with_service(cephadm_module, ServiceSpec('mgr'), CephadmOrchestrator.apply_mgr, 'test'):
assert wait(cephadm_module, cephadm_module.upgrade_start(
'to_image', None)) == 'Initiating upgrade to to_image'
cephadm_module._mon_command_mock_versions = _versions_mock
+ with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digest': 'to_image@repo_digest',
+ }))):
+
+ cephadm_module.upgrade._do_upgrade()
+
+ assert cephadm_module.upgrade_status is not None
+
+ with mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(
+ json.dumps([
+ dict(
+ name=list(cephadm_module.cache.daemons['test'].keys())[0],
+ style='cephadm',
+ fsid='fsid',
+ container_id='container_id',
+ container_image_id='image_id',
+ version='version',
+ state='running',
+ )
+ ])
+ )):
+ cephadm_module._refresh_hosts_and_daemons()
+
cephadm_module.upgrade._do_upgrade()
_, image, _ = cephadm_module.check_mon_command({
'who': 'global',
'key': 'container_image',
})
-
- assert image == 'to_image'
+ if use_repo_digest:
+ assert image == 'to_image@repo_digest'
+ else:
+ assert image == 'to_image'
import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional, Dict
+from typing import TYPE_CHECKING, Optional, Dict, NamedTuple
import orchestrator
from cephadm.utils import name_to_config_section
target_name: str,
progress_id: str,
target_id: Optional[str] = None,
+ repo_digest: Optional[str] = None,
target_version: Optional[str] = None,
error: Optional[str] = None,
paused: Optional[bool] = None,
):
- self.target_name: str = target_name
+ self._target_name: str = target_name # Use CephadmUpgrade.target_image instead.
self.progress_id: str = progress_id
self.target_id: Optional[str] = target_id
+ self.repo_digest: Optional[str] = repo_digest
self.target_version: Optional[str] = target_version
self.error: Optional[str] = error
self.paused: bool = paused or False
def to_json(self) -> dict:
return {
- 'target_name': self.target_name,
+ 'target_name': self._target_name,
'progress_id': self.progress_id,
'target_id': self.target_id,
+ 'repo_digest': self.repo_digest,
'target_version': self.target_version,
'error': self.error,
'paused': self.paused,
else:
self.upgrade_state = None
+ @property
+ def target_image(self) -> str:
+ assert self.upgrade_state
+ if not self.mgr.use_repo_digest:
+ return self.upgrade_state._target_name
+ if not self.upgrade_state.repo_digest:
+ return self.upgrade_state._target_name
+
+ return self.upgrade_state.repo_digest
+
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
r = orchestrator.UpgradeStatusSpec()
if self.upgrade_state:
- r.target_image = self.upgrade_state.target_name
+ r.target_image = self.target_image
r.in_progress = True
if self.upgrade_state.error:
r.message = 'Error: ' + self.upgrade_state.error
else:
raise OrchestratorError('must specify either image or version')
if self.upgrade_state:
- if self.upgrade_state.target_name != target_name:
+ if self.upgrade_state._target_name != target_name:
raise OrchestratorError(
'Upgrade to %s (not %s) already in progress' %
- (self.upgrade_state.target_name, target_name))
+ (self.upgrade_state._target_name, target_name))
if self.upgrade_state.paused:
self.upgrade_state.paused = False
self._save_upgrade_state()
- return 'Resumed upgrade to %s' % self.upgrade_state.target_name
- return 'Upgrade to %s in progress' % self.upgrade_state.target_name
+ return 'Resumed upgrade to %s' % self.target_image
+ return 'Upgrade to %s in progress' % self.target_image
self.upgrade_state = UpgradeState(
target_name=target_name,
progress_id=str(uuid.uuid4())
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
if self.upgrade_state.paused:
- return 'Upgrade to %s already paused' % self.upgrade_state.target_name
+ return 'Upgrade to %s already paused' % self.target_image
self.upgrade_state.paused = True
self._save_upgrade_state()
- return 'Paused upgrade to %s' % self.upgrade_state.target_name
+ return 'Paused upgrade to %s' % self.target_image
def upgrade_resume(self) -> str:
if not self.upgrade_state:
raise OrchestratorError('No upgrade in progress')
if not self.upgrade_state.paused:
- return 'Upgrade to %s not paused' % self.upgrade_state.target_name
+ return 'Upgrade to %s not paused' % self.target_image
self.upgrade_state.paused = False
self._save_upgrade_state()
self.mgr.event.set()
- return 'Resumed upgrade to %s' % self.upgrade_state.target_name
+ return 'Resumed upgrade to %s' % self.target_image
def upgrade_stop(self) -> str:
if not self.upgrade_state:
return 'No upgrade in progress'
- target_name = self.upgrade_state.target_name
if self.upgrade_state.progress_id:
self.mgr.remote('progress', 'complete',
self.upgrade_state.progress_id)
+ target_image = self.target_image
self.upgrade_state = None
self._save_upgrade_state()
self._clear_upgrade_health_checks()
self.mgr.event.set()
- return 'Stopped upgrade to %s' % target_name
+ return 'Stopped upgrade to %s' % target_image
def continue_upgrade(self) -> bool:
"""
self.upgrade_state.progress_id = str(uuid.uuid4())
self._save_upgrade_state()
self.mgr.remote('progress', 'update', self.upgrade_state.progress_id,
- ev_msg='Upgrade to %s' % self.upgrade_state.target_name,
+ ev_msg='Upgrade to %s' % self.target_image,
ev_progress=progress)
def _save_upgrade_state(self) -> None:
logger.debug('_do_upgrade no state, exiting')
return
- target_name = self.upgrade_state.target_name
+ target_image = self.target_image
target_id = self.upgrade_state.target_id
- if not target_id:
+ if not target_id or (self.mgr.use_repo_digest and not self.upgrade_state.repo_digest):
# need to learn the container hash
- logger.info('Upgrade: First pull of %s' % target_name)
+ logger.info('Upgrade: First pull of %s' % target_image)
try:
target_id, target_version, repo_digest = self.mgr._get_container_image_info(
- target_name)
+ target_image)
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',
return
self.upgrade_state.target_id = target_id
self.upgrade_state.target_version = target_version
+ self.upgrade_state.repo_digest = repo_digest
self._save_upgrade_state()
+ target_image = self.target_image
target_version = self.upgrade_state.target_version
- logger.info('Upgrade: Target is %s with id %s' % (target_name,
+ logger.info('Upgrade: Target is %s with id %s' % (target_image,
target_id))
image_settings = self.get_distinct_container_image_settings()
# make sure host has latest container image
out, err, code = self.mgr._run_cephadm(
d.hostname, '', 'inspect-image', [],
- image=target_name, no_fsid=True, error_ok=True)
+ image=target_image, no_fsid=True, error_ok=True)
if code or json.loads(''.join(out)).get('image_id') != target_id:
- logger.info('Upgrade: Pulling %s on %s' % (target_name,
+ logger.info('Upgrade: Pulling %s on %s' % (target_image,
d.hostname))
out, err, code = self.mgr._run_cephadm(
d.hostname, '', 'pull', [],
- image=target_name, no_fsid=True, error_ok=True)
+ image=target_image, no_fsid=True, error_ok=True)
if code:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
'severity': 'warning',
'summary': 'Upgrade: failed to pull target image',
'count': 1,
'detail': [
- 'failed to pull %s on host %s' % (target_name,
+ 'failed to pull %s on host %s' % (target_image,
d.hostname)],
})
return
r = json.loads(''.join(out))
if r.get('image_id') != target_id:
logger.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (
- target_name, d.hostname, r['image_id'], target_id))
+ target_image, d.hostname, r['image_id'], target_id))
self.upgrade_state.target_id = r['image_id']
self._save_upgrade_state()
return
self._update_upgrade_progress(done / len(daemons))
if not d.container_image_id:
- if d.container_image_name == target_name:
+ if d.container_image_name == target_image:
logger.debug(
'daemon %s has unknown container_image_id but has correct image name' % (d.name()))
continue
d.daemon_id,
d.hostname,
'redeploy',
- image=target_name
+ image=target_image
)
return
(count, daemon_type, version, target_version))
# push down configs
- if image_settings.get(daemon_type) != target_name:
+ if image_settings.get(daemon_type) != target_image:
logger.info('Upgrade: Setting container_image for all %s...' %
daemon_type)
- self.mgr.set_container_image(name_to_config_section(daemon_type), target_name)
+ self.mgr.set_container_image(name_to_config_section(daemon_type), target_image)
to_clean = []
for section in image_settings.keys():
if section.startswith(name_to_config_section(daemon_type) + '.'):
# clean up
logger.info('Upgrade: Finalizing container_image settings')
- self.mgr.set_container_image('global', target_name)
+ self.mgr.set_container_image('global', target_image)
for daemon_type in CEPH_UPGRADE_ORDER:
ret, image, err = self.mgr.check_mon_command({