check_container_engine,
find_container_engine,
parsed_container_mem_usage,
+ parsed_container_cpu_perc,
pull_command,
registry_login,
)
seen_memusage = {} # type: Dict[str, int]
seen_cpuperc = {} # type: Dict[str, str]
seen_memusage_cid_len, seen_memusage = parsed_container_mem_usage(ctx)
-
- out, err, code = call(
- ctx,
- [container_path, 'stats', '--format', '{{.ID}},{{.CPUPerc}}', '--no-stream'],
- verbosity=CallVerbosity.QUIET
- )
- seen_cpuperc_cid_len, seen_cpuperc = _parse_cpu_perc(code, out)
+ seen_cpuperc_cid_len, seen_cpuperc = parsed_container_cpu_perc(ctx)
# /var/lib/ceph
if os.path.exists(data_dir):
return ls
-def _parse_cpu_perc(code: int, out: str) -> Tuple[int, Dict[str, str]]:
- seen_cpuperc = {}
- seen_cpuperc_cid_len = 0
- if not code:
- for line in out.splitlines():
- (cid, cpuperc) = line.split(',')
- try:
- seen_cpuperc[cid] = cpuperc
- if not seen_cpuperc_cid_len:
- seen_cpuperc_cid_len = len(cid)
- except ValueError:
- logger.info('unable to parse cpu percentage line\n>{}'.format(line))
- pass
- return seen_cpuperc_cid_len, seen_cpuperc
-
-
def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
# type: (CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str]