##################################
+
class BaseConfig:
def __init__(self):
}
}
+
class termcolor:
yellow = '\033[93m'
red = '\033[31m'
return version
##################################
+
+
def populate_files(config_dir, config_files, uid, gid):
# type: (str, Dict, int, int) -> None
"""create config files for different services"""
os.fchmod(f.fileno(), 0o600)
f.write(config_content)
+
class NFSGanesha(object):
"""Defines a NFS-Ganesha container"""
##################################
+
class HAproxy(object):
"""Defines an HAproxy container"""
daemon_type = 'haproxy'
##################################
+
class PortOccupiedError(Error):
pass
##################################
+
# this is an abbreviated version of
# https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py
# that drops all of the compatibility (this is Unix/Linux only).
asyncio.set_event_loop(None)
loop.close()
+
def call(ctx: CephadmContext,
command: List[str],
desc: Optional[str] = None,
call_throws(ctx, ['systemctl', 'restart',
get_unit_name(fsid, daemon_type, daemon_id)])
+
def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, background=False):
# type: (CephadmContext, IO[str], CephContainer, Optional[str], Optional[bool]) -> None
if comment:
firewall.open_ports(fw_ports)
firewall.apply_rules()
+
def install_base_units(ctx, fsid):
# type: (CephadmContext, str) -> None
"""
time.sleep(sleep_secs)
raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
+
##################################
r['repo_digests'] = digests[1:-1].split(' ')
return r
-
##################################
+
+
def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
"""Determine whether the given string is a valid subnet
return rc, list(versions), ", ".join(errors)
+
def unwrap_ipv6(address):
# type: (str) -> str
if address.startswith('[') and address.endswith(']'):
##################################
+
def command_registry_login(ctx: CephadmContext):
if ctx.registry_json:
logger.info("Pulling custom registry login info from %s." % ctx.registry_json)
"options or --registry-json option")
return 0
+
def registry_login(ctx: CephadmContext, url, username, password):
logger.info("Logging into custom registry.")
try:
return d
raise Error('Daemon not found: {}. See `cephadm ls`'.format(name))
-
##################################
+
@default_image
def command_adopt(ctx):
# type: (CephadmContext) -> None
logger.info('Disabling old systemd unit %s...' % unit_name)
call_throws(ctx, ['systemctl', 'disable', unit_name])
-
##################################
+
def command_rm_daemon(ctx):
# type: (CephadmContext) -> None
l = FileLock(ctx, ctx.fsid)
if os.path.exists(files[n]):
os.remove(files[n])
-
##################################
+
def check_time_sync(ctx, enabler=None):
# type: (CephadmContext, Optional[Packager]) -> bool
units = [
##################################
+
def get_ipv4_address(ifname):
# type: (str) -> str
def _extract(sock, offset):
return content
return "Unknown"
-
##################################
+
+
class HostFacts():
_dmi_path_list = ['/sys/class/dmi/id']
_nic_path_list = ['/sys/class/net']
##################################
+
def command_gather_facts(ctx: CephadmContext):
"""gather_facts is intended to provide host releated metadata to the caller"""
host = HostFacts(ctx)
print(host.dump())
-
##################################
+
def command_verify_prereqs(ctx: CephadmContext):
if ctx.service_type == 'haproxy' or ctx.service_type == 'keepalived':
out, err, code = call(
cephadm_cache: CephadmCache
token: str
+
class CephadmDaemonHandler(BaseHTTPRequestHandler):
server: CephadmHTTPServer
api_version = 'v1'
exporter.run()
-
##################################
+
def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool:
# TODO: UNITTEST
return os.path.exists(
else:
return f"success - systemd target {target} enabled and started"
-
##################################
+
def _get_parser():
# type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(