concurrent_tasks,
)
from cephadmlib.container_engines import (
+ ImageInfo,
Podman,
check_container_engine,
find_container_engine,
MemUsageStatusUpdater,
VersionStatusUpdater,
)
-from cephadmlib.container_lookup import get_container_info
FuncT = TypeVar('FuncT', bound=Callable)
ctx.image = _get_default_image(ctx)
-def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
- """
- Infer the local ceph image based on the following priority criteria:
- 1- the image specified by --image arg (if provided).
- 2- the same image as the daemon container specified by --name arg (if provided).
- 3- image used by any ceph container running on the host. In this case we use daemon types.
- 4- if no container is found then we use the most ceph recent image on the host.
-
- Note: any selected container must have the same fsid inferred previously.
+def infer_local_ceph_image(
+ ctx: CephadmContext, container_path: str = ''
+) -> Optional[str]:
+ """Infer the best ceph image to use based on the following criteria:
+ Out of all images labeled as ceph that are non-dangling, prefer
+ 1. the same image as the daemon container specified by -name arg (if provided).
+ 2. the image used by any ceph container running on the host
+ 3. the most ceph recent image on the host
- :return: The most recent local ceph image (already pulled)
+ :return: An image name or none
"""
+ from operator import itemgetter
+
+ # enumerate ceph images on the system
images = parsed_container_image_list(
ctx,
filters=['dangling=false', 'label=ceph=True'],
container_path=container_path,
)
+ if not images:
+ logger.warning('No non-dangling ceph images found')
+ return None # no images at all cached on host
+
+ # find running ceph daemons
+ _daemons = ceph_daemons()
+ daemon_name = getattr(ctx, 'name', '')
+ _cinfo_key = '_container_info'
+ _updater = CoreStatusUpdater(keep_container_info=_cinfo_key)
+ matching_daemons = [
+ itemgetter(_cinfo_key, 'name')(_updater.expand(ctx, entry))
+ for entry in daemons_matching(
+ ctx, fsid=ctx.fsid, daemon_type_predicate=lambda t: t in _daemons
+ )
+ ]
+ # collect the running ceph daemon image ids
+ images_in_use_by_daemon = set(
+ d.image_id for d, n in matching_daemons if n == daemon_name
+ )
+ images_in_use = set(d.image_id for d, _ in matching_daemons)
+
+ # prioritize images
+ def _keyfunc(image: ImageInfo) -> Tuple[bool, bool, str]:
+ return (
+ bool(
+ image.digest
+ and any(
+ v.startswith(image.image_id)
+ for v in images_in_use_by_daemon
+ )
+ ),
+ bool(
+ image.digest
+ and any(v.startswith(image.image_id) for v in images_in_use)
+ ),
+ image.created,
+ )
- container_info = None
- daemon_name = ctx.name if ('name' in ctx and ctx.name and '.' in ctx.name) else None
- daemons_ls = [daemon_name] if daemon_name is not None else ceph_daemons() # daemon types: 'mon', 'mgr', etc
- for daemon in daemons_ls:
- container_info = get_container_info(ctx, daemon, daemon_name is not None)
- if container_info is not None:
- logger.debug(f"Using container info for daemon '{daemon}'")
- break
-
- for image in images:
- if container_info is not None and image.image_id not in container_info.image_id:
- continue
- if image.digest:
- logger.info(f"Using ceph image with id '{image.image_id}' and tag '{image.tag}' created on {image.created}\n{image.name}")
- return image.name
- if container_info is not None:
- logger.warning(f"Not using image '{container_info.image_id}' as it's not in list of non-dangling images with ceph=True label")
- return None
+ images.sort(key=_keyfunc, reverse=True)
+ best_image = images[0]
+ name_match, ceph_match, _ = _keyfunc(best_image)
+ reason = 'not in the list of non-dangling images with ceph=True label'
+ if images_in_use_by_daemon and not name_match:
+ expected = list(images_in_use_by_daemon)[0]
+ logger.warning(
+ 'Not using image %r of named daemon: %s',
+ expected,
+ reason,
+ )
+ if images_in_use and not ceph_match:
+ expected = list(images_in_use)[0]
+ logger.warning(
+ 'Not using image %r of ceph daemon: %s',
+ expected,
+ reason,
+ )
+ return best_image.name
def get_log_dir(fsid, log_dir):
'expected': 'quay.io/ceph/ceph@sha256:939a46c06b334e094901560c8346de33c00309e3e3968a2db240eb4897c6a508',
},
# ceph images in local store do not match running instance
+ # return a valid ceph image
{
'container_info': _container_info(
'a1bb549714b8f007c6a4e29c758689cf9e8e69f2e0f51180506492974b90a972',
quay.ceph.io/ceph-ci/ceph@sha256:eeddcc536bb887b36b959e887d5984dd7a3f008a23aa1f283ab55d48b22c6185|dad864ee21e9|pacific|2022-03-23 16:29:19 +0000 UTC
'''
),
- 'expected': None,
+ 'expected': 'quay.ceph.io/ceph-ci/ceph@sha256:87f200536bb887b36b959e887d5984dd7a3f008a23aa1f283ab55d48b22c6185',
},
# ceph image in store have been pulled (new image) since daemon started
{
out = params.get('images_output', '')
expected = params.get('expected', None)
funkypatch.patch('cephadmlib.call_wrappers.call').return_value = out, '', 0
- funkypatch.patch('cephadm.get_container_info').return_value = cinfo
+ funkypatch.patch('cephadmlib.listing_updaters.CoreStatusUpdater')().expand.return_value = {
+ '_container_info': cinfo,
+ 'name': 'mon.foo',
+ }
+ funkypatch.patch('cephadmlib.listing.daemons_matching').return_value = [0] if cinfo else []
image = _cephadm.infer_local_ceph_image(
ctx, ctx.container_engine
)
):
import cephadmlib.listing
import cephadmlib.daemon_identity
+ from cephadmlib.container_lookup import get_container_info
ctx = _cephadm.CephadmContext()
ctx.fsid = '00000000-0000-0000-0000-0000deadbeef'
'cephadmlib.container_types.get_container_stats'
).return_value = cinfo
assert (
- _cephadm.get_container_info(ctx, daemon_filter, by_name) == output
+ get_container_info(ctx, daemon_filter, by_name) == output
)
def test_get_container_info_daemon_down(self, funkypatch):
import cephadmlib.listing
import cephadmlib.daemon_identity
+ from cephadmlib.container_lookup import get_container_info
+
funkypatch.patch('cephadmlib.systemd.check_unit').return_value = (True, 'stopped', '')
funkypatch.patch('cephadmlib.call_wrappers.call').side_effect = ValueError
_get_stats_by_name = funkypatch.patch('cephadmlib.container_engines.parsed_container_image_stats')
# redundant
_get_stats_by_name.return_value = expected_container_info
- assert _cephadm.get_container_info(ctx, 'osd.2', by_name=True) == expected_container_info
+ assert get_container_info(ctx, 'osd.2', by_name=True) == expected_container_info
assert not _get_stats.called, 'only get_container_stats_by_image_name should have been called'
# If there is one down and one up daemon of the same name, it should use the up one
start='the_past',
version='')
- assert _cephadm.get_container_info(ctx, 'osd.2', by_name=True) == expected_container_info
+ assert get_container_info(ctx, 'osd.2', by_name=True) == expected_container_info
def test_should_log_to_journald(self):
from cephadmlib import context_getters