return inner
-class ContainerInspectInfo(NamedTuple):
- image_id: str
- ceph_version: Optional[str]
- repo_digest: Optional[str]
-
-
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
metaclass=CLICommandMeta):
def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
return self._apply(spec)
- def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
- # pick a random host...
- host = None
- for host_name in self.inventory.keys():
- host = host_name
- break
- if not host:
- raise OrchestratorError('no hosts defined')
- if self.cache.host_needs_registry_login(host) and self.registry_url:
- self._registry_login(host, self.registry_url,
- self.registry_username, self.registry_password)
- out, err, code = CephadmServe(self)._run_cephadm(
- host, '', 'pull', [],
- image=image_name,
- no_fsid=True,
- error_ok=True)
- if code:
- raise OrchestratorError('Failed to pull %s on %s: %s' % (
- image_name, host, '\n'.join(out)))
- try:
- j = json.loads('\n'.join(out))
- r = ContainerInspectInfo(
- j['image_id'],
- j.get('ceph_version'),
- j.get('repo_digest')
- )
- self.log.debug(f'image {image_name} -> {r}')
- return r
- except (ValueError, KeyError) as _:
- msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
- self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
- raise OrchestratorError(msg)
-
@trivial_completion
def upgrade_check(self, image: str, version: str) -> str:
if self.inventory.get_host_with_state("maintenance"):
else:
raise OrchestratorError('must specify either image or version')
- image_info = self._get_container_image_info(target_name)
+ image_info = CephadmServe(self)._get_container_image_info(target_name)
self.log.debug(f'image info {image} -> {image_info}')
r: dict = {
'target_name': target_name,
from cephadm.services.cephadmservice import CephadmDaemonSpec
from cephadm.schedule import HostAssignment
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
- CephadmNoImage, CEPH_UPGRADE_ORDER
+ CephadmNoImage, CEPH_UPGRADE_ORDER, ContainerInspectInfo
from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
if TYPE_CHECKING:
- from cephadm.module import CephadmOrchestrator, ContainerInspectInfo
+ from cephadm.module import CephadmOrchestrator
logger = logging.getLogger(__name__)
This module contains functions that are executed in the
serve() thread. Thus they don't block the CLI.
+ Please see the `Note regarding network calls from CLI handlers`
+ chapter in the cephadm developer guide.
+
On the other hand, these functions should *not* be called from
CLI handlers, to avoid blocking the CLI.
"""
digests: Dict[str, ContainerInspectInfo] = {}
for container_image_ref in set(settings.values()):
if not is_repo_digest(container_image_ref):
- image_info = self.mgr._get_container_image_info(container_image_ref)
+ image_info = self._get_container_image_info(container_image_ref)
if image_info.repo_digest:
assert is_repo_digest(image_info.repo_digest), image_info
digests[container_image_ref] = image_info
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
return out, err, code
+
+ def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+ """Pull *image_name* on one known host and describe the resulting image.
+
+ Returns a ContainerInspectInfo (image id, ceph version, repo digest)
+ decoded from the JSON that `cephadm pull` prints.
+
+ Raises OrchestratorError if no hosts are defined, if the pull fails,
+ or if cephadm's output cannot be decoded as the expected JSON.
+ """
+ # pick a random host...
+ # (any host will do; we just need somewhere that can run `cephadm pull`,
+ # so take the first key the inventory yields)
+ host = None
+ for host_name in self.mgr.inventory.keys():
+ host = host_name
+ break
+ if not host:
+ raise OrchestratorError('no hosts defined')
+ # authenticate against the configured registry first, if this host
+ # still needs a login
+ if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
+ self.mgr._registry_login(host, self.mgr.registry_url,
+ self.mgr.registry_username, self.mgr.registry_password)
+ # error_ok=True: we inspect the exit code ourselves below so we can
+ # raise an OrchestratorError with the captured output attached
+ out, err, code = self._run_cephadm(
+ host, '', 'pull', [],
+ image=image_name,
+ no_fsid=True,
+ error_ok=True)
+ if code:
+ raise OrchestratorError('Failed to pull %s on %s: %s' % (
+ image_name, host, '\n'.join(out)))
+ try:
+ # `cephadm pull` emits a JSON object; 'image_id' is mandatory,
+ # the other two fields may be absent (hence .get())
+ j = json.loads('\n'.join(out))
+ r = ContainerInspectInfo(
+ j['image_id'],
+ j.get('ceph_version'),
+ j.get('repo_digest')
+ )
+ self.log.debug(f'image {image_name} -> {r}')
+ return r
+ except (ValueError, KeyError) as _:
+ # ValueError: output was not JSON; KeyError: 'image_id' missing
+ msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
+ self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
+ raise OrchestratorError(msg)
# need to learn the container hash
logger.info('Upgrade: First pull of %s' % target_image)
try:
- target_id, target_version, repo_digest = self.mgr._get_container_image_info(
+ target_id, target_version, repo_digest = CephadmServe(self.mgr)._get_container_image_info(
target_image)
except OrchestratorError as e:
self._fail_upgrade('UPGRADE_FAILED_PULL', {
import socket
from enum import Enum
from functools import wraps
-from typing import Callable, TypeVar, List, NewType, TYPE_CHECKING, Any
+from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple
from orchestrator import OrchestratorError
if TYPE_CHECKING:
cephadmNoImage = CephadmNoImage.token
+class ContainerInspectInfo(NamedTuple):
+ """Image details decoded from the JSON output of `cephadm pull`."""
+ # image id reported by the container runtime (always present)
+ image_id: str
+ # ceph version baked into the image, if it could be determined
+ ceph_version: Optional[str]
+ # registry repo digest of the image, if available
+ repo_digest: Optional[str]
+
+
def name_to_config_section(name: str) -> ConfEntity:
"""
Map from daemon names to ceph entity names (as seen in config)