return "func" in self._args
- def has(self, name: str) -> bool:
+ def __contains__(self, name: str) -> bool:
return hasattr(self, name)
if not is_fsid(daemon['fsid']):
# 'unknown' fsid
continue
- elif ctx.has("name") or not ctx.name:
+ elif "name" in ctx or not ctx.name:
# ctx.name not specified
fsids_set.add(daemon['fsid'])
elif daemon['name'] == ctx.name:
@wraps(func)
def _default_image(ctx: CephadmContext):
if not ctx.image:
- if ctx.has("name") and ctx.name:
+ if "name" in ctx and ctx.name:
type_ = ctx.name.split('.', 1)[0]
if type_ in Monitoring.components:
ctx.image = Monitoring.components[type_]['image']
config = None
keyring = None
- if ctx.has("config_json") and ctx.config_json:
+ if "config_json" in ctx and ctx.config_json:
d = get_parm(ctx.config_json)
config = d.get('config')
keyring = d.get('keyring')
- if ctx.has("config") and ctx.config:
+ if "config" in ctx and ctx.config:
with open(ctx.config, 'r') as f:
config = f.read()
- if ctx.has("key") and ctx.key:
+ if "key" in ctx and ctx.key:
keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
- elif ctx.has("keyring") and ctx.keyring:
+ elif "keyring" in ctx and ctx.keyring:
with open(ctx.keyring, 'r') as f:
keyring = f.read()
if not check_time_sync(ctx):
errors.append('ERROR: No time synchronization is active')
- if ctx.has("expect_hostname") and ctx.expect_hostname:
+ if "expect_hostname" in ctx and ctx.expect_hostname:
if get_hostname().lower() != ctx.expect_hostname.lower():
errors.append('ERROR: hostname "%s" does not match expected hostname "%s"' % (
get_hostname(), ctx.expect_hostname))
# the service
check_time_sync(ctx, enabler=pkg)
- if ctx.has("expect_hostname") and ctx.expect_hostname and ctx.expect_hostname != get_hostname():
+ if "expect_hostname" in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname():
logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), ctx.expect_hostname))
call_throws(ctx, ['hostname', ctx.expect_hostname])
with open('/etc/hostname', 'w') as f: