def extract_uid_gid(img='', file_path='/var/lib/ceph'):
- # type: (str, str) -> Tuple[int, int]
+ # type: (str, Union[str, Iterabe[str]]) -> Tuple[int, int]
if not img:
img = args.image
- out = CephContainer(
- image=img,
- entrypoint='stat',
- args=['-c', '%u %g', file_path]
- ).run()
- (uid, gid) = out.split(' ')
- return int(uid), int(gid)
+ if isinstance(file_path, str):
+ paths = [file_path]
+ else:
+ paths = file_path
+
+ for fp in paths:
+ try:
+ out = CephContainer(
+ image=img,
+ entrypoint='stat',
+ args=['-c', '%u %g', fp]
+ ).run()
+ uid, gid = out.split(' ')
+ return int(uid), int(gid)
+ except RuntimeError:
+ pass
+ raise RuntimeError('uid/gid not found')
def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
elif daemon_type == 'grafana':
uid, gid = extract_uid_gid(file_path='/var/lib/grafana')
elif daemon_type == 'alertmanager':
- uid, gid = extract_uid_gid(file_path='/etc/alertmanager')
+ uid, gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
raise Error("{} not implemented yet".format(daemon_type))
return uid, gid