ctx, container_name, container_path=container_path
)
return _parse_container_stats(out, err, code)
+
+
+def _container_image_stats(
+ ctx: CephadmContext, image_name: str, *, container_path: str = ''
+) -> Tuple[str, str, int]:
+ """returns image id, created time, and ceph version if available"""
+ container_path = container_path or ctx.container_engine.path
+ cmd = [
+ container_path,
+ 'image',
+ 'inspect',
+ '--format',
+ '{{.Id}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}',
+ image_name,
+ ]
+ out, err, code = call(ctx, cmd, verbosity=CallVerbosity.QUIET)
+ return out, err, code
+
+
+def _parse_container_image_stats(
+ image_name: str,
+ out: str,
+ err: str,
+ code: int,
+) -> Optional[ContainerInfo]:
+ if code != 0:
+ return None
+ (image_id, start, version) = out.strip().split(',')
+ # keep in mind, the daemon container is not running, so no container id here
+ return ContainerInfo(
+ container_id='',
+ image_name=image_name,
+ image_id=image_id,
+ start=start,
+ version=version,
+ )
+
+
+def parsed_container_image_stats(
+ ctx: CephadmContext, image_name: str, *, container_path: str = ''
+) -> Optional[ContainerInfo]:
+ out, err, code = _container_image_stats(
+ ctx, image_name, container_path=container_path
+ )
+ return _parse_container_image_stats(image_name, out, err, code)