From 424f4809e6bb5f2ce9abc23e546e15a32a82b778 Mon Sep 17 00:00:00 2001 From: Paul Cuzner Date: Wed, 4 Nov 2020 10:34:49 +1300 Subject: [PATCH] cephadm: rebase patch Signed-off-by: Paul Cuzner --- src/cephadm/cephadm | 626 +++++++++++++++++++++++++++++++------------- 1 file changed, 444 insertions(+), 182 deletions(-) diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 822f4dc8fe9fe..6040cea777ef6 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -37,13 +37,13 @@ You can invoke cephadm in two ways: injected_stdin = '...' """ - import argparse import datetime import fcntl import ipaddress import json import logging +from logging.config import dictConfig import os import platform import pwd @@ -71,6 +71,8 @@ try: from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO except ImportError: pass + +import re import uuid from functools import wraps @@ -101,6 +103,36 @@ cached_stdin = None DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' +# Log and console output config +logging_config = { + 'version': 1, + 'disable_existing_loggers': True, + 'formatters': { + 'cephadm': { + 'format': '%(asctime)s %(levelname)s %(message)s' + }, + }, + 'handlers': { + 'console':{ + 'level':'INFO', + 'class':'logging.StreamHandler', + }, + 'log_file': { + 'level': 'DEBUG', + 'class': 'logging.handlers.RotatingFileHandler', + 'formatter': 'cephadm', + 'filename': '%s/cephadm.log' % LOG_DIR, + 'maxBytes': 1024000, + 'backupCount': 1, + } + }, + 'loggers': { + '': { + 'level': 'DEBUG', + 'handlers': ['console', 'log_file'], + } + } +} class termcolor: yellow = '\033[93m' @@ -209,17 +241,12 @@ class NFSGanesha(object): self.daemon_id = daemon_id self.image = image - def json_get(key, default=None, require=False): - if require and not key in config_json.keys(): - raise Error('{} missing from config-json'.format(key)) - return config_json.get(key, default) - # config-json options - self.pool = json_get('pool', require=True) - self.namespace = json_get('namespace') - self.userid = json_get('userid') - self.extra_args = json_get('extra_args', []) - self.files = json_get('files', {}) + self.pool = dict_get(config_json, 'pool', require=True) + self.namespace = dict_get(config_json, 'namespace') + self.userid = dict_get(config_json, 'userid') + self.extra_args = dict_get(config_json, 'extra_args', []) + self.files = dict_get(config_json, 'files', {}) # validate the supplied args self.validate() @@ -289,14 +316,6 @@ class NFSGanesha(object): # type: () -> List[str] return self.daemon_args + self.extra_args - def get_file_content(self, fname): - # type: (str) -> str - """Normalize the json file content into a string""" - content = self.files.get(fname) - if isinstance(content, list): - content = '\n'.join(content) - return content - def create_daemon_dirs(self, data_dir, uid, gid): # type: (str, int, int) -> None """Create files under the container data dir""" @@ -312,7 +331,7 @@ class NFSGanesha(object): # populate files from the config-json for fname in self.files: config_file = os.path.join(config_dir, fname) - config_content = self.get_file_content(fname) + config_content = dict_get_join(self.files, fname) logger.info('Write file: %s' % (config_file)) with open(config_file, 'w') as f: os.fchown(f.fileno(), uid, gid) @@ -368,13 +387,8 @@ class CephIscsi(object): self.daemon_id = daemon_id self.image = image - def json_get(key, default=None, require=False): - if require and not key in config_json.keys(): - raise Error('{} missing from config-json'.format(key)) - return config_json.get(key, default) - # config-json options - self.files = json_get('files', {}) + self.files = dict_get(config_json, 'files', {}) # validate the supplied args self.validate() @@ -391,9 +405,9 @@ class CephIscsi(object): mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z' - mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config:z' + mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' mounts[log_dir] = '/var/log/rbd-target-api:z' - mounts['/dev'] = '/dev:z' + mounts['/dev'] = '/dev' return mounts @staticmethod @@ -444,14 +458,6 @@ class CephIscsi(object): cname = '%s-%s' % (cname, desc) return cname - def get_file_content(self, fname): - # type: (str) -> str - """Normalize the json file content into a string""" - content = self.files.get(fname) - if isinstance(content, list): - content = '\n'.join(content) - return content - def create_daemon_dirs(self, data_dir, uid, gid): # type: (str, int, int) -> None """Create files under the container data dir""" @@ -465,7 +471,7 @@ class CephIscsi(object): # populate files from the config-json for fname in self.files: config_file = os.path.join(data_dir, fname) - config_content = self.get_file_content(fname) + config_content = dict_get_join(self.files, fname) logger.info('Write file: %s' % (config_file)) with open(config_file, 'w') as f: os.fchown(f.fileno(), uid, gid) @@ -489,17 +495,173 @@ class CephIscsi(object): tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id) tcmu_container.entrypoint = "/usr/bin/tcmu-runner" tcmu_container.cname = self.get_container_name(desc='tcmu') + # remove extra container args for tcmu container. + # extra args could cause issue with forking service type + tcmu_container.container_args = [] return tcmu_container ################################## +class CustomContainer(object): + """Defines a custom container""" + daemon_type = 'container' + + def __init__(self, fsid: str, daemon_id: Union[int, str], + config_json: Dict, image: str) -> None: + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.entrypoint = dict_get(config_json, 'entrypoint') + self.uid = dict_get(config_json, 'uid', 65534) # nobody + self.gid = dict_get(config_json, 'gid', 65534) # nobody + self.volume_mounts = dict_get(config_json, 'volume_mounts', {}) + self.args = dict_get(config_json, 'args', []) + self.envs = dict_get(config_json, 'envs', []) + self.privileged = dict_get(config_json, 'privileged', False) + self.bind_mounts = dict_get(config_json, 'bind_mounts', []) + self.ports = dict_get(config_json, 'ports', []) + self.dirs = dict_get(config_json, 'dirs', []) + self.files = dict_get(config_json, 'files', {}) + + @classmethod + def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer': + return cls(fsid, daemon_id, get_parm(args.config_json), args.image) + + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + """ + Create dirs/files below the container data directory. + """ + logger.info('Creating custom container configuration ' + 'dirs/files in {} ...'.format(data_dir)) + + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % data_dir) + + for dir_path in self.dirs: + logger.info('Creating directory: {}'.format(dir_path)) + dir_path = os.path.join(data_dir, dir_path.strip('/')) + makedirs(dir_path, uid, gid, 0o755) + + for file_path in self.files: + logger.info('Creating file: {}'.format(file_path)) + content = dict_get_join(self.files, file_path) + file_path = os.path.join(data_dir, file_path.strip('/')) + with open(file_path, 'w', encoding='utf-8') as f: + os.fchown(f.fileno(), uid, gid) + os.fchmod(f.fileno(), 0o600) + f.write(content) + + def get_daemon_args(self) -> List[str]: + return [] + + def get_container_args(self) -> List[str]: + return self.args + + def get_container_envs(self) -> List[str]: + return self.envs + + def get_container_mounts(self, data_dir: str) -> Dict[str, str]: + """ + Get the volume mounts. Relative source paths will be located below + `/var/lib/ceph//`. + + Example: + { + /foo/conf: /conf + foo/conf: /conf + } + becomes + { + /foo/conf: /conf + /var/lib/ceph///foo/conf: /conf + } + """ + mounts = {} + for source, destination in self.volume_mounts.items(): + source = os.path.join(data_dir, source) + mounts[source] = destination + return mounts + + def get_container_binds(self, data_dir: str) -> List[List[str]]: + """ + Get the bind mounts. Relative `source=...` paths will be located below + `/var/lib/ceph//`. + + Example: + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + becomes + [ + ... + 'source=/var/lib/ceph///lib/modules', + ... + ] + """ + binds = self.bind_mounts.copy() + for bind in binds: + for index, value in enumerate(bind): + match = re.match(r'^source=(.+)$', value) + if match: + bind[index] = 'source={}'.format(os.path.join( + data_dir, match.group(1))) + return binds + +################################## + + +def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any: + """ + Helper function to get a key from a dictionary. + :param d: The dictionary to process. + :param key: The name of the key to get. + :param default: The default value in case the key does not + exist. Default is `None`. + :param require: Set to `True` if the key is required. An + exception will be raised if the key does not exist in + the given dictionary. + :return: Returns the value of the given key. + :raises: :exc:`self.Error` if the given key does not exist + and `require` is set to `True`. + """ + if require and key not in d.keys(): + raise Error('{} missing from dict'.format(key)) + return d.get(key, default) + +################################## + + +def dict_get_join(d: Dict, key: str) -> Any: + """ + Helper function to get the value of a given key from a dictionary. + `List` values will be converted to a string by joining them with a + line break. + :param d: The dictionary to process. + :param key: The name of the key to get. + :return: Returns the value of the given key. If it was a `list`, it + will be joining with a line break. + """ + value = d.get(key) + if isinstance(value, list): + value = '\n'.join(map(str, value)) + return value + +################################## + + def get_supported_daemons(): # type: () -> List[str] supported_daemons = list(Ceph.daemons) supported_daemons.extend(Monitoring.components) supported_daemons.append(NFSGanesha.daemon_type) supported_daemons.append(CephIscsi.daemon_type) + supported_daemons.append(CustomContainer.daemon_type) supported_daemons.append(CephadmDaemon.daemon_type) assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons @@ -849,7 +1011,8 @@ def call(command, # type: List[str] assert False except (IOError, OSError): pass - logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s' + if verbose: + logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s' % (time.time()-start_time, stop, process.poll(), reads)) returncode = process.wait() @@ -1231,6 +1394,7 @@ def get_last_local_ceph_image(): out, _, _ = call_throws( [container_path, 'images', '--filter', 'label=ceph=True', + '--filter', 'dangling=false', '--format', '{{.Repository}} {{.Tag}}']) for line in out.splitlines(): if len(line.split()) == 2: @@ -1312,7 +1476,7 @@ def copy_tree(src, dst, uid=None, gid=None): """ Copy a directory tree from src to dst """ - if not uid or not gid: + if uid is None or gid is None: (uid, gid) = extract_uid_gid() for src_dir in src: @@ -1337,7 +1501,7 @@ def copy_files(src, dst, uid=None, gid=None): """ Copy a files from src to dst """ - if not uid or not gid: + if uid is None or gid is None: (uid, gid) = extract_uid_gid() for src_file in src: @@ -1357,7 +1521,7 @@ def move_files(src, dst, uid=None, gid=None): """ Move files from src to dst """ - if not uid or not gid: + if uid is None or gid is None: (uid, gid) = extract_uid_gid() for src_file in src: @@ -1558,6 +1722,9 @@ def get_daemon_args(fsid, daemon_type, daemon_id): elif daemon_type == NFSGanesha.daemon_type: nfs_ganesha = NFSGanesha.init(fsid, daemon_id) r += nfs_ganesha.get_daemon_args() + elif daemon_type == CustomContainer.daemon_type: + cc = CustomContainer.init(fsid, daemon_id) + r.extend(cc.get_daemon_args()) return r @@ -1574,6 +1741,7 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, os.fchown(f.fileno(), uid, gid) os.fchmod(f.fileno(), 0o600) f.write(config) + if keyring: keyring_path = os.path.join(data_dir, 'keyring') with open(keyring_path, 'w') as f: @@ -1582,7 +1750,7 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, f.write(keyring) if daemon_type in Monitoring.components.keys(): - config = get_parm(args.config_json) # type: ignore + config_json: Dict[str, Any] = get_parm(args.config_json) required_files = Monitoring.components[daemon_type].get('config-json-files', list()) # Set up directories specific to the monitoring component @@ -1608,25 +1776,25 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid, # populate the config directory for the component from the config-json for fname in required_files: - if 'files' in config: # type: ignore - if isinstance(config['files'][fname], list): # type: ignore - content = '\n'.join(config['files'][fname]) # type: ignore - else: - content = config['files'][fname] # type: ignore - + if 'files' in config_json: # type: ignore + content = dict_get_join(config_json['files'], fname) with open(os.path.join(data_dir_root, config_dir, fname), 'w') as f: os.fchown(f.fileno(), uid, gid) os.fchmod(f.fileno(), 0o600) f.write(content) - if daemon_type == NFSGanesha.daemon_type: + elif daemon_type == NFSGanesha.daemon_type: nfs_ganesha = NFSGanesha.init(fsid, daemon_id) nfs_ganesha.create_daemon_dirs(data_dir, uid, gid) - if daemon_type == CephIscsi.daemon_type: + elif daemon_type == CephIscsi.daemon_type: ceph_iscsi = CephIscsi.init(fsid, daemon_id) ceph_iscsi.create_daemon_dirs(data_dir, uid, gid) + elif daemon_type == CustomContainer.daemon_type: + cc = CustomContainer.init(fsid, daemon_id) + cc.create_daemon_dirs(data_dir, uid, gid) + def get_parm(option): # type: (str) -> Dict[str, str] @@ -1691,8 +1859,12 @@ def get_container_binds(fsid, daemon_type, daemon_id): binds = list() if daemon_type == CephIscsi.daemon_type: - assert daemon_id binds.extend(CephIscsi.get_container_binds()) + elif daemon_type == CustomContainer.daemon_type: + assert daemon_id + cc = CustomContainer.init(fsid, daemon_id) + data_dir = get_data_dir(fsid, daemon_type, daemon_id) + binds.extend(cc.get_container_binds(data_dir)) return binds @@ -1778,14 +1950,27 @@ def get_container_mounts(fsid, daemon_type, daemon_id, log_dir = get_log_dir(fsid) mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir)) + if daemon_type == CustomContainer.daemon_type: + assert daemon_id + cc = CustomContainer.init(fsid, daemon_id) + data_dir = get_data_dir(fsid, daemon_type, daemon_id) + mounts.update(cc.get_container_mounts(data_dir)) + return mounts -def get_container(fsid, daemon_type, daemon_id, - privileged=False, - ptrace=False, - container_args=[]): - # type: (str, str, Union[int, str], bool, bool, List[str]) -> CephContainer +def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str], + privileged: bool = False, + ptrace: bool = False, + container_args: Optional[List[str]] = None) -> 'CephContainer': + entrypoint: str = '' + name: str = '' + ceph_args: List[str] = [] + envs: List[str] = [] + host_network: bool = True + + if container_args is None: + container_args = [] if daemon_type in ['mon', 'osd']: # mon and osd need privileged in order for libudev to query devices privileged = True @@ -1803,21 +1988,23 @@ def get_container(fsid, daemon_type, daemon_id, name = '%s.%s' % (daemon_type, daemon_id) elif daemon_type in Monitoring.components: entrypoint = '' - name = '' elif daemon_type == NFSGanesha.daemon_type: entrypoint = NFSGanesha.entrypoint name = '%s.%s' % (daemon_type, daemon_id) + envs.extend(NFSGanesha.get_container_envs()) elif daemon_type == CephIscsi.daemon_type: entrypoint = CephIscsi.entrypoint name = '%s.%s' % (daemon_type, daemon_id) # So the container can modprobe iscsi_target_mod and have write perms # to configfs we need to make this a privileged container. privileged = True - else: - entrypoint = '' - name = '' + elif daemon_type == CustomContainer.daemon_type: + cc = CustomContainer.init(fsid, daemon_id) + entrypoint = cc.entrypoint + host_network = False + envs.extend(cc.get_container_envs()) + container_args.extend(cc.get_container_args()) - ceph_args = [] # type: List[str] if daemon_type in Monitoring.components: uid, gid = extract_uid_gid_monitoring(daemon_type) monitoring_args = [ @@ -1832,10 +2019,6 @@ def get_container(fsid, daemon_type, daemon_id, elif daemon_type in Ceph.daemons: ceph_args = ['-n', name, '-f'] - envs = [] # type: List[str] - if daemon_type == NFSGanesha.daemon_type: - envs.extend(NFSGanesha.get_container_envs()) - # if using podman, set -d, --conmon-pidfile & --cidfile flags # so service can have Type=Forking if 'podman' in container_path: @@ -1857,6 +2040,8 @@ def get_container(fsid, daemon_type, daemon_id, envs=envs, privileged=privileged, ptrace=ptrace, + init=args.container_init, + host_network=host_network, ) @@ -1984,7 +2169,7 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid, def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False): # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None if comment: - # Sometimes adding a comment, espectially if there are multiple containers in one + # Sometimes adding a comment, especially if there are multiple containers in one # unit file, makes it easier to read and grok. file_obj.write('# ' + comment + '\n') # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually @@ -1996,6 +2181,7 @@ def _write_container_cmd_to_bash(file_obj, container, comment=None, background=F # container run command file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n') + def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, enable=True, start=True, osd_fsid=None): @@ -2004,6 +2190,11 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, data_dir = get_data_dir(fsid, daemon_type, daemon_id) with open(data_dir + '/unit.run.new', 'w') as f: f.write('set -e\n') + + if daemon_type in Ceph.daemons: + install_path = find_program('install') + f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid)) + # pre-start cmd(s) if daemon_type == 'osd': # osds have a pre-start step @@ -2041,10 +2232,6 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c, tcmu_container = ceph_iscsi.get_tcmu_runner_container() _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runnter container', background=True) - if daemon_type in Ceph.daemons: - install_path = find_program('install') - f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid)) - _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id))) os.fchmod(f.fileno(), 0o600) os.rename(data_dir + '/unit.run.new', @@ -2342,17 +2529,19 @@ WantedBy=ceph-{fsid}.target class CephContainer: def __init__(self, - image, - entrypoint, - args=[], - volume_mounts={}, - cname='', - container_args=[], - envs=None, - privileged=False, - ptrace=False, - bind_mounts=None): - # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool, Optional[List[List[str]]]) -> None + image: str, + entrypoint: str, + args: List[str] = [], + volume_mounts: Dict[str, str] = {}, + cname: str = '', + container_args: List[str] = [], + envs: Optional[List[str]] = None, + privileged: bool = False, + ptrace: bool = False, + bind_mounts: Optional[List[List[str]]] = None, + init: bool = False, + host_network: bool = True, + ) -> None: self.image = image self.entrypoint = entrypoint self.args = args @@ -2363,83 +2552,90 @@ class CephContainer: self.privileged = privileged self.ptrace = ptrace self.bind_mounts = bind_mounts if bind_mounts else [] + self.init = init + self.host_network = host_network - def run_cmd(self): - # type: () -> List[str] - vols = [] # type: List[str] - envs = [] # type: List[str] - cname = [] # type: List[str] - binds = [] # type: List[str] - entrypoint = [] # type: List[str] - if self.entrypoint: - entrypoint = ['--entrypoint', self.entrypoint] + def run_cmd(self) -> List[str]: + cmd_args: List[str] = [ + str(container_path), + 'run', + '--rm', + '--ipc=host', + ] + envs: List[str] = [ + '-e', 'CONTAINER_IMAGE=%s' % self.image, + '-e', 'NODE_NAME=%s' % get_hostname(), + ] + vols: List[str] = [] + binds: List[str] = [] - priv = [] # type: List[str] + if self.host_network: + cmd_args.append('--net=host') + if self.entrypoint: + cmd_args.extend(['--entrypoint', self.entrypoint]) if self.privileged: - priv = ['--privileged', - # let OSD etc read block devs that haven't been chowned - '--group-add=disk'] - if self.ptrace: - priv.append('--cap-add=SYS_PTRACE') + cmd_args.extend([ + '--privileged', + # let OSD etc read block devs that haven't been chowned + '--group-add=disk']) + if self.ptrace and not self.privileged: + # if privileged, the SYS_PTRACE cap is already added + # in addition, --cap-add and --privileged are mutually + # exclusive since podman >= 2.0 + cmd_args.append('--cap-add=SYS_PTRACE') + if self.init: + cmd_args.append('--init') + if self.cname: + cmd_args.extend(['--name', self.cname]) + if self.envs: + for env in self.envs: + envs.extend(['-e', env]) + vols = sum( [['-v', '%s:%s' % (host_dir, container_dir)] for host_dir, container_dir in self.volume_mounts.items()], []) binds = sum([['--mount', '{}'.format(','.join(bind))] - for bind in self.bind_mounts],[]) - envs = [ - '-e', 'CONTAINER_IMAGE=%s' % self.image, - '-e', 'NODE_NAME=%s' % get_hostname(), - ] - if self.envs: - for e in self.envs: - envs.extend(['-e', e]) - cname = ['--name', self.cname] if self.cname else [] - return [ + for bind in self.bind_mounts], []) + + return cmd_args + self.container_args + envs + vols + binds + [ + self.image, + ] + self.args # type: ignore + + def shell_cmd(self, cmd: List[str]) -> List[str]: + cmd_args: List[str] = [ str(container_path), 'run', '--rm', - '--net=host', '--ipc=host', - ] + self.container_args + priv + \ - cname + envs + \ - vols + binds + entrypoint + \ - [ - self.image - ] + self.args # type: ignore + ] + envs: List[str] = [ + '-e', 'CONTAINER_IMAGE=%s' % self.image, + '-e', 'NODE_NAME=%s' % get_hostname(), + ] + vols: List[str] = [] + binds: List[str] = [] - def shell_cmd(self, cmd): - # type: (List[str]) -> List[str] - priv = [] # type: List[str] + if self.host_network: + cmd_args.append('--net=host') if self.privileged: - priv = ['--privileged', - # let OSD etc read block devs that haven't been chowned - '--group-add=disk'] - vols = [] # type: List[str] + cmd_args.extend([ + '--privileged', + # let OSD etc read block devs that haven't been chowned + '--group-add=disk', + ]) + if self.envs: + for env in self.envs: + envs.extend(['-e', env]) + vols = sum( [['-v', '%s:%s' % (host_dir, container_dir)] for host_dir, container_dir in self.volume_mounts.items()], []) - binds = [] # type: List[str] binds = sum([['--mount', '{}'.format(','.join(bind))] for bind in self.bind_mounts], []) - envs = [ - '-e', 'CONTAINER_IMAGE=%s' % self.image, - '-e', 'NODE_NAME=%s' % get_hostname(), - ] - if self.envs: - for e in self.envs: - envs.extend(['-e', e]) - cmd_args = [] # type: List[str] - if cmd: - cmd_args = ['-c'] + cmd - return [ - str(container_path), - 'run', - '--rm', - '--net=host', - '--ipc=host', - ] + self.container_args + priv + envs + vols + binds + [ + + return cmd_args + self.container_args + envs + vols + binds + [ '--entrypoint', cmd[0], - self.image + self.image, ] + cmd[1:] def exec_cmd(self, cmd): @@ -2472,7 +2668,6 @@ class CephContainer: def run(self, timeout=DEFAULT_TIMEOUT): # type: (Optional[int]) -> str - logger.debug(self.run_cmd()) out, _, _ = call_throws( self.run_cmd(), desc=self.entrypoint, timeout=timeout) return out @@ -2531,18 +2726,33 @@ def command_inspect_image(): # type: () -> int out, err, ret = call_throws([ container_path, 'inspect', - '--format', '{{.Id}}', + '--format', '{{.ID}},{{json .RepoDigests}}', args.image]) if ret: return errno.ENOENT - image_id = normalize_container_id(out.strip()) + info_from = get_image_info_from_inspect(out.strip(), args.image) + ver = CephContainer(args.image, 'ceph', ['--version']).run().strip() + info_from['ceph_version'] = ver + + print(json.dumps(info_from, indent=4, sort_keys=True)) + return 0 + + +def get_image_info_from_inspect(out, image): + # type: (str, str) -> Dict[str, str] + image_id, digests = out.split(',', 1) + if not out: + raise Error('inspect {}: empty result'.format(image)) r = { - 'image_id': image_id, - 'ceph_version': ver, + 'image_id': normalize_container_id(image_id) } - print(json.dumps(r, indent=4, sort_keys=True)) - return 0 + if digests: + json_digests = json.loads(digests) + if json_digests: + r['repo_digest'] = json_digests[0] + return r + ################################## @@ -2613,7 +2823,7 @@ def command_bootstrap(): raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0])) mon_id = args.mon_id or hostname mgr_id = args.mgr_id or generate_service_id() - logging.info('Cluster fsid: %s' % fsid) + logger.info('Cluster fsid: %s' % fsid) ipv6 = False l = FileLock(fsid) @@ -3011,6 +3221,9 @@ def command_bootstrap(): cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force']) cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force']) + if args.container_init: + cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(args.container_init), '--force']) + if not args.skip_dashboard: # Configure SSL port (cephadm only allows to configure dashboard SSL port) # if the user does not want to use SSL he can change this setting once the cluster is up @@ -3237,6 +3450,17 @@ def command_deploy(): reconfig=args.reconfig, ports=daemon_ports) + elif daemon_type == CustomContainer.daemon_type: + cc = CustomContainer.init(args.fsid, daemon_id) + if not args.reconfig and not redeploy: + daemon_ports.extend(cc.ports) + c = get_container(args.fsid, daemon_type, daemon_id, + privileged=cc.privileged, + ptrace=args.allow_ptrace) + deploy_daemon(args.fsid, daemon_type, daemon_id, c, + uid=cc.uid, gid=cc.gid, config=None, + keyring=None, reconfig=args.reconfig, + ports=daemon_ports) elif daemon_type == CephadmDaemon.daemon_type: # get current user gid and uid uid = os.getuid() @@ -3248,7 +3472,8 @@ def command_deploy(): uid, gid, ports=daemon_ports) else: - raise Error("{} not implemented in command_deploy function".format(daemon_type)) + raise Error('daemon type {} not implemented in command_deploy function' + .format(daemon_type)) ################################## @@ -3299,9 +3524,15 @@ def command_shell(): if args.keyring: mounts[pathify(args.keyring)] = '/etc/ceph/ceph.keyring:z' if args.mount: - mount = pathify(args.mount) - filename = os.path.basename(mount) - mounts[mount] = '/mnt/{}:z'.format(filename) + for _mount in args.mount: + split_src_dst = _mount.split(':') + mount = pathify(split_src_dst[0]) + filename = os.path.basename(split_src_dst[0]) + if len(split_src_dst) > 1: + dst = split_src_dst[1] + ':z' if len(split_src_dst) == 3 else split_src_dst[1] + mounts[mount] = dst + else: + mounts[mount] = '/mnt/{}:z'.format(filename) if args.command: command = args.command else: @@ -3523,6 +3754,7 @@ def command_list_networks(): def command_ls(): # type: () -> None + ls = list_daemons(detail=not args.no_detail, legacy_dir=args.legacy_dir) print(json.dumps(ls, indent=4)) @@ -3648,8 +3880,13 @@ def list_daemons(detail=True, legacy_dir=None): err.startswith('%s, version ' % cmd): version = err.split(' ')[2] seen_versions[image_id] = version + elif daemon_type == CustomContainer.daemon_type: + # Because a custom container can contain + # everything, we do not know which command + # to execute to get the version. + pass else: - logging.warning('version for unknown daemon type %s' % daemon_type) + logger.warning('version for unknown daemon type %s' % daemon_type) else: vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore try: @@ -4194,6 +4431,7 @@ def check_time_sync(enabler=None): 'systemd-timesyncd.service', 'ntpd.service', # el7 (at least) 'ntp.service', # 18.04 (at least) + 'ntpsec.service', # 20.04 (at least) / buster ] if not check_units(units, enabler): logger.warning('No time sync service is running; checked for %s' % units) @@ -4349,7 +4587,7 @@ class Packager(object): def query_shaman(self, distro, distro_version, branch, commit): # query shaman - logging.info('Fetching repo metadata from shaman and chacra...') + logger.info('Fetching repo metadata from shaman and chacra...') shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format( distro=distro, distro_version=distro_version, @@ -4360,13 +4598,13 @@ class Packager(object): try: shaman_response = urlopen(shaman_url) except HTTPError as err: - logging.error('repository not found in shaman (might not be available yet)') + logger.error('repository not found in shaman (might not be available yet)') raise Error('%s, failed to fetch %s' % (err, shaman_url)) try: chacra_url = shaman_response.geturl() chacra_response = urlopen(chacra_url) except HTTPError as err: - logging.error('repository not found in chacra (might not be available yet)') + logger.error('repository not found in chacra (might not be available yet)') raise Error('%s, failed to fetch %s' % (err, chacra_url)) return chacra_response.read().decode('utf-8') @@ -4403,11 +4641,11 @@ class Apt(Packager): def add_repo(self): url, name = self.repo_gpgkey() - logging.info('Installing repo GPG key from %s...' % url) + logger.info('Installing repo GPG key from %s...' % url) try: response = urlopen(url) except HTTPError as err: - logging.error('failed to fetch GPG repo key from %s: %s' % ( + logger.error('failed to fetch GPG repo key from %s: %s' % ( url, err)) raise Error('failed to fetch GPG key') key = response.read().decode('utf-8') @@ -4424,7 +4662,7 @@ class Apt(Packager): content = self.query_shaman(self.distro, self.distro_codename, self.branch, self.commit) - logging.info('Installing repo file at %s...' % self.repo_path()) + logger.info('Installing repo file at %s...' % self.repo_path()) with open(self.repo_path(), 'w') as f: f.write(content) @@ -4432,28 +4670,28 @@ class Apt(Packager): for name in ['autobuild', 'release']: p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name if os.path.exists(p): - logging.info('Removing repo GPG key %s...' % p) + logger.info('Removing repo GPG key %s...' % p) os.unlink(p) if os.path.exists(self.repo_path()): - logging.info('Removing repo at %s...' % self.repo_path()) + logger.info('Removing repo at %s...' % self.repo_path()) os.unlink(self.repo_path()) def install(self, ls): - logging.info('Installing packages %s...' % ls) + logger.info('Installing packages %s...' % ls) call_throws(['apt', 'install', '-y'] + ls) def install_podman(self): if self.distro == 'ubuntu': - logging.info('Setting up repo for pdoman...') + logger.info('Setting up repo for pdoman...') self.install(['software-properties-common']) call_throws(['add-apt-repository', '-y', 'ppa:projectatomic/ppa']) call_throws(['apt', 'update']) - logging.info('Attempting podman install...') + logger.info('Attempting podman install...') try: self.install(['podman']) except Error as e: - logging.info('Podman did not work. Falling back to docker...') + logger.info('Podman did not work. Falling back to docker...') self.install(['docker.io']) @@ -4567,7 +4805,7 @@ class YumDnf(Packager): self.branch, self.commit) - logging.info('Writing repo to %s...' % self.repo_path()) + logger.info('Writing repo to %s...' % self.repo_path()) with open(self.repo_path(), 'w') as f: f.write(content) @@ -4664,7 +4902,7 @@ class Zypper(Packager): self.branch, self.commit) - logging.info('Writing repo to %s...' % self.repo_path()) + logger.info('Writing repo to %s...' % self.repo_path()) with open(self.repo_path(), 'w') as f: f.write(content) @@ -4737,7 +4975,7 @@ def get_ipv4_address(ifname): offset, struct.pack('256s', bytes(ifname[:15], 'utf-8')) )[20:24]) - + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: addr = _extract(s, 35093) # '0x8915' = SIOCGIFADDR @@ -4745,7 +4983,7 @@ def get_ipv4_address(ifname): except OSError: # interface does not have an ipv4 address return '' - + dec_mask = sum([bin(int(i)).count('1') for i in dq_mask.split('.')]) return '{}/{}'.format(addr, dec_mask) @@ -4773,8 +5011,8 @@ def get_ipv6_address(ifname): def bytes_to_human(num, mode='decimal'): # type: (float, str) -> str - """Convert a bytes value into it's human-readable form. - + """Convert a bytes value into it's human-readable form. + :param num: number, in bytes, to convert :param mode: Either decimal (default) or binary to determine divisor :returns: string representing the bytes value in a more readable format @@ -4798,7 +5036,7 @@ def bytes_to_human(num, mode='decimal'): def read_file(path_list, file_name=''): # type: (List[str], str) -> str """Returns the content of the first file found within the `path_list` - + :param path_list: list of file paths to search :param file_name: optional file_name to be applied to a file path :returns: content of the file or 'Unknown' @@ -5046,7 +5284,7 @@ class HostFacts(): if not os.path.exists(nic_path): continue for iface in os.listdir(nic_path): - + lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))] upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))] @@ -5123,7 +5361,7 @@ class HostFacts(): # type: () -> int """Determine the memory installed (kb)""" return self._get_mem_data('MemTotal') - + @property def memory_free_kb(self): # type: () -> int @@ -5170,7 +5408,7 @@ class HostFacts(): # type: () -> float """Return the current time as Epoch seconds""" return time.time() - + @property def system_uptime(self): # type: () -> float @@ -5189,7 +5427,7 @@ class HostFacts(): for selinux_path in HostFacts._selinux_path_list: if os.path.exists(selinux_path): selinux_config = read_file([selinux_path]).splitlines() - security['type'] = 'SELinux' + security['type'] = 'SELinux' for line in selinux_config: if line.strip().startswith('#'): continue @@ -5224,7 +5462,7 @@ class HostFacts(): summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()]) security = {**security, **summary} # type: ignore security['description'] += "({})".format(summary_str) - + return security if os.path.exists('/sys/kernel/security/lsm'): @@ -5249,7 +5487,7 @@ class HostFacts(): """Return the attributes of this HostFacts object as json""" data = {k: getattr(self, k) for k in dir(self) if not k.startswith('_') and - isinstance(getattr(self, k), + isinstance(getattr(self, k), (float, int, str, list, dict, tuple)) } return json.dumps(data, indent=2, sort_keys=True) @@ -5945,6 +6183,10 @@ def _get_parser(): '--force-start', action='store_true', help="start newly adoped daemon, even if it wasn't running previously") + parser_adopt.add_argument( + '--container-init', + action='store_true', + help='Run podman/docker with `--init`') parser_rm_daemon = subparsers.add_parser( 'rm-daemon', help='remove daemon instance') @@ -6008,7 +6250,11 @@ def _get_parser(): help='ceph.keyring to pass through to the container') parser_shell.add_argument( '--mount', '-m', - help='mount a file or directory under /mnt in the container') + help=("mount a file or directory in the container. " + "Support multiple mounts. " + "ie: `--mount /foo /bar:/bar`. " + "When no destination is passed, default is /mnt"), + nargs='+') parser_shell.add_argument( '--env', '-e', action='append', @@ -6231,6 +6477,10 @@ def _get_parser(): parser_bootstrap.add_argument( '--registry-json', help='json file with custom registry login info (URL, Username, Password)') + parser_bootstrap.add_argument( + '--container-init', + action='store_true', + help='Run podman/docker with `--init`') parser_deploy = subparsers.add_parser( 'deploy', help='deploy a daemon') @@ -6274,6 +6524,10 @@ def _get_parser(): '--allow-ptrace', action='store_true', help='Allow SYS_PTRACE on daemon container') + parser_deploy.add_argument( + '--container-init', + action='store_true', + help='Run podman/docker with `--init`') parser_check_host = subparsers.add_parser( 'check-host', help='check host configuration') @@ -6379,18 +6633,26 @@ def _parse_args(av): if __name__ == "__main__": + + # Logger configuration + if not os.path.exists(LOG_DIR): + os.makedirs(LOG_DIR) + dictConfig(logging_config) + logger = logging.getLogger() + # allow argv to be injected try: av = injected_argv # type: ignore except NameError: av = sys.argv[1:] + logger.debug("%s\ncephadm %s" % ("-" * 80, av)) args = _parse_args(av) + # More verbose console output if args.verbose: - logging.basicConfig(level=logging.DEBUG) - else: - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger('cephadm') + for handler in logger.handlers: + if handler.name == "console": + handler.setLevel(logging.DEBUG) # root? if os.geteuid() != 0: -- 2.39.5