#!/usr/bin/python3
-DEFAULT_IMAGE='docker.io/ceph/daemon-base:latest-master-devel'
-DEFAULT_IMAGE_IS_MASTER=True
-LATEST_STABLE_RELEASE='octopus'
-DATA_DIR='/var/lib/ceph'
-LOG_DIR='/var/log/ceph'
-LOCK_DIR='/run/cephadm'
-LOGROTATE_DIR='/etc/logrotate.d'
-UNIT_DIR='/etc/systemd/system'
-LOG_DIR_MODE=0o770
-DATA_DIR_MODE=0o700
+DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-master-devel'
+DEFAULT_IMAGE_IS_MASTER = True
+LATEST_STABLE_RELEASE = 'octopus'
+DATA_DIR = '/var/lib/ceph'
+LOG_DIR = '/var/log/ceph'
+LOCK_DIR = '/run/cephadm'
+LOGROTATE_DIR = '/etc/logrotate.d'
+UNIT_DIR = '/etc/systemd/system'
+LOG_DIR_MODE = 0o770
+DATA_DIR_MODE = 0o700
CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker
-CUSTOM_PS1=r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT=None # in seconds
-DEFAULT_RETRY=10
-SHELL_DEFAULT_CONF='/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING='/etc/ceph/ceph.client.admin.keyring'
+CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
+DEFAULT_TIMEOUT = None # in seconds
+DEFAULT_RETRY = 10
+SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
+SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
"""
You can invoke cephadm in two ways:
red = '\033[31m'
end = '\033[0m'
+
class Error(Exception):
pass
+
class TimeoutExpired(Error):
pass
##################################
+
class Ceph(object):
daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
'crash')
##################################
+
class Monitoring(object):
"""Define the configs for the monitoring containers"""
##################################
+
class NFSGanesha(object):
"""Defines a NFS-Ganesha container"""
volume_mounts = self.get_container_mounts(data_dir)
envs = self.get_container_envs()
- logger.info('Creating RADOS grace for action: %s' % (action))
+ logger.info('Creating RADOS grace for action: %s' % action)
c = CephContainer(
image=self.image,
entrypoint=entrypoint,
args=args,
volume_mounts=volume_mounts,
- cname=self.get_container_name(desc='grace-%s' % (action)),
+ cname=self.get_container_name(desc='grace-%s' % action),
envs=envs
)
return c
##################################
+
class CephIscsi(object):
"""Defines a Ceph-Iscsi container"""
##################################
+
def get_supported_daemons():
# type: () -> List[str]
supported_daemons = list(Ceph.daemons)
##################################
+
def attempt_bind(s, address, port):
# type: (socket.socket, str, int) -> None
try:
finally:
s.close()
+
def port_in_use(port_num):
# type: (int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
else:
return False
+
def check_ip_port(ip, port):
# type: (str, int) -> None
if not args.skip_ping_check:
except NameError:
TimeoutError = OSError
+
class Timeout(TimeoutError):
"""
Raised when the lock could not be acquired in *timeout*
class FileLock(object):
- def __init__(self, name, timeout = -1):
+ def __init__(self, name, timeout=-1):
if not os.path.exists(LOCK_DIR):
os.mkdir(LOCK_DIR, 0o700)
self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
lock_id, lock_filename, poll_intervall
)
time.sleep(poll_intervall)
- except:
+ except: # noqa
# Something did go wrong, so decrement the counter.
self._lock_counter = max(0, self._lock_counter - 1)
raise
return _Acquire_ReturnProxy(lock = self)
- def release(self, force = False):
+ def release(self, force=False):
"""
Releases the file lock.
Please note, that the lock is only completly released, if the lock
return None
def __del__(self):
- self.release(force = True)
+ self.release(force=True)
return None
-
def _acquire(self):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(self._lock_file, open_mode)
##################################
+
def is_available(what, func):
# type: (str, Callable[[], bool]) -> None
"""
:param func: the callable object that determines availability
"""
retry = args.retry
- logger.info('Waiting for %s...' % (what))
+ logger.info('Waiting for %s...' % what)
num = 1
while True:
if func():
logger.info('%s is available'
- % (what))
+ % what)
break
elif num > retry:
raise Error('%s not available after %s tries'
return cp
+
def pathify(p):
# type: (str) -> str
p = os.path.expanduser(p)
return os.path.abspath(p)
+
def get_file_timestamp(fn):
# type: (str) -> Optional[str]
try:
except Exception as e:
return None
+
def try_convert_datetime(s):
# type: (str) -> Optional[str]
# This is super irritating because
pass
return None
+
def get_podman_version():
# type: () -> Tuple[int, ...]
if 'podman' not in container_path:
out, _, _ = call_throws([container_path, '--version'])
return _parse_podman_version(out)
+
def _parse_podman_version(out):
# type: (str) -> Tuple[int, ...]
_, _, version_str = out.strip().split()
# type: () -> str
return socket.gethostname()
+
def get_fqdn():
# type: () -> str
return socket.getfqdn() or socket.gethostname()
+
def get_arch():
# type: () -> str
return platform.uname().machine
+
def generate_service_id():
# type: () -> str
return get_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase)
for _ in range(6))
+
def generate_password():
# type: () -> str
return ''.join(random.choice(string.ascii_lowercase + string.digits)
for i in range(10))
+
def normalize_container_id(i):
# type: (str) -> str
# docker adds the sha256: prefix, but AFAICS both
i = i[len(prefix):]
return i
+
def make_fsid():
# type: () -> str
return str(uuid.uuid1())
+
def is_fsid(s):
# type: (str) -> bool
try:
return False
return True
+
def infer_fsid(func):
"""
If we only find a single fsid in /var/lib/ceph/*, use that
return _infer_fsid
+
def infer_config(func):
"""
If we find a MON daemon, use the config from that container
return _infer_config
+
def _get_default_image():
if DEFAULT_IMAGE_IS_MASTER:
warn = '''This is a development version of cephadm.
logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
return DEFAULT_IMAGE
+
def infer_image(func):
"""
Use the most recent ceph image
return _infer_image
+
def default_image(func):
@wraps(func)
def _default_image():
return _default_image
+
def get_last_local_ceph_image():
"""
:return: The most recent local ceph image (already pulled)
return r
return None
+
def write_tmp(s, uid, gid):
# type: (str, int, int) -> Any
tmp_f = tempfile.NamedTemporaryFile(mode='w',
return tmp_f
+
def makedirs(dir, uid, gid, mode):
# type: (str, int, int, int) -> None
if not os.path.exists(dir):
os.chown(dir, uid, gid)
os.chmod(dir, mode) # the above is masked by umask...
+
def get_data_dir(fsid, t, n):
# type: (str, str, Union[int, str]) -> str
return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
+
def get_log_dir(fsid):
# type: (str) -> str
return os.path.join(args.log_dir, fsid)
+
def make_data_dir_base(fsid, uid, gid):
# type: (str, int, int) -> str
data_dir_base = os.path.join(args.data_dir, fsid)
DATA_DIR_MODE)
return data_dir_base
+
def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
# type: (str, str, Union[int, str], int, int) -> str
if not uid or not gid:
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
+
def make_log_dir(fsid, uid=None, gid=None):
# type: (str, int, int) -> str
if not uid or not gid:
makedirs(log_dir, uid, gid, LOG_DIR_MODE)
return log_dir
+
def make_var_run(fsid, uid, gid):
# type: (str, int, int) -> None
call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
'/var/run/ceph/%s' % fsid])
+
def copy_tree(src, dst, uid=None, gid=None):
# type: (List[str], str, int, int) -> None
"""
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
def move_files(src, dst, uid=None, gid=None):
# type: (List[str], str, int, int) -> None
"""
logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
os.chown(dst_file, uid, gid)
+
## copied from distutils ##
def find_executable(executable, path=None):
"""Tries to find 'executable' in the directories listed in 'path'.
return f
return None
+
def find_program(filename):
# type: (str) -> str
name = find_executable(filename)
raise ValueError('%s not found' % filename)
return name
+
def get_unit_name(fsid, daemon_type, daemon_id=None):
# type: (str, str, Optional[Union[int, str]]) -> str
# accept either name or type + id
else:
return 'ceph-%s@%s' % (fsid, daemon_type)
+
def get_unit_name_by_daemon_name(fsid, name):
daemon = get_daemon_description(fsid, name)
try:
except KeyError:
raise Error('Failed to get unit name for {}'.format(daemon))
+
def check_unit(unit_name):
# type: (str) -> Tuple[bool, str, bool]
# NOTE: we ignore the exit code here because systemctl outputs
state = 'unknown'
return (enabled, state, installed)
+
def check_units(units, enabler=None):
# type: (List[str], Optional[Packager]) -> bool
for u in units:
enabler.enable_service(u)
return False
+
def get_legacy_config_fsid(cluster, legacy_dir=None):
# type: (str, str) -> Optional[str]
config_file = '/etc/ceph/%s.conf' % cluster
return config.get('global', 'fsid')
return None
+
def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
# type: (str, str, Union[int, str], str) -> Optional[str]
fsid = None
fsid = get_legacy_config_fsid(cluster, legacy_dir=legacy_dir)
return fsid
+
def get_daemon_args(fsid, daemon_type, daemon_id):
# type: (str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
return r
+
def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
config=None, keyring=None):
# type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
-
# populate the config directory for the component from the config-json
for fname in required_files:
if 'files' in config: # type: ignore
ceph_iscsi = CephIscsi.init(fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+
def get_parm(option):
# type: (str) -> Dict[str, str]
else:
return js
+
def get_config_and_keyring():
# type: () -> Tuple[Optional[str], Optional[str]]
config = None
with open(args.keyring, 'r') as f:
keyring = f.read()
- return (config, keyring)
+ return config, keyring
+
def get_container_binds(fsid, daemon_type, daemon_id):
# type: (str, str, Union[int, str, None]) -> List[List[str]]
return binds
+
def get_container_mounts(fsid, daemon_type, daemon_id,
no_config=False):
# type: (str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str]
return mounts
+
def get_container(fsid, daemon_type, daemon_id,
privileged=False,
ptrace=False,
entrypoint = ''
name = ''
- ceph_args = [] # type: List[str]
+ ceph_args = [] # type: List[str]
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(daemon_type)
m = Monitoring.components[daemon_type] # type: ignore
elif daemon_type in Ceph.daemons:
ceph_args = ['-n', name, '-f']
- envs=[] # type: List[str]
+ envs = [] # type: List[str]
if daemon_type == NFSGanesha.daemon_type:
envs.extend(NFSGanesha.get_container_envs())
ptrace=ptrace,
)
+
def extract_uid_gid(img='', file_path='/var/lib/ceph'):
# type: (str, str) -> Tuple[int, int]
args=['-c', '%u %g', file_path]
).run()
(uid, gid) = out.split(' ')
- return (int(uid), int(gid))
+ return int(uid), int(gid)
+
def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
config=None, keyring=None,
call_throws(['systemctl', 'restart',
get_unit_name(fsid, daemon_type, daemon_id)])
+
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
if start:
call_throws(['systemctl', 'start', unit_name])
+
def update_firewalld(daemon_type):
# type: (str) -> None
if args.skip_firewalld:
logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
call_throws([cmd, '--reload'])
+
def install_base_units(fsid):
# type: (str) -> None
"""
}
""" % fsid)
+
def get_unit_file(fsid):
# type: (str) -> str
u = """# generated by cephadm
##################################
+
class CephContainer:
def __init__(self,
image,
def run_cmd(self):
# type: () -> List[str]
- vols = [] # type: List[str]
- envs = [] # type: List[str]
- cname = [] # type: List[str]
- binds = [] # type: List[str]
- entrypoint = [] # type: List[str]
+ vols = [] # type: List[str]
+ envs = [] # type: List[str]
+ cname = [] # type: List[str]
+ binds = [] # type: List[str]
+ entrypoint = [] # type: List[str]
if self.entrypoint:
entrypoint = ['--entrypoint', self.entrypoint]
- priv = [] # type: List[str]
+ priv = [] # type: List[str]
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
def shell_cmd(self, cmd):
# type: (List[str]) -> List[str]
- priv = [] # type: List[str]
+ priv = [] # type: List[str]
if self.privileged:
priv = ['--privileged',
# let OSD etc read block devs that haven't been chowned
'--group-add=disk']
- vols = [] # type: List[str]
+ vols = [] # type: List[str]
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
##################################
+
@infer_image
def command_version():
# type: () -> int
##################################
+
@infer_image
def command_pull():
# type: () -> int
##################################
+
@infer_image
def command_inspect_image():
# type: () -> int
##################################
+
def is_ipv6(address):
# type: (str) -> bool
if address.startswith('[') and address.endswith(']'):
##################################
+
def extract_uid_gid_monitoring(daemon_type):
# type: (str) -> Tuple[int, int]
##################################
+
@infer_image
def command_run():
# type: () -> int
##################################
+
@infer_fsid
@infer_config
@infer_image
##################################
+
@infer_fsid
def command_enter():
# type: () -> int
##################################
+
@infer_fsid
@infer_image
def command_ceph_volume():
##################################
+
@infer_fsid
def command_unit():
# type: () -> None
##################################
+
@infer_fsid
def command_logs():
# type: () -> None
##################################
+
def list_networks():
# type: () -> Dict[str,List[str]]
res.update(_list_ipv6_networks())
return res
+
def _list_ipv4_networks():
out, _, _ = call_throws([find_executable('ip'), 'route', 'ls'])
return _parse_ipv4_route(out)
+
def _parse_ipv4_route(out):
r = {} # type: Dict[str,List[str]]
p = re.compile(r'^(\S+) (.*)scope link (.*)src (\S+)')
r[net].append(ip)
return r
+
def _list_ipv6_networks():
routes, _, _ = call_throws([find_executable('ip'), '-6', 'route', 'ls'])
ips, _, _ = call_throws([find_executable('ip'), '-6', 'addr', 'ls'])
return _parse_ipv6_route(routes, ips)
+
def _parse_ipv6_route(routes, ips):
r = {} # type: Dict[str,List[str]]
route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) pref (\S+)$')
return r
+
def command_list_networks():
# type: () -> None
r = list_networks()
##################################
+
def command_ls():
# type: () -> None
ls = list_daemons(detail=not args.no_detail,
legacy_dir=args.legacy_dir)
print(json.dumps(ls, indent=4))
+
def list_daemons(detail=True, legacy_dir=None):
# type: (bool, Optional[str]) -> List[Dict[str, str]]
host_version = None
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_grafana(daemon_id, fsid):
# type: (str, str) -> None
else:
logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key))
-
# data - possible custom dashboards/plugins
data_src = '/var/lib/grafana/'
data_src = os.path.abspath(args.legacy_dir + data_src)
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def command_adopt_alertmanager(daemon_id, fsid):
# type: (str, str) -> None
deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
update_firewalld(daemon_type)
+
def _adjust_grafana_ini(filename):
# type: (str) -> None
##################################
+
def command_rm_cluster():
# type: () -> None
if not args.force:
return False
return True
+
def command_check_host():
# type: () -> None
errors = []
##################################
+
def command_prepare_host():
# type: () -> None
logger.info('Verifying podman|docker is present...')
##################################
+
class CustomValidation(argparse.Action):
def _check_name(self, values):
##################################
+
def get_distro():
# type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
distro = None
distro_codename = val.lower()
return distro, distro_version, distro_codename
+
class Packager(object):
def __init__(self, stable=None, version=None, branch=None, commit=None):
assert \
logging.info('Podman did not work. Falling back to docker...')
self.install(['docker.io'])
+
class YumDnf(Packager):
DISTRO_NAMES = {
'centos': ('centos', 'el'),
commit=args.dev_commit)
pkg.add_repo()
+
def command_rm_repo():
pkg = create_packager()
pkg.rm_repo()
+
def command_install():
pkg = create_packager()
pkg.install(args.packages)
##################################
+
def _get_parser():
# type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(
return parser
+
def _parse_args(av):
parser = _get_parser()
args = parser.parse_args(av)
args.command.pop(0)
return args
+
if __name__ == "__main__":
# allow argv to be injected
try:
- av = injected_argv # type: ignore
+ av = injected_argv # type: ignore
except NameError:
av = sys.argv[1:]
args = _parse_args(av)