injected_stdin = '...'
"""
-
import argparse
import datetime
import fcntl
import ipaddress
import json
import logging
+from logging.config import dictConfig
import os
import platform
import pwd
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
pass
+
+import re
import uuid
from functools import wraps
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+# Log and console output config
+logging_config = {
+ 'version': 1,
+ 'disable_existing_loggers': True,
+ 'formatters': {
+ 'cephadm': {
+ 'format': '%(asctime)s %(levelname)s %(message)s'
+ },
+ },
+ 'handlers': {
+ 'console': {
+ 'level': 'INFO',
+ 'class': 'logging.StreamHandler',
+ },
+ 'log_file': {
+ 'level': 'DEBUG',
+ 'class': 'logging.handlers.RotatingFileHandler',
+ 'formatter': 'cephadm',
+ 'filename': '%s/cephadm.log' % LOG_DIR,
+ 'maxBytes': 1024000,
+ 'backupCount': 1,
+ }
+ },
+ 'loggers': {
+ '': {
+ 'level': 'DEBUG',
+ 'handlers': ['console', 'log_file'],
+ }
+ }
+}
class termcolor:
yellow = '\033[93m'
self.daemon_id = daemon_id
self.image = image
- def json_get(key, default=None, require=False):
- if require and not key in config_json.keys():
- raise Error('{} missing from config-json'.format(key))
- return config_json.get(key, default)
-
# config-json options
- self.pool = json_get('pool', require=True)
- self.namespace = json_get('namespace')
- self.userid = json_get('userid')
- self.extra_args = json_get('extra_args', [])
- self.files = json_get('files', {})
+ self.pool = dict_get(config_json, 'pool', require=True)
+ self.namespace = dict_get(config_json, 'namespace')
+ self.userid = dict_get(config_json, 'userid')
+ self.extra_args = dict_get(config_json, 'extra_args', [])
+ self.files = dict_get(config_json, 'files', {})
# validate the supplied args
self.validate()
# type: () -> List[str]
return self.daemon_args + self.extra_args
- def get_file_content(self, fname):
- # type: (str) -> str
- """Normalize the json file content into a string"""
- content = self.files.get(fname)
- if isinstance(content, list):
- content = '\n'.join(content)
- return content
-
def create_daemon_dirs(self, data_dir, uid, gid):
# type: (str, int, int) -> None
"""Create files under the container data dir"""
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(config_dir, fname)
- config_content = self.get_file_content(fname)
+ config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
self.daemon_id = daemon_id
self.image = image
- def json_get(key, default=None, require=False):
- if require and not key in config_json.keys():
- raise Error('{} missing from config-json'.format(key))
- return config_json.get(key, default)
-
# config-json options
- self.files = json_get('files', {})
+ self.files = dict_get(config_json, 'files', {})
# validate the supplied args
self.validate()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
- mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config:z'
+ mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
mounts[log_dir] = '/var/log/rbd-target-api:z'
- mounts['/dev'] = '/dev:z'
+ mounts['/dev'] = '/dev'
return mounts
@staticmethod
cname = '%s-%s' % (cname, desc)
return cname
- def get_file_content(self, fname):
- # type: (str) -> str
- """Normalize the json file content into a string"""
- content = self.files.get(fname)
- if isinstance(content, list):
- content = '\n'.join(content)
- return content
-
def create_daemon_dirs(self, data_dir, uid, gid):
# type: (str, int, int) -> None
"""Create files under the container data dir"""
# populate files from the config-json
for fname in self.files:
config_file = os.path.join(data_dir, fname)
- config_content = self.get_file_content(fname)
+ config_content = dict_get_join(self.files, fname)
logger.info('Write file: %s' % (config_file))
with open(config_file, 'w') as f:
os.fchown(f.fileno(), uid, gid)
tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id)
tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
tcmu_container.cname = self.get_container_name(desc='tcmu')
+ # remove extra container args for tcmu container.
+ # extra args could cause issues with the forking service type
+ tcmu_container.container_args = []
return tcmu_container
##################################
+class CustomContainer(object):
+ """Defines a custom container"""
+ daemon_type = 'container'
+
+ def __init__(self, fsid: str, daemon_id: Union[int, str],
+ config_json: Dict, image: str) -> None:
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.image = image
+
+ # config-json options
+ self.entrypoint = dict_get(config_json, 'entrypoint')
+ self.uid = dict_get(config_json, 'uid', 65534) # nobody
+ self.gid = dict_get(config_json, 'gid', 65534) # nobody
+ self.volume_mounts = dict_get(config_json, 'volume_mounts', {})
+ self.args = dict_get(config_json, 'args', [])
+ self.envs = dict_get(config_json, 'envs', [])
+ self.privileged = dict_get(config_json, 'privileged', False)
+ self.bind_mounts = dict_get(config_json, 'bind_mounts', [])
+ self.ports = dict_get(config_json, 'ports', [])
+ self.dirs = dict_get(config_json, 'dirs', [])
+ self.files = dict_get(config_json, 'files', {})
+
+ @classmethod
+ def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
+ return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+
+ def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+ """
+ Create dirs/files below the container data directory.
+ """
+ logger.info('Creating custom container configuration '
+ 'dirs/files in {} ...'.format(data_dir))
+
+ if not os.path.isdir(data_dir):
+ raise OSError('data_dir is not a directory: %s' % data_dir)
+
+ for dir_path in self.dirs:
+ logger.info('Creating directory: {}'.format(dir_path))
+ dir_path = os.path.join(data_dir, dir_path.strip('/'))
+ makedirs(dir_path, uid, gid, 0o755)
+
+ for file_path in self.files:
+ logger.info('Creating file: {}'.format(file_path))
+ content = dict_get_join(self.files, file_path)
+ file_path = os.path.join(data_dir, file_path.strip('/'))
+ with open(file_path, 'w', encoding='utf-8') as f:
+ os.fchown(f.fileno(), uid, gid)
+ os.fchmod(f.fileno(), 0o600)
+ f.write(content)
+
+ def get_daemon_args(self) -> List[str]:
+ return []
+
+ def get_container_args(self) -> List[str]:
+ return self.args
+
+ def get_container_envs(self) -> List[str]:
+ return self.envs
+
+ def get_container_mounts(self, data_dir: str) -> Dict[str, str]:
+ """
+ Get the volume mounts. Relative source paths will be located below
+ `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+ Example:
+ {
+ /foo/conf: /conf
+ foo/conf: /conf
+ }
+ becomes
+ {
+ /foo/conf: /conf
+ /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf
+ }
+ """
+ mounts = {}
+ for source, destination in self.volume_mounts.items():
+ source = os.path.join(data_dir, source)
+ mounts[source] = destination
+ return mounts
+
+ def get_container_binds(self, data_dir: str) -> List[List[str]]:
+ """
+ Get the bind mounts. Relative `source=...` paths will be located below
+ `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+ Example:
+ [
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]
+ becomes
+ [
+ ...
+ 'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules',
+ ...
+ ]
+ """
+ binds = self.bind_mounts.copy()
+ for bind in binds:
+ for index, value in enumerate(bind):
+ match = re.match(r'^source=(.+)$', value)
+ if match:
+ bind[index] = 'source={}'.format(os.path.join(
+ data_dir, match.group(1)))
+ return binds
+
+##################################
+
+
+def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any:
+ """
+ Helper function to get a key from a dictionary.
+ :param d: The dictionary to process.
+ :param key: The name of the key to get.
+ :param default: The default value in case the key does not
+ exist. Default is `None`.
+ :param require: Set to `True` if the key is required. An
+ exception will be raised if the key does not exist in
+ the given dictionary.
+ :return: Returns the value of the given key.
+ :raises: :exc:`Error` if the given key does not exist
+ and `require` is set to `True`.
+ """
+ if require and key not in d.keys():
+ raise Error('{} missing from dict'.format(key))
+ return d.get(key, default)
+
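+# Illustrative usage sketch (hypothetical values, not from a real config-json):
+#   dict_get({'pool': 'rbd'}, 'pool', require=True)   # -> 'rbd'
+#   dict_get({'pool': 'rbd'}, 'namespace')            # -> None (default)
+#   dict_get({}, 'pool', require=True)                # raises Error
+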
+##################################
+
+
+def dict_get_join(d: Dict, key: str) -> Any:
+ """
+ Helper function to get the value of a given key from a dictionary.
+ `List` values will be converted to a string by joining them with a
+ line break.
+ :param d: The dictionary to process.
+ :param key: The name of the key to get.
+ :return: Returns the value of the given key. If it was a `list`, it
+ will be joined with line breaks.
+ """
+ value = d.get(key)
+ if isinstance(value, list):
+ value = '\n'.join(map(str, value))
+ return value
+
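+# Illustrative sketch (hypothetical data): list values are joined with newlines,
+# any other value is returned unchanged.
+#   dict_get_join({'f': ['a', 'b']}, 'f')   # -> 'a\nb'
+#   dict_get_join({'f': 'a=b'}, 'f')        # -> 'a=b'
+#   dict_get_join({}, 'f')                  # -> None
+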
+##################################
+
+
def get_supported_daemons():
# type: () -> List[str]
supported_daemons = list(Ceph.daemons)
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
+ supported_daemons.append(CustomContainer.daemon_type)
supported_daemons.append(CephadmDaemon.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
assert False
except (IOError, OSError):
pass
- logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s'
+ if verbose:
+ logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s'
% (time.time()-start_time, stop, process.poll(), reads))
returncode = process.wait()
out, _, _ = call_throws(
[container_path, 'images',
'--filter', 'label=ceph=True',
+ '--filter', 'dangling=false',
'--format', '{{.Repository}} {{.Tag}}'])
for line in out.splitlines():
if len(line.split()) == 2:
"""
Copy a directory tree from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_dir in src:
"""
Copy a files from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_file in src:
"""
Move files from src to dst
"""
- if not uid or not gid:
+ if uid is None or gid is None:
(uid, gid) = extract_uid_gid()
for src_file in src:
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ r.extend(cc.get_daemon_args())
return r
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(config)
+
if keyring:
keyring_path = os.path.join(data_dir, 'keyring')
with open(keyring_path, 'w') as f:
f.write(keyring)
if daemon_type in Monitoring.components.keys():
- config = get_parm(args.config_json) # type: ignore
+ config_json: Dict[str, Any] = get_parm(args.config_json)
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
# Set up directories specific to the monitoring component
# populate the config directory for the component from the config-json
for fname in required_files:
- if 'files' in config: # type: ignore
- if isinstance(config['files'][fname], list): # type: ignore
- content = '\n'.join(config['files'][fname]) # type: ignore
- else:
- content = config['files'][fname] # type: ignore
-
+ if 'files' in config_json: # type: ignore
+ content = dict_get_join(config_json['files'], fname)
with open(os.path.join(data_dir_root, config_dir, fname), 'w') as f:
os.fchown(f.fileno(), uid, gid)
os.fchmod(f.fileno(), 0o600)
f.write(content)
- if daemon_type == NFSGanesha.daemon_type:
+ elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
nfs_ganesha.create_daemon_dirs(data_dir, uid, gid)
- if daemon_type == CephIscsi.daemon_type:
+ elif daemon_type == CephIscsi.daemon_type:
ceph_iscsi = CephIscsi.init(fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ cc.create_daemon_dirs(data_dir, uid, gid)
+
def get_parm(option):
# type: (str) -> Dict[str, str]
binds = list()
if daemon_type == CephIscsi.daemon_type:
- assert daemon_id
binds.extend(CephIscsi.get_container_binds())
+ elif daemon_type == CustomContainer.daemon_type:
+ assert daemon_id
+ cc = CustomContainer.init(fsid, daemon_id)
+ data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ binds.extend(cc.get_container_binds(data_dir))
return binds
log_dir = get_log_dir(fsid)
mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
+ if daemon_type == CustomContainer.daemon_type:
+ assert daemon_id
+ cc = CustomContainer.init(fsid, daemon_id)
+ data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ mounts.update(cc.get_container_mounts(data_dir))
+
return mounts
-def get_container(fsid, daemon_type, daemon_id,
- privileged=False,
- ptrace=False,
- container_args=[]):
- # type: (str, str, Union[int, str], bool, bool, List[str]) -> CephContainer
+def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str],
+ privileged: bool = False,
+ ptrace: bool = False,
+ container_args: Optional[List[str]] = None) -> 'CephContainer':
+ entrypoint: str = ''
+ name: str = ''
+ ceph_args: List[str] = []
+ envs: List[str] = []
+ host_network: bool = True
+
+ if container_args is None:
+ container_args = []
if daemon_type in ['mon', 'osd']:
# mon and osd need privileged in order for libudev to query devices
privileged = True
name = '%s.%s' % (daemon_type, daemon_id)
elif daemon_type in Monitoring.components:
entrypoint = ''
- name = ''
elif daemon_type == NFSGanesha.daemon_type:
entrypoint = NFSGanesha.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
+ envs.extend(NFSGanesha.get_container_envs())
elif daemon_type == CephIscsi.daemon_type:
entrypoint = CephIscsi.entrypoint
name = '%s.%s' % (daemon_type, daemon_id)
# So the container can modprobe iscsi_target_mod and have write perms
# to configfs we need to make this a privileged container.
privileged = True
- else:
- entrypoint = ''
- name = ''
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(fsid, daemon_id)
+ entrypoint = cc.entrypoint
+ host_network = False
+ envs.extend(cc.get_container_envs())
+ container_args.extend(cc.get_container_args())
- ceph_args = [] # type: List[str]
if daemon_type in Monitoring.components:
uid, gid = extract_uid_gid_monitoring(daemon_type)
monitoring_args = [
elif daemon_type in Ceph.daemons:
ceph_args = ['-n', name, '-f']
- envs = [] # type: List[str]
- if daemon_type == NFSGanesha.daemon_type:
- envs.extend(NFSGanesha.get_container_envs())
-
# if using podman, set -d, --conmon-pidfile & --cidfile flags
# so service can have Type=Forking
if 'podman' in container_path:
envs=envs,
privileged=privileged,
ptrace=ptrace,
+ init=args.container_init,
+ host_network=host_network,
)
def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
# type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
if comment:
- # Sometimes adding a comment, espectially if there are multiple containers in one
+ # Sometimes adding a comment, especially if there are multiple containers in one
# unit file, makes it easier to read and grok.
file_obj.write('# ' + comment + '\n')
# Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
# container run command
file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
+
def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
with open(data_dir + '/unit.run.new', 'w') as f:
f.write('set -e\n')
+
+ if daemon_type in Ceph.daemons:
+ install_path = find_program('install')
+ f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
+
# pre-start cmd(s)
if daemon_type == 'osd':
# osds have a pre-start step
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
_write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runner container', background=True)
- if daemon_type in Ceph.daemons:
- install_path = find_program('install')
- f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
-
_write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id)))
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.run.new',
class CephContainer:
def __init__(self,
- image,
- entrypoint,
- args=[],
- volume_mounts={},
- cname='',
- container_args=[],
- envs=None,
- privileged=False,
- ptrace=False,
- bind_mounts=None):
- # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool, Optional[List[List[str]]]) -> None
+ image: str,
+ entrypoint: str,
+ args: List[str] = [],
+ volume_mounts: Dict[str, str] = {},
+ cname: str = '',
+ container_args: List[str] = [],
+ envs: Optional[List[str]] = None,
+ privileged: bool = False,
+ ptrace: bool = False,
+ bind_mounts: Optional[List[List[str]]] = None,
+ init: bool = False,
+ host_network: bool = True,
+ ) -> None:
self.image = image
self.entrypoint = entrypoint
self.args = args
self.privileged = privileged
self.ptrace = ptrace
self.bind_mounts = bind_mounts if bind_mounts else []
+ self.init = init
+ self.host_network = host_network
- def run_cmd(self):
- # type: () -> List[str]
- vols = [] # type: List[str]
- envs = [] # type: List[str]
- cname = [] # type: List[str]
- binds = [] # type: List[str]
- entrypoint = [] # type: List[str]
- if self.entrypoint:
- entrypoint = ['--entrypoint', self.entrypoint]
+ def run_cmd(self) -> List[str]:
+ cmd_args: List[str] = [
+ str(container_path),
+ 'run',
+ '--rm',
+ '--ipc=host',
+ ]
+ envs: List[str] = [
+ '-e', 'CONTAINER_IMAGE=%s' % self.image,
+ '-e', 'NODE_NAME=%s' % get_hostname(),
+ ]
+ vols: List[str] = []
+ binds: List[str] = []
- priv = [] # type: List[str]
+ if self.host_network:
+ cmd_args.append('--net=host')
+ if self.entrypoint:
+ cmd_args.extend(['--entrypoint', self.entrypoint])
if self.privileged:
- priv = ['--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk']
- if self.ptrace:
- priv.append('--cap-add=SYS_PTRACE')
+ cmd_args.extend([
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk'])
+ if self.ptrace and not self.privileged:
+ # if privileged, the SYS_PTRACE cap is already added
+ # in addition, --cap-add and --privileged are mutually
+ # exclusive since podman >= 2.0
+ cmd_args.append('--cap-add=SYS_PTRACE')
+ if self.init:
+ cmd_args.append('--init')
+ if self.cname:
+ cmd_args.extend(['--name', self.cname])
+ if self.envs:
+ for env in self.envs:
+ envs.extend(['-e', env])
+
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
binds = sum([['--mount', '{}'.format(','.join(bind))]
- for bind in self.bind_mounts],[])
- envs = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
- ]
- if self.envs:
- for e in self.envs:
- envs.extend(['-e', e])
- cname = ['--name', self.cname] if self.cname else []
- return [
+ for bind in self.bind_mounts], [])
+
+ return cmd_args + self.container_args + envs + vols + binds + [
+ self.image,
+ ] + self.args # type: ignore
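+ # Rough shape of the assembled command line (illustrative values only;
+ # the actual flags depend on the container engine and daemon type):
+ #   podman run --rm --ipc=host --net=host --entrypoint <entrypoint> --name <cname> \
+ #     [container_args] -e CONTAINER_IMAGE=<image> -e NODE_NAME=<host> \
+ #     [-v host:ctr ...] [--mount ...] <image> [args]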
+
+ def shell_cmd(self, cmd: List[str]) -> List[str]:
+ cmd_args: List[str] = [
str(container_path),
'run',
'--rm',
- '--net=host',
'--ipc=host',
- ] + self.container_args + priv + \
- cname + envs + \
- vols + binds + entrypoint + \
- [
- self.image
- ] + self.args # type: ignore
+ ]
+ envs: List[str] = [
+ '-e', 'CONTAINER_IMAGE=%s' % self.image,
+ '-e', 'NODE_NAME=%s' % get_hostname(),
+ ]
+ vols: List[str] = []
+ binds: List[str] = []
- def shell_cmd(self, cmd):
- # type: (List[str]) -> List[str]
- priv = [] # type: List[str]
+ if self.host_network:
+ cmd_args.append('--net=host')
if self.privileged:
- priv = ['--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk']
- vols = [] # type: List[str]
+ cmd_args.extend([
+ '--privileged',
+ # let OSD etc read block devs that haven't been chowned
+ '--group-add=disk',
+ ])
+ if self.envs:
+ for env in self.envs:
+ envs.extend(['-e', env])
+
vols = sum(
[['-v', '%s:%s' % (host_dir, container_dir)]
for host_dir, container_dir in self.volume_mounts.items()], [])
- binds = [] # type: List[str]
binds = sum([['--mount', '{}'.format(','.join(bind))]
for bind in self.bind_mounts], [])
- envs = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
- ]
- if self.envs:
- for e in self.envs:
- envs.extend(['-e', e])
- cmd_args = [] # type: List[str]
- if cmd:
- cmd_args = ['-c'] + cmd
- return [
- str(container_path),
- 'run',
- '--rm',
- '--net=host',
- '--ipc=host',
- ] + self.container_args + priv + envs + vols + binds + [
+
+ return cmd_args + self.container_args + envs + vols + binds + [
'--entrypoint', cmd[0],
- self.image
+ self.image,
] + cmd[1:]
def exec_cmd(self, cmd):
def run(self, timeout=DEFAULT_TIMEOUT):
# type: (Optional[int]) -> str
- logger.debug(self.run_cmd())
out, _, _ = call_throws(
self.run_cmd(), desc=self.entrypoint, timeout=timeout)
return out
# type: () -> int
out, err, ret = call_throws([
container_path, 'inspect',
- '--format', '{{.Id}}',
+ '--format', '{{.ID}},{{json .RepoDigests}}',
args.image])
if ret:
return errno.ENOENT
- image_id = normalize_container_id(out.strip())
+ info_from = get_image_info_from_inspect(out.strip(), args.image)
+
ver = CephContainer(args.image, 'ceph', ['--version']).run().strip()
+ info_from['ceph_version'] = ver
+
+ print(json.dumps(info_from, indent=4, sort_keys=True))
+ return 0
+
+
+def get_image_info_from_inspect(out, image):
+ # type: (str, str) -> Dict[str, str]
+ if not out:
+ raise Error('inspect {}: empty result'.format(image))
+ image_id, digests = out.split(',', 1)
r = {
- 'image_id': image_id,
- 'ceph_version': ver,
+ 'image_id': normalize_container_id(image_id)
}
- print(json.dumps(r, indent=4, sort_keys=True))
- return 0
+ if digests:
+ json_digests = json.loads(digests)
+ if json_digests:
+ r['repo_digest'] = json_digests[0]
+ return r
+
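+# Illustrative sketch of the expected inspect output (values are made up):
+#   out = 'sha256:16f4549cf7a8f...,["docker.io/ceph/ceph@sha256:ab12..."]'
+#   get_image_info_from_inspect(out, 'ceph/ceph:v15')
+#   # -> {'image_id': '16f4549cf7a8f...',
+#   #     'repo_digest': 'docker.io/ceph/ceph@sha256:ab12...'}
+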
##################################
raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
mon_id = args.mon_id or hostname
mgr_id = args.mgr_id or generate_service_id()
- logging.info('Cluster fsid: %s' % fsid)
+ logger.info('Cluster fsid: %s' % fsid)
ipv6 = False
l = FileLock(fsid)
cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force'])
cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force'])
+ if args.container_init:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(args.container_init), '--force'])
+
if not args.skip_dashboard:
# Configure SSL port (cephadm only allows to configure dashboard SSL port)
# if the user does not want to use SSL he can change this setting once the cluster is up
reconfig=args.reconfig,
ports=daemon_ports)
+ elif daemon_type == CustomContainer.daemon_type:
+ cc = CustomContainer.init(args.fsid, daemon_id)
+ if not args.reconfig and not redeploy:
+ daemon_ports.extend(cc.ports)
+ c = get_container(args.fsid, daemon_type, daemon_id,
+ privileged=cc.privileged,
+ ptrace=args.allow_ptrace)
+ deploy_daemon(args.fsid, daemon_type, daemon_id, c,
+ uid=cc.uid, gid=cc.gid, config=None,
+ keyring=None, reconfig=args.reconfig,
+ ports=daemon_ports)
elif daemon_type == CephadmDaemon.daemon_type:
# get current user gid and uid
uid = os.getuid()
uid, gid, ports=daemon_ports)
else:
- raise Error("{} not implemented in command_deploy function".format(daemon_type))
+ raise Error('daemon type {} not implemented in command_deploy function'
+ .format(daemon_type))
##################################
if args.keyring:
mounts[pathify(args.keyring)] = '/etc/ceph/ceph.keyring:z'
if args.mount:
- mount = pathify(args.mount)
- filename = os.path.basename(mount)
- mounts[mount] = '/mnt/{}:z'.format(filename)
+ for _mount in args.mount:
+ split_src_dst = _mount.split(':')
+ mount = pathify(split_src_dst[0])
+ filename = os.path.basename(split_src_dst[0])
+ if len(split_src_dst) > 1:
+ dst = split_src_dst[1] + ':z' if len(split_src_dst) == 3 else split_src_dst[1]
+ mounts[mount] = dst
+ else:
+ mounts[mount] = '/mnt/{}:z'.format(filename)
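+ # e.g. `cephadm shell --mount /foo /bar:/baz` (illustrative paths) yields
+ # mounts = {'/foo': '/mnt/foo:z', '/bar': '/baz'}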
if args.command:
command = args.command
else:
def command_ls():
# type: () -> None
+
ls = list_daemons(detail=not args.no_detail,
legacy_dir=args.legacy_dir)
print(json.dumps(ls, indent=4))
err.startswith('%s, version ' % cmd):
version = err.split(' ')[2]
seen_versions[image_id] = version
+ elif daemon_type == CustomContainer.daemon_type:
+ # Because a custom container can contain
+ # everything, we do not know which command
+ # to execute to get the version.
+ pass
else:
- logging.warning('version for unknown daemon type %s' % daemon_type)
+ logger.warning('version for unknown daemon type %s' % daemon_type)
else:
vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore
try:
'systemd-timesyncd.service',
'ntpd.service', # el7 (at least)
'ntp.service', # 18.04 (at least)
+ 'ntpsec.service', # 20.04 (at least) / buster
]
if not check_units(units, enabler):
logger.warning('No time sync service is running; checked for %s' % units)
def query_shaman(self, distro, distro_version, branch, commit):
# query shaman
- logging.info('Fetching repo metadata from shaman and chacra...')
+ logger.info('Fetching repo metadata from shaman and chacra...')
shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
distro=distro,
distro_version=distro_version,
try:
shaman_response = urlopen(shaman_url)
except HTTPError as err:
- logging.error('repository not found in shaman (might not be available yet)')
+ logger.error('repository not found in shaman (might not be available yet)')
raise Error('%s, failed to fetch %s' % (err, shaman_url))
try:
chacra_url = shaman_response.geturl()
chacra_response = urlopen(chacra_url)
except HTTPError as err:
- logging.error('repository not found in chacra (might not be available yet)')
+ logger.error('repository not found in chacra (might not be available yet)')
raise Error('%s, failed to fetch %s' % (err, chacra_url))
return chacra_response.read().decode('utf-8')
def add_repo(self):
url, name = self.repo_gpgkey()
- logging.info('Installing repo GPG key from %s...' % url)
+ logger.info('Installing repo GPG key from %s...' % url)
try:
response = urlopen(url)
except HTTPError as err:
- logging.error('failed to fetch GPG repo key from %s: %s' % (
+ logger.error('failed to fetch GPG repo key from %s: %s' % (
url, err))
raise Error('failed to fetch GPG key')
key = response.read().decode('utf-8')
content = self.query_shaman(self.distro, self.distro_codename, self.branch,
self.commit)
- logging.info('Installing repo file at %s...' % self.repo_path())
+ logger.info('Installing repo file at %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
for name in ['autobuild', 'release']:
p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
if os.path.exists(p):
- logging.info('Removing repo GPG key %s...' % p)
+ logger.info('Removing repo GPG key %s...' % p)
os.unlink(p)
if os.path.exists(self.repo_path()):
- logging.info('Removing repo at %s...' % self.repo_path())
+ logger.info('Removing repo at %s...' % self.repo_path())
os.unlink(self.repo_path())
def install(self, ls):
- logging.info('Installing packages %s...' % ls)
+ logger.info('Installing packages %s...' % ls)
call_throws(['apt', 'install', '-y'] + ls)
def install_podman(self):
if self.distro == 'ubuntu':
- logging.info('Setting up repo for pdoman...')
+ logger.info('Setting up repo for podman...')
self.install(['software-properties-common'])
call_throws(['add-apt-repository', '-y', 'ppa:projectatomic/ppa'])
call_throws(['apt', 'update'])
- logging.info('Attempting podman install...')
+ logger.info('Attempting podman install...')
try:
self.install(['podman'])
except Error as e:
- logging.info('Podman did not work. Falling back to docker...')
+ logger.info('Podman did not work. Falling back to docker...')
self.install(['docker.io'])
self.branch,
self.commit)
- logging.info('Writing repo to %s...' % self.repo_path())
+ logger.info('Writing repo to %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
self.branch,
self.commit)
- logging.info('Writing repo to %s...' % self.repo_path())
+ logger.info('Writing repo to %s...' % self.repo_path())
with open(self.repo_path(), 'w') as f:
f.write(content)
offset,
struct.pack('256s', bytes(ifname[:15], 'utf-8'))
)[20:24])
-
+
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
addr = _extract(s, 35093) # '0x8915' = SIOCGIFADDR
except OSError:
# interface does not have an ipv4 address
return ''
-
+
dec_mask = sum([bin(int(i)).count('1')
for i in dq_mask.split('.')])
return '{}/{}'.format(addr, dec_mask)
def bytes_to_human(num, mode='decimal'):
# type: (float, str) -> str
- """Convert a bytes value into it's human-readable form.
-
+ """Convert a bytes value into it's human-readable form.
+
:param num: number, in bytes, to convert
:param mode: Either decimal (default) or binary to determine divisor
:returns: string representing the bytes value in a more readable format
def read_file(path_list, file_name=''):
# type: (List[str], str) -> str
"""Returns the content of the first file found within the `path_list`
-
+
:param path_list: list of file paths to search
:param file_name: optional file_name to be applied to a file path
:returns: content of the file or 'Unknown'
if not os.path.exists(nic_path):
continue
for iface in os.listdir(nic_path):
-
+
lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))]
upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))]
# type: () -> int
"""Determine the memory installed (kb)"""
return self._get_mem_data('MemTotal')
-
+
@property
def memory_free_kb(self):
# type: () -> int
# type: () -> float
"""Return the current time as Epoch seconds"""
return time.time()
-
+
@property
def system_uptime(self):
# type: () -> float
for selinux_path in HostFacts._selinux_path_list:
if os.path.exists(selinux_path):
selinux_config = read_file([selinux_path]).splitlines()
- security['type'] = 'SELinux'
+ security['type'] = 'SELinux'
for line in selinux_config:
if line.strip().startswith('#'):
continue
summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()])
security = {**security, **summary} # type: ignore
security['description'] += "({})".format(summary_str)
-
+
return security
if os.path.exists('/sys/kernel/security/lsm'):
"""Return the attributes of this HostFacts object as json"""
data = {k: getattr(self, k) for k in dir(self)
if not k.startswith('_') and
- isinstance(getattr(self, k),
+ isinstance(getattr(self, k),
(float, int, str, list, dict, tuple))
}
return json.dumps(data, indent=2, sort_keys=True)
'--force-start',
action='store_true',
help="start newly adoped daemon, even if it wasn't running previously")
+ parser_adopt.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_rm_daemon = subparsers.add_parser(
'rm-daemon', help='remove daemon instance')
help='ceph.keyring to pass through to the container')
parser_shell.add_argument(
'--mount', '-m',
- help='mount a file or directory under /mnt in the container')
+ help=("mount a file or directory in the container. "
+ "Support multiple mounts. "
+ "ie: `--mount /foo /bar:/bar`. "
+ "When no destination is passed, default is /mnt"),
+ nargs='+')
parser_shell.add_argument(
'--env', '-e',
action='append',
parser_bootstrap.add_argument(
'--registry-json',
help='json file with custom registry login info (URL, Username, Password)')
+ parser_bootstrap.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_deploy = subparsers.add_parser(
'deploy', help='deploy a daemon')
'--allow-ptrace',
action='store_true',
help='Allow SYS_PTRACE on daemon container')
+ parser_deploy.add_argument(
+ '--container-init',
+ action='store_true',
+ help='Run podman/docker with `--init`')
parser_check_host = subparsers.add_parser(
'check-host', help='check host configuration')
if __name__ == "__main__":
+
+ # Logger configuration
+ if not os.path.exists(LOG_DIR):
+ os.makedirs(LOG_DIR)
+ dictConfig(logging_config)
+ logger = logging.getLogger()
+
# allow argv to be injected
try:
av = injected_argv # type: ignore
except NameError:
av = sys.argv[1:]
+ logger.debug("%s\ncephadm %s" % ("-" * 80, av))
args = _parse_args(av)
+ # More verbose console output
if args.verbose:
- logging.basicConfig(level=logging.DEBUG)
- else:
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger('cephadm')
+ for handler in logger.handlers:
+ if handler.name == "console":
+ handler.setLevel(logging.DEBUG)
# root?
if os.geteuid() != 0: