from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
-from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
+from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, is_repo_digest
try:
import remoto
'default': None,
'desc': 'Custom repository password'
},
+ {
+ 'name': 'use_repo_digest',
+ 'type': 'bool',
+ 'default': False,
+ 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
+ }
]
def __init__(self, *args, **kwargs):
self.registry_url: Optional[str] = None
self.registry_username: Optional[str] = None
self.registry_password: Optional[str] = None
+ self.use_repo_digest = False
self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
remoto.backends.LegacyModuleExecute]] = {}
try:
+ self.convert_tags_to_repo_digest()
+
# refresh daemons
self.log.debug('refreshing hosts and daemons')
self._refresh_hosts_and_daemons()
self._serve_sleep()
self.log.debug("serve exit")
+ def convert_tags_to_repo_digest(self):
+ if not self.use_repo_digest:
+ return
+ settings = self.upgrade.get_distinct_container_image_settings()
+ digests: Dict[str, ContainerInspectInfo] = {}
+ for container_image_ref in set(settings.values()):
+ if not is_repo_digest(container_image_ref):
+ image_info = self._get_container_image_info(container_image_ref)
+ if image_info.repo_digest:
+ assert is_repo_digest(image_info.repo_digest), image_info
+ digests[container_image_ref] = image_info
+
+ for entity, container_image_ref in settings.items():
+ if not is_repo_digest(container_image_ref):
+ image_info = digests[container_image_ref]
+ if image_info.repo_digest:
+ self.set_container_image(entity, image_info.repo_digest)
+
def set_container_image(self, entity: str, image):
self.check_mon_command({
'prefix': 'config set',
'current_id': dd.container_image_id,
'current_version': dd.version,
}
+ if self.use_repo_digest:
+ r['target_digest'] = image_info.repo_digest
+
return json.dumps(r, indent=4, sort_keys=True)
@trivial_completion
code, out, err = cephadm_module.registry_login('fail-url', 'fail-user', 'fail-password')
assert err == 'Host test failed to login to fail-url as fail-user with given password'
check_registry_credentials('json-url', 'json-user', 'json-pass')
+
+ @mock.patch("cephadm.module.CephadmOrchestrator._run_cephadm", _run_cephadm(json.dumps({
+ 'image_id': 'image_id',
+ 'repo_digest': 'image@repo_digest',
+ })))
+ @pytest.mark.parametrize("use_repo_digest",
+ [
+ False,
+ True
+ ])
+ def test_upgrade_run(self, use_repo_digest, cephadm_module: CephadmOrchestrator):
+ with with_host(cephadm_module, 'test'):
+ cephadm_module.set_container_image('global', 'image')
+ if use_repo_digest:
+ cephadm_module.use_repo_digest = True
+
+ cephadm_module.convert_tags_to_repo_digest()
+
+ _, image, _ = cephadm_module.check_mon_command({
+ 'prefix': 'config get',
+ 'who': 'global',
+ 'key': 'container_image',
+ })
+ if use_repo_digest:
+ assert image == 'image@repo_digest'
+ else:
+ assert image == 'image'
import logging
import time
import uuid
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Dict
import orchestrator
from cephadm.utils import name_to_config_section
return
self.mgr.set_store('upgrade_state', json.dumps(self.upgrade_state.to_json()))
+ def get_distinct_container_image_settings(self) -> Dict[str, str]:
+ # get all distinct container_image settings
+ image_settings = {}
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config dump',
+ 'format': 'json',
+ })
+ config = json.loads(out)
+ for opt in config:
+ if opt['name'] == 'container_image':
+ image_settings[opt['section']] = opt['value']
+ return image_settings
+
def _do_upgrade(self):
# type: () -> None
if not self.upgrade_state:
logger.info('Upgrade: Target is %s with id %s' % (target_name,
target_id))
- # get all distinct container_image settings
- image_settings = {}
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config dump',
- 'format': 'json',
- })
- config = json.loads(out)
- for opt in config:
- if opt['name'] == 'container_image':
- image_settings[opt['section']] = opt['value']
+ image_settings = self.get_distinct_container_image_settings()
daemons = self.mgr.cache.get_daemons()
done = 0
raise OrchestratorError('failed to parse health status')
return j['status']
+
+
+def is_repo_digest(image_name: str) -> bool:
+ """
+ repo digest are something like "ceph/ceph@sha256:blablabla"
+ """
+ return '@' in image_name