DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
-logger: logging.Logger = None # type: ignore
+logger: logging.Logger = None # type: ignore
##################################
"""
if require and key not in d.keys():
raise Error('{} missing from dict'.format(key))
- return d.get(key, default) # type: ignore
+ return d.get(key, default) # type: ignore
##################################
logger.debug('copy directory \'%s\' -> \'%s\'' % (src_dir, dst_dir))
shutil.rmtree(dst_dir, ignore_errors=True)
- shutil.copytree(src_dir, dst_dir) # dirs_exist_ok needs python 3.8
+ shutil.copytree(src_dir, dst_dir) # dirs_exist_ok needs python 3.8
for dirpath, dirnames, filenames in os.walk(dst_dir):
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dirpath))
raise Error('inspect {}: empty result'.format(image))
r = {
'image_id': normalize_container_id(image_id)
- } # type: Dict[str, Union[str,List[str]]]
+ } # type: Dict[str, Union[str,List[str]]]
if digests:
r['repo_digests'] = digests[1:-1].split(' ')
return r
ctx: CephadmContext,
uid: int, gid: int,
mgr_id: str
-) -> Tuple[str, str, str, Any, Any]: # type: ignore
+) -> Tuple[str, str, str, Any, Any]: # type: ignore
_image = ctx.image
# wait for the service to become available
def is_mon_available():
# type: () -> bool
- timeout = ctx.timeout if ctx.timeout else 60 # seconds
+ timeout = ctx.timeout if ctx.timeout else 60 # seconds
out, err, ret = call(ctx, c.run_cmd(),
desc=c.entrypoint,
timeout=timeout)
logger.info('Waiting for mgr to start...')
def is_mgr_available():
# type: () -> bool
- timeout = ctx.timeout if ctx.timeout else 60 # seconds
+ timeout = ctx.timeout if ctx.timeout else 60 # seconds
try:
out = clifunc(['status', '-f', 'json-pretty'], timeout=timeout)
j = json.loads(out)
with open(auth_keys_file, 'r') as f:
f.seek(0, os.SEEK_END)
if f.tell() > 0:
- f.seek(f.tell() - 1, os.SEEK_SET) # go to last char
+ f.seek(f.tell() - 1, os.SEEK_SET) # go to last char
if f.read() != '\n':
add_newline = True
with open(auth_keys_file, 'a') as f:
- os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
+ os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
os.fchmod(f.fileno(), 0o600) # just in case we created it
if add_newline:
f.write('\n')
logger.info('%s daemon %s ...' % ('Deploy', ctx.name))
# Get and check ports explicitly required to be opened
- daemon_ports = [] # type: List[int]
+ daemon_ports = [] # type: List[int]
# only check port in use if not reconfig or redeploy since service
# we are redeploying/reconfiguring will already be using the port
daemon_ports.extend(Monitoring.port_map[daemon_type])
# make sure provided config-json is sufficient
- config = get_parm(ctx.config_json) # type: ignore
+ config = get_parm(ctx.config_json) # type: ignore
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
required_args = Monitoring.components[daemon_type].get('config-json-args', list())
if required_files:
if not ctx.keyring and os.path.exists(SHELL_DEFAULT_KEYRING):
ctx.keyring = SHELL_DEFAULT_KEYRING
- container_args = [] # type: List[str]
+ container_args = [] # type: List[str]
mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id,
no_config=True if ctx.config else False)
binds = get_container_binds(ctx, ctx.fsid, daemon_type, daemon_id)
if not ctx.fsid:
raise Error('must pass --fsid to specify cluster')
(daemon_type, daemon_id) = ctx.name.split('.', 1)
- container_args = [] # type: List[str]
+ container_args = [] # type: List[str]
if ctx.command:
command = ctx.command
else:
l = FileLock(ctx, ctx.fsid)
l.acquire()
- (uid, gid) = (0, 0) # ceph-volume runs as root
+ (uid, gid) = (0, 0) # ceph-volume runs as root
mounts = get_container_mounts(ctx, ctx.fsid, 'osd', None)
tmp_config = None
# call this directly, without our wrapper, so that we get an unmolested
# stdout with logger prefixing.
logger.debug("Running command: %s" % ' '.join(cmd))
- subprocess.call(cmd) # type: ignore
+ subprocess.call(cmd) # type: ignore
##################################
else:
logger.warning('version for unknown daemon type %s' % daemon_type)
else:
- vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore
+ vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore
try:
with open(vfile, 'r') as f:
image_name = f.read().strip() or None
pass
# unit.meta?
- mfile = os.path.join(data_dir, fsid, j, 'unit.meta') # type: ignore
+ mfile = os.path.join(data_dir, fsid, j, 'unit.meta') # type: ignore
try:
with open(mfile, 'r') as f:
meta = json.loads(f.read())
# type: (CephadmContext, Optional[Packager]) -> bool
units = [
'chrony.service', # 18.04 (at least)
- 'chronyd.service', # el / opensuse
+ 'chronyd.service', # el / opensuse
'systemd-timesyncd.service',
- 'ntpd.service', # el7 (at least)
+ 'ntpd.service', # el7 (at least)
'ntp.service', # 18.04 (at least)
'ntpsec.service', # 20.04 (at least) / buster
]
else:
summary[mode] = 0
summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()])
- security = {**security, **summary} # type: ignore
+ security = {**security, **summary} # type: ignore
security['description'] += "({})".format(summary_str)
return security
av = sys.argv[1:]
ctx = cephadm_init(av)
- if not ctx: # error, exit
+ if not ctx: # error, exit
sys.exit(1)
try: