},
},
'handlers': {
- 'console':{
- 'level':'INFO',
- 'class':'logging.StreamHandler',
+ 'console': {
+ 'level': 'INFO',
+ 'class': 'logging.StreamHandler',
},
'log_file': {
'level': 'DEBUG',
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
- mounts[os.path.join(data_dir,'haproxy')] = '/var/lib/haproxy'
+ mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy'
return mounts
##################################
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
- mounts[os.path.join(data_dir,'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
+ mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
return mounts
##################################
##################################
-def check_subnet(subnets:str) -> Tuple[int, List[int], str]:
+def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
"""Determine whether the given string is a valid subnet
:param subnets: subnet string, a single definition or comma separated list of CIDR subnets
if not version or '.' not in version:
version = seen_versions.get(image_id, None)
if daemon_type == NFSGanesha.daemon_type:
- version = NFSGanesha.get_version(ctx,container_id)
+ version = NFSGanesha.get_version(ctx, container_id)
if daemon_type == CephIscsi.daemon_type:
- version = CephIscsi.get_version(ctx,container_id)
+ version = CephIscsi.get_version(ctx, container_id)
elif not version:
if daemon_type in Ceph.daemons:
out, err, code = call(ctx,
field = iface_setting.split()
if field[-1] == ifname:
ipv6_raw = field[0]
- ipv6_fmtd = ":".join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]),4)])
+ ipv6_fmtd = ":".join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)])
# apply naming rules using ipaddress module
ipv6 = ipaddress.ip_address(ipv6_fmtd)
return "{}/{}".format(str(ipv6), int('0x{}'.format(field[2]), 16))
out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
if out:
param_list = out.split('\n')
- param_dict = {param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list}
+ param_dict = {param.split(" = ")[0]: param.split(" = ")[-1] for param in param_list}
# return only desired parameters
if 'net.ipv4.ip_nonlocal_bind' in param_dict:
data = json.dumps(self.server.cephadm_cache.health)
self.send_response(status_code)
- self.send_header('Content-type','application/json')
+ self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(data.encode('utf-8'))
else:
# Invalid GET URL
bad_request_msg = "Valid URLs are: {}".format(', '.join(CephadmDaemonHandler.valid_routes))
self.send_response(404, message=bad_request_msg) # reason
- self.send_header('Content-type','application/json')
+ self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"message": bad_request_msg}).encode('utf-8'))