import tempfile
import time
try:
- from typing import Tuple, Optional
+ from typing import Dict, List, Tuple, Optional, Union
except ImportError:
pass
import uuid
##################################
-def pathify(p): # type: (str) -> str
+def pathify(p):
+ # type: (str) -> str
if not p.startswith('/'):
return os.path.join(os.getcwd(), p)
return p
-def get_hostname(): # type: () -> str
+def get_hostname():
+ # type: () -> str
return socket.gethostname()
-def get_fqdn(): # type: () -> str
+def get_fqdn():
+ # type: () -> str
return socket.getfqdn() or socket.gethostname()
-def generate_password(): # type: () -> str
+def generate_password():
+ # type: () -> str
return ''.join(random.choice(string.ascii_lowercase + string.digits)
for i in range(10))
-def make_fsid(): # type: () -> str
+def make_fsid():
+ # type: () -> str
return str(uuid.uuid1())
-def is_fsid(s): # type: (str) -> bool
+def is_fsid(s):
+ # type: (str) -> bool
try:
uuid.UUID(s)
except ValueError:
return False
return True
-def makedirs(dir, uid, gid, mode): # type: (str, int, int, int) -> None
+def makedirs(dir, uid, gid, mode):
+ # type: (str, int, int, int) -> None
if not os.path.exists(dir):
os.makedirs(dir, mode=mode)
else:
os.chown(dir, uid, gid)
os.chmod(dir, mode) # the above is masked by umask...
-def get_data_dir(fsid, t, n): # type: (str, str, int) -> str
+def get_data_dir(fsid, t, n):
+ # type: (str, str, Union[int, str]) -> str
return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
-def get_log_dir(fsid): # type: (str) -> str
+def get_log_dir(fsid):
+ # type: (str) -> str
return os.path.join(args.log_dir, fsid)
-def make_data_dir_base(fsid, uid, gid): # type: (str, int, int) -> str
+def make_data_dir_base(fsid, uid, gid):
+ # type: (str, int, int) -> str
data_dir_base = os.path.join(args.data_dir, fsid)
makedirs(data_dir_base, uid, gid, DATA_DIR_MODE)
makedirs(os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE)
DATA_DIR_MODE)
return data_dir_base
-def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None): # type: (str, str, int, int, int) -> str
+def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
+ # type: (str, str, Union[int, str], int, int) -> str
if not uid or not gid:
(uid, gid) = extract_uid_gid()
make_data_dir_base(fsid, uid, gid)
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
-def make_log_dir(fsid, uid=None, gid=None): # type: (str, int, int) -> str
+def make_log_dir(fsid, uid=None, gid=None):
+ # type: (str, int, int) -> str
if not uid or not gid:
(uid, gid) = extract_uid_gid()
log_dir = get_log_dir(fsid)
makedirs(log_dir, uid, gid, LOG_DIR_MODE)
return log_dir
-def find_program(filename): # type: (str) -> str
+def find_program(filename):
+ # type: (str) -> str
name = find_executable(filename)
if name is None:
raise ValueError('%s not found' % filename)
return name
-def get_unit_name(fsid, daemon_type, daemon_id=None): # type (str, str, int) -> str
+def get_unit_name(fsid, daemon_type, daemon_id=None):
+ # type (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
if daemon_id is not None:
return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
-def check_unit(unit_name): # type: (str) -> Tuple[bool, str]
+def check_unit(unit_name):
+ # type: (str) -> Tuple[bool, str]
# NOTE: we ignore the exit code here because systemctl outputs
# various exit codes based on the state of the service, but the
# string result is more explicit (and sufficient).
state = 'unknown'
return (enabled, state)
-def get_legacy_config_fsid(cluster, legacy_dir=None): # type: (str, str) -> Optional[str]
+def get_legacy_config_fsid(cluster, legacy_dir=None):
+ # type: (str, str) -> Optional[str]
config_file = '/etc/ceph/%s.conf' % cluster
if legacy_dir is not None:
config_file = os.path.abspath(legacy_dir + config_file)
return None
def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
+ # type: (str, str, Union[int, str], str) -> Optional[str]
fsid = None
if daemon_type == 'osd':
try:
return fsid
def get_daemon_args(fsid, daemon_type, daemon_id):
+ # type: (str, str, Union[int, str]) -> List[str]
r = [
'--default-log-to-file=false',
'--default-log-to-stderr=true',
def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
config=None, keyring=None):
+ # type: (str, str, Union[int, str], int, int, str, str) -> None
data_dir = make_data_dir(fsid, daemon_type, daemon_id)
make_log_dir(fsid)
f.write(keyring)
def get_config_and_keyring():
+ # type: () -> Tuple[str, str]
if args.config_and_keyring:
if args.config_and_keyring == '-':
try:
- j = injected_stdin
+ j = injected_stdin # type: ignore
except NameError:
j = sys.stdin.read()
else:
return (config, keyring)
def get_config_and_both_keyrings():
+ # type: () -> Tuple[str, str, Optional[str]]
if args.config_and_keyrings:
if args.config_and_keyrings == '-':
try:
- j = injected_stdin
+ j = injected_stdin # type: ignore
except NameError:
j = sys.stdin.read()
else:
return (config, keyring, crash_keyring)
def get_container_mounts(fsid, daemon_type, daemon_id):
+ # type: (str, str, Union[int, str, None]) -> Dict[str, str]
mounts = {}
if fsid:
run_path = os.path.join('/var/run/ceph', fsid);
return mounts
def get_container(fsid, daemon_type, daemon_id, privileged=False,
- podman_args=None):
- if not podman_args:
- podman_args = []
+ podman_args=[]):
+ # type: (str, str, Union[int, str], bool, List[str]) -> CephContainer
if daemon_type in ['mon', 'osd'] or privileged:
# mon and osd need privileged in order for libudev to query devices
podman_args += ['--privileged']
cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
)
-def extract_uid_gid(): # type: () -> Tuple[int, int]
+def extract_uid_gid():
+ # type: () -> Tuple[int, int]
out = CephContainer(
image=args.image,
entrypoint='/usr/bin/grep',
return (int(uid), int(gid))
def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
- config=None, keyring=None):
+ config, keyring):
+ # type: (str, str, Union[int, str], CephContainer, int, int, str, str) -> None
if daemon_type == 'mon' and not os.path.exists(get_data_dir(fsid, 'mon',
daemon_id)):
# tmp keyring file
image=args.image,
entrypoint='/usr/bin/ceph-mon',
args=['--mkfs',
- '-i', daemon_id,
+ '-i', str(daemon_id),
'--fsid', fsid,
'-c', '/tmp/config',
'--keyring', '/tmp/keyring',
entrypoint='/usr/sbin/ceph-volume',
args=[
'lvm', 'activate',
- daemon_id, args.osd_fsid,
+ str(daemon_id), args.osd_fsid,
'--no-systemd'
],
podman_args=['--privileged'],
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True):
+ # type: (str, int, int, str, Union[int, str], CephContainer, bool, bool) -> None
# cmd
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
with open(data_dir + '/cmd', 'w') as f:
call_throws(['systemctl', 'start', unit_name])
def install_base_units(fsid):
+ # type: (str) -> None
"""
Set up ceph.target and ceph-$fsid.target units.
"""
""" % fsid)
def deploy_crash(fsid, uid, gid, config, keyring):
+ # type: (str, int, int, str, str) -> None
crash_dir = os.path.join(args.data_dir, fsid, 'crash')
makedirs(crash_dir, uid, gid, DATA_DIR_MODE)
subprocess.check_output(['systemctl', 'start', unit_name])
def get_unit_file(fsid, uid, gid):
+ # type: (str, int, int) -> str
install_path = find_program('install')
u = """[Unit]
Description=Ceph daemon for {fsid}
return u
def gen_ssh_key(fsid):
+ # type: (str) -> Tuple[str, str]
tmp_dir = TemporaryDirectory()
path = tmp_dir.name + '/key'
call_throws([
volume_mounts={},
cname='',
podman_args=[]):
+ # type: (str, str, List[str], Dict[str, str], str, List[str]) -> None
self.image = image
self.entrypoint = entrypoint
self.args = args
self.podman_args = podman_args
def run_cmd(self):
+ # type: () -> List[str]
+ vols = [] # type: List[str]
+ envs = [] # type: List[str]
+ cname = [] # type: List[str]
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
]
cname = ['--name', self.cname] if self.cname else []
return [
- podman_path,
+ str(podman_path),
'run',
'--rm',
'--net=host',
- ] + self.podman_args + cname + envs + vols + [
+ ] + self.podman_args + \
+ cname + envs + \
+ vols + \
+ [
'--entrypoint', self.entrypoint,
self.image
- ] + self.args
+ ] + self.args # type: ignore
def shell_cmd(self, cmd):
+ # type: (List[str]) -> List[str]
+ vols = [] # type: List[str]
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
'-e', 'CONTAINER_IMAGE=%s' % self.image,
'-e', 'NODE_NAME=%s' % get_hostname(),
]
- cmd_args = []
+ cmd_args = [] # type: List[str]
if cmd:
cmd_args = ['-c'] + cmd
return [
- podman_path,
+ str(podman_path),
'run',
'--net=host',
] + self.podman_args + envs + vols + [
] + cmd[1:]
def exec_cmd(self, cmd):
+ # type: (List[str]) -> List[str]
return [
- podman_path,
+ str(podman_path),
'exec',
] + self.podman_args + [
self.cname,
] + cmd
def run(self):
+ # type: () -> str
logger.debug(self.run_cmd())
out, _, _ = call_throws(self.run_cmd(), desc=self.entrypoint)
return out
##################################
def command_version():
+ # type: () -> int
out = CephContainer(args.image, 'ceph', ['--version']).run()
print(out.strip())
return 0
##################################
def command_bootstrap():
+ # type: () -> int
fsid = args.fsid or make_fsid()
hostname = get_hostname()
mon_id = args.mon_id or hostname
tmp_config.flush()
# a CLI helper to reduce our typing
- def cli(cmd, extra_mounts=None):
+ def cli(cmd, extra_mounts={}):
+ # type: (List[str], Dict[str, str]) -> str
mounts = {
log_dir: '/var/log/ceph:z',
tmp_admin_keyring.name: '/etc/ceph/ceph.client.admin.keyring:z',
tmp_config.name: '/etc/ceph/ceph.conf:z',
}
- if extra_mounts:
- for k, v in extra_mounts.items():
- mounts[k] = v
+ for k, v in extra_mounts.items():
+ mounts[k] = v
return CephContainer(
image=args.image,
entrypoint='/usr/bin/ceph',
logger.info('Enabling the dashboard module...')
cli(['mgr', 'module', 'enable', 'dashboard'])
logger.info('Waiting for the module to be available...')
+ # FIXME: potential for an endless loop?
while True:
- c = cli(['-h'])
- if 'dashboard' in c:
+ c_out = cli(['-h'])
+ if 'dashboard' in c_out:
break
logger.info('Dashboard not yet available, waiting...')
time.sleep(1)
##################################
def command_deploy():
+ # type: () -> None
(daemon_type, daemon_id) = args.name.split('.', 1)
if daemon_type not in ['mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror']:
raise RuntimeError('daemon type %s not recognized' % daemon_type)
##################################
def command_run():
+ # type: () -> int
(daemon_type, daemon_id) = args.name.split('.', 1)
c = get_container(args.fsid, daemon_type, daemon_id)
return subprocess.call(c.run_cmd())
##################################
def command_shell():
+ # type: () -> int
if args.fsid:
make_log_dir(args.fsid)
if args.name:
##################################
def command_enter():
+ # type: () -> int
(daemon_type, daemon_id) = args.name.split('.', 1)
- podman_args = []
+ podman_args = [] # type: List[str]
if args.command:
command = args.command
else:
##################################
def command_ceph_volume():
+ # type: () -> None
make_log_dir(args.fsid)
mounts = get_container_mounts(args.fsid, 'osd', None)
##################################
def command_unit():
+ # type: () -> None
(daemon_type, daemon_id) = args.name.split('.', 1)
unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
call_throws([
##################################
def command_logs():
+ # type: () -> None
cmd = [podman_path, 'logs']
if args.follow:
cmd.append('-f')
# call this directly, without our wrapper, so that we get an unmolested
# stdout with logger prefixing.
- subprocess.call(cmd)
+ subprocess.call(cmd) # type: ignore
##################################
def command_ls():
+ # type: () -> None
ls = list_daemons(detail=not args.no_detail,
legacy_dir=args.legacy_dir)
print(json.dumps(ls, indent=4))
def list_daemons(detail=True, legacy_dir=None):
+ # type: (bool, Optional[str]) -> List[Dict[str, str]]
host_version = None
ls = []
##################################
def command_adopt():
+ # type: () -> None
(daemon_type, daemon_id) = args.name.split('.', 1)
(uid, gid) = extract_uid_gid()
if args.style == 'legacy':
##################################
def command_rm_daemon():
+ # type: () -> None
(daemon_type, daemon_id) = args.name.split('.', 1)
if daemon_type in ['mon', 'osd'] and not args.force:
raise RuntimeError('must pass --force to proceed: '
##################################
def command_rm_cluster():
+ # type: () -> None
if not args.force:
raise RuntimeError('must pass --force to proceed: '
'this command may destroy precious data!')
##################################
def _get_parser():
+ # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(
description='Bootstrap Ceph daemons with systemd and containers.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)