# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-pacific-devel'
DEFAULT_IMAGE_IS_MASTER = False
-DEFAULT_PROMETHEUS_IMAGE = "docker.io/prom/prometheus:v2.18.1"
-DEFAULT_NODE_EXPORTER_IMAGE = "docker.io/prom/node-exporter:v0.18.1"
-DEFAULT_GRAFANA_IMAGE = "docker.io/ceph/ceph-grafana:6.7.4"
-DEFAULT_ALERT_MANAGER_IMAGE = "docker.io/prom/alertmanager:v0.20.0"
+DEFAULT_PROMETHEUS_IMAGE = 'docker.io/prom/prometheus:v2.18.1'
+DEFAULT_NODE_EXPORTER_IMAGE = 'docker.io/prom/node-exporter:v0.18.1'
+DEFAULT_GRAFANA_IMAGE = 'docker.io/ceph/ceph-grafana:6.7.4'
+DEFAULT_ALERT_MANAGER_IMAGE = 'docker.io/prom/alertmanager:v0.20.0'
# ------------------------------------------------------------------------------
LATEST_STABLE_RELEASE = 'pacific'
class BaseConfig:
def __init__(self):
- self.image: str = ""
+ self.image: str = ''
self.docker: bool = False
self.data_dir: str = DATA_DIR
self.log_dir: str = LOG_DIR
self.memory_limit: Optional[int] = None
self.container_init: bool = CONTAINER_INIT
- self.container_path: str = ""
+ self.container_path: str = ''
def set_from_args(self, args: argparse.Namespace):
argdict: Dict[str, Any] = vars(args)
class CephadmContext:
def __init__(self):
- self.__dict__["_args"] = None
- self.__dict__["_conf"] = BaseConfig()
+ self.__dict__['_args'] = None
+ self.__dict__['_conf'] = BaseConfig()
def set_args(self, args: argparse.Namespace) -> None:
self._conf.set_from_args(args)
self._args = args
def has_function(self) -> bool:
- return "func" in self._args
+ return 'func' in self._args
def __contains__(self, name: str) -> bool:
return hasattr(self, name)
def __getattr__(self, name: str) -> Any:
- if "_conf" in self.__dict__ and hasattr(self._conf, name):
+ if '_conf' in self.__dict__ and hasattr(self._conf, name):
return getattr(self._conf, name)
- elif "_args" in self.__dict__ and hasattr(self._args, name):
+ elif '_args' in self.__dict__ and hasattr(self._args, name):
return getattr(self._args, name)
else:
return super().__getattribute__(name)
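# Illustrative sketch (not part of this diff): a minimal, self-contained
# demo of the delegation pattern __getattr__ implements above. Lookups try
# the wrapped BaseConfig first, then the parsed Namespace, then the object
# itself. Names prefixed with _ below are hypothetical stand-ins.
import argparse
from typing import Any

class _Conf:
    image: str = ''

class _Ctx:
    def __init__(self) -> None:
        self.__dict__['_conf'] = _Conf()   # assigned via __dict__, as above
        self.__dict__['_args'] = argparse.Namespace(fsid='abc')

    def __getattr__(self, name: str) -> Any:
        if '_conf' in self.__dict__ and hasattr(self._conf, name):
            return getattr(self._conf, name)
        elif '_args' in self.__dict__ and hasattr(self._args, name):
            return getattr(self._args, name)
        return super().__getattribute__(name)

assert _Ctx().image == ''    # resolved on the config object first
assert _Ctx().fsid == 'abc'  # falls through to the parsed args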
"""Define the configs for the monitoring containers"""
port_map = {
- "prometheus": [9095], # Avoid default 9090, due to conflict with cockpit UI
- "node-exporter": [9100],
- "grafana": [3000],
- "alertmanager": [9093, 9094],
+ 'prometheus': [9095], # Avoid default 9090, due to conflict with cockpit UI
+ 'node-exporter': [9100],
+ 'grafana': [3000],
+ 'alertmanager': [9093, 9094],
}
components = {
- "prometheus": {
- "image": DEFAULT_PROMETHEUS_IMAGE,
- "cpus": '2',
- "memory": '4GB',
- "args": [
- "--config.file=/etc/prometheus/prometheus.yml",
- "--storage.tsdb.path=/prometheus",
- "--web.listen-address=:{}".format(port_map['prometheus'][0]),
+ 'prometheus': {
+ 'image': DEFAULT_PROMETHEUS_IMAGE,
+ 'cpus': '2',
+ 'memory': '4GB',
+ 'args': [
+ '--config.file=/etc/prometheus/prometheus.yml',
+ '--storage.tsdb.path=/prometheus',
+ '--web.listen-address=:{}'.format(port_map['prometheus'][0]),
],
- "config-json-files": [
- "prometheus.yml",
+ 'config-json-files': [
+ 'prometheus.yml',
],
},
- "node-exporter": {
- "image": DEFAULT_NODE_EXPORTER_IMAGE,
- "cpus": "1",
- "memory": "1GB",
- "args": [
- "--no-collector.timex",
+ 'node-exporter': {
+ 'image': DEFAULT_NODE_EXPORTER_IMAGE,
+ 'cpus': '1',
+ 'memory': '1GB',
+ 'args': [
+ '--no-collector.timex',
],
},
- "grafana": {
- "image": DEFAULT_GRAFANA_IMAGE,
- "cpus": "2",
- "memory": "4GB",
- "args": [],
- "config-json-files": [
- "grafana.ini",
- "provisioning/datasources/ceph-dashboard.yml",
- "certs/cert_file",
- "certs/cert_key",
+ 'grafana': {
+ 'image': DEFAULT_GRAFANA_IMAGE,
+ 'cpus': '2',
+ 'memory': '4GB',
+ 'args': [],
+ 'config-json-files': [
+ 'grafana.ini',
+ 'provisioning/datasources/ceph-dashboard.yml',
+ 'certs/cert_file',
+ 'certs/cert_key',
],
},
- "alertmanager": {
- "image": DEFAULT_ALERT_MANAGER_IMAGE,
- "cpus": "2",
- "memory": "2GB",
- "args": [
- "--web.listen-address=:{}".format(port_map['alertmanager'][0]),
- "--cluster.listen-address=:{}".format(port_map['alertmanager'][1]),
+ 'alertmanager': {
+ 'image': DEFAULT_ALERT_MANAGER_IMAGE,
+ 'cpus': '2',
+ 'memory': '2GB',
+ 'args': [
+ '--web.listen-address=:{}'.format(port_map['alertmanager'][0]),
+ '--cluster.listen-address=:{}'.format(port_map['alertmanager'][1]),
],
- "config-json-files": [
- "alertmanager.yml",
+ 'config-json-files': [
+ 'alertmanager.yml',
],
- "config-json-args": [
- "peers",
+ 'config-json-args': [
+ 'peers',
],
},
} # type: ignore
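# Illustrative usage of the two tables above (assuming, as the later lookup
# Monitoring.components[type_]['image'] in this diff suggests, that port_map
# and components are class attributes of a Monitoring class):
#
#     daemon_type = 'prometheus'
#     image = Monitoring.components[daemon_type]['image']        # DEFAULT_PROMETHEUS_IMAGE
#     args = Monitoring.components[daemon_type].get('args', [])
#     ports = Monitoring.port_map[daemon_type]                   # [9095]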
required_files = ['ganesha.conf']
port_map = {
- "nfs": 2049,
+ 'nfs': 2049,
}
def __init__(self,
# type: (str, bool) -> List[str]
mount_path = os.path.join(data_dir, 'configfs')
if mount:
- cmd = "if ! grep -qs {0} /proc/mounts; then " \
- "mount -t configfs none {0}; fi".format(mount_path)
+ cmd = 'if ! grep -qs {0} /proc/mounts; then ' \
+ 'mount -t configfs none {0}; fi'.format(mount_path)
else:
- cmd = "if grep -qs {0} /proc/mounts; then " \
- "umount {0}; fi".format(mount_path)
+ cmd = 'if grep -qs {0} /proc/mounts; then ' \
+ 'umount {0}; fi'.format(mount_path)
return cmd.split()
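# Illustration: for mount=True and a hypothetical data_dir of
# '/var/lib/ceph/<fsid>/iscsi.x', the command built above is
#   if ! grep -qs /var/lib/ceph/<fsid>/iscsi.x/configfs /proc/mounts; then mount -t configfs none /var/lib/ceph/<fsid>/iscsi.x/configfs; fi
# and cmd.split() returns it as a whitespace-tokenized list, which the
# caller presumably re-joins into a single shell invocation.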
def get_tcmu_runner_container(self):
# type: () -> CephContainer
tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, self.daemon_id)
- tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
+ tcmu_container.entrypoint = '/usr/bin/tcmu-runner'
tcmu_container.cname = self.get_container_name(desc='tcmu')
# remove extra container args for tcmu container.
# extra args could cause issues with the forking service type
threads = [thread for thread in list(self._threads.values())
if thread.is_alive()]
if threads:
- _warn(f"{self.__class__} has registered but not finished child processes",
+ _warn(f'{self.__class__} has registered but not finished child processes',
ResourceWarning,
source=self)
def add_child_handler(self, pid, callback, *args):
loop = events.get_event_loop()
thread = threading.Thread(target=self._do_waitpid,
- name=f"waitpid-{next(self._pid_counter)}",
+ name=f'waitpid-{next(self._pid_counter)}',
args=(loop, pid, callback, args),
daemon=True)
self._threads[pid] = thread
pid = expected_pid
returncode = 255
logger.warning(
- "Unknown child process pid %d, will report returncode 255",
+ 'Unknown child process pid %d, will report returncode 255',
pid)
else:
if os.WIFEXITED(status):
expected_pid, returncode)
if loop.is_closed():
- logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+ logger.warning('Loop %r that handles pid %r is closed', loop, pid)
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)
prefix += ': '
timeout = timeout or ctx.timeout
- logger.debug("Running command: %s" % ' '.join(command))
+ logger.debug('Running command: %s' % ' '.join(command))
async def tee(reader: asyncio.StreamReader) -> str:
collected = StringIO()
if not is_fsid(daemon['fsid']):
# 'unknown' fsid
continue
- elif "name" not in ctx or not ctx.name:
+ elif 'name' not in ctx or not ctx.name:
# ctx.name not specified
fsids_set.add(daemon['fsid'])
elif daemon['name'] == ctx.name:
@wraps(func)
def _default_image(ctx: CephadmContext):
if not ctx.image:
- if "name" in ctx and ctx.name:
+ if 'name' in ctx and ctx.name:
type_ = ctx.name.split('.', 1)[0]
if type_ in Monitoring.components:
ctx.image = Monitoring.components[type_]['image']
path = os.environ.get('PATH', None)
if path is None:
try:
- path = os.confstr("CS_PATH")
+ path = os.confstr('CS_PATH')
except (AttributeError, ValueError):
# os.confstr() or CS_PATH is not available
path = os.defpath
if code == 0:
enabled = True
installed = True
- elif "disabled" in out:
+ elif 'disabled' in out:
installed = True
except Exception as e:
logger.warning('unable to run systemctl: %s' % e)
config = get_parm(ctx.config_json)
peers = config.get('peers', list()) # type: ignore
for peer in peers:
- r += ["--cluster.peer={}".format(peer)]
+ r += ['--cluster.peer={}'.format(peer)]
# some alertmanager versions, by default, look elsewhere for a config
- r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
+ r += ['--config.file=/etc/alertmanager/alertmanager.yml']
elif daemon_type == NFSGanesha.daemon_type:
nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
with open(option, 'r') as f:
j = f.read()
else:
- raise Error("Config file {} not found".format(option))
+ raise Error('Config file {} not found'.format(option))
try:
js = json.loads(j)
except ValueError as e:
- raise Error("Invalid JSON in {}: {}".format(option, e))
+ raise Error('Invalid JSON in {}: {}'.format(option, e))
else:
return js
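# Hypothetical example of a payload get_parm would accept here; the
# 'config' and 'keyring' keys match the d.get() lookups just below, and the
# values are placeholders:
#
#     {
#         "config": "[global]\n\tfsid = <fsid>\n",
#         "keyring": "[client.admin]\n\tkey = <key>\n"
#     }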
config = None
keyring = None
- if "config_json" in ctx and ctx.config_json:
+ if 'config_json' in ctx and ctx.config_json:
d = get_parm(ctx.config_json)
config = d.get('config')
keyring = d.get('keyring')
- if "config" in ctx and ctx.config:
+ if 'config' in ctx and ctx.config:
with open(ctx.config, 'r') as f:
config = f.read()
- if "key" in ctx and ctx.key:
+ if 'key' in ctx and ctx.key:
keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
- elif "keyring" in ctx and ctx.keyring:
+ elif 'keyring' in ctx and ctx.keyring:
with open(ctx.keyring, 'r') as f:
keyring = f.read()
mounts['/proc'] = '/host/proc:ro'
mounts['/sys'] = '/host/sys:ro'
mounts['/'] = '/rootfs:ro'
- elif daemon_type == "grafana":
+ elif daemon_type == 'grafana':
mounts[os.path.join(data_dir, 'etc/grafana/grafana.ini')] = '/etc/grafana/grafana.ini:Z'
mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
ports = ports or []
if any([port_in_use(ctx, port) for port in ports]):
- raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
+ raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
if reconfig and not os.path.exists(data_dir):
deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
c, osd_fsid=osd_fsid)
else:
- raise RuntimeError("attempting to deploy a daemon without a container image")
+ raise RuntimeError('attempting to deploy a daemon without a container image')
if not os.path.exists(data_dir + '/unit.created'):
with open(data_dir + '/unit.created', 'w') as f:
'memory_request': int(ctx.memory_request) if ctx.memory_request else None,
'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
})
- metaf.write(json.dumps(meta, indent=4) + "\n")
+ metaf.write(json.dumps(meta, indent=4) + '\n')
os.fchmod(f.fileno(), 0o600)
os.fchmod(metaf.fileno(), 0o600)
if not enabled:
logger.debug('firewalld.service is not enabled')
return False
- if state != "running":
+ if state != 'running':
logger.debug('firewalld.service is not running')
return False
- logger.info("firewalld ready")
+ logger.info('firewalld ready')
return True
def enable_service_for(self, daemon_type):
return
if not self.cmd:
- raise RuntimeError("command not defined")
+ raise RuntimeError('command not defined')
out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
if ret:
return
if not self.cmd:
- raise RuntimeError("command not defined")
+ raise RuntimeError('command not defined')
for port in fw_ports:
tcp_port = str(port) + '/tcp'
return
if not self.cmd:
- raise RuntimeError("command not defined")
+ raise RuntimeError('command not defined')
for port in fw_ports:
tcp_port = str(port) + '/tcp'
raise RuntimeError('unable to remove port %s from current zone: %s' %
(tcp_port, err))
else:
- logger.info(f"Port {tcp_port} disabled")
+ logger.info(f'Port {tcp_port} disabled')
else:
- logger.info(f"firewalld port {tcp_port} already closed")
+ logger.info(f'firewalld port {tcp_port} already closed')
def apply_rules(self):
# type: () -> None
return
if not self.cmd:
- raise RuntimeError("command not defined")
+ raise RuntimeError('command not defined')
call_throws(self.ctx, [self.cmd, '--reload'])
compress
sharedscripts
postrotate
- killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror || pkill -1 -x "ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror" || true
+ killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror || pkill -1 -x 'ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror' || true
endscript
missingok
notifempty
logger.info('Pulling container image %s...' % image)
ignorelist = [
- "error creating read-write layer with ID",
- "net/http: TLS handshake timeout",
- "Digest did not match, expected",
+ 'error creating read-write layer with ID',
+ 'net/http: TLS handshake timeout',
+ 'Digest did not match, expected',
]
cmd = [ctx.container_path, 'pull', image]
# ensure the format of the string is as expected address/netmask
if not re.search(r'\/\d+$', subnet):
rc = 1
- errors.append(f"{subnet} is not in CIDR format (address/netmask)")
+ errors.append(f'{subnet} is not in CIDR format (address/netmask)')
continue
try:
v = ipaddress.ip_network(subnet).version
versions.add(v)
except ValueError as e:
rc = 1
- errors.append(f"{subnet} invalid: {str(e)}")
+ errors.append(f'{subnet} invalid: {str(e)}')
- return rc, list(versions), ", ".join(errors)
+ return rc, list(versions), ', '.join(errors)
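# Worked examples of the (rc, versions, errors) contract above:
#   check_subnet('192.168.1.0/24') -> (0, [4], '')
#   check_subnet('fd00::/64')      -> (0, [6], '')
#   check_subnet('192.168.1.0')    -> (1, [], '192.168.1.0 is not in CIDR format (address/netmask)')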
def unwrap_ipv6(address):
# the ValueError
try:
if ipaddress.ip_address(address).version == 6:
- return f"[{address}]"
+ return f'[{address}]'
except ValueError:
pass
try:
return ipaddress.ip_address(address).version == 6
except ValueError:
- logger.warning("Address: {} isn't a valid IP address".format(address))
+ logger.warning('Address: {} is not a valid IP address'.format(address))
return False
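# Examples, per the logic above:
#   is_ipv6('fe80::1')   -> True
#   is_ipv6('10.1.2.3')  -> False
#   is_ipv6('not-an-ip') -> False, after logging the warning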
ctx: CephadmContext
) -> Tuple[str, bool, Optional[str]]:
r = re.compile(r':(\d+)$')
- base_ip = ""
+ base_ip = ''
ipv6 = False
if ctx.mon_ip:
def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]:
- cluster_network = ""
+ cluster_network = ''
ipv6_cluster_network = False
# the cluster network may not exist on this node, so all we can do is
# validate that the address given is valid ipv4 or ipv6 subnet
if ctx.cluster_network:
rc, versions, err_msg = check_subnet(ctx.cluster_network)
if rc:
- raise Error(f"Invalid --cluster-network parameter: {err_msg}")
+ raise Error(f'Invalid --cluster-network parameter: {err_msg}')
cluster_network = ctx.cluster_network
ipv6_cluster_network = True if 6 in versions else False
else:
- logger.info("- internal network (--cluster-network) has not "
- "been provided, OSD replication will default to "
- "the public_network")
+ logger.info('- internal network (--cluster-network) has not '
+ 'been provided, OSD replication will default to '
+ 'the public_network')
return cluster_network, ipv6_cluster_network
monmap.name: '/tmp/monmap:z',
},
).run()
- logger.debug(f"monmaptool for {mon_id} {mon_addr} on {out}")
+ logger.debug(f'monmaptool for {mon_id} {mon_addr} on {out}')
# pass monmap file to ceph user for use by ceph-mon --mkfs below
os.fchown(monmap.fileno(), uid, gid)
monmap_path: '/tmp/monmap:z',
},
).run()
- logger.debug(f"create mon.{mon_id} on {out}")
+ logger.debug(f'create mon.{mon_id} on {out}')
return (mon_dir, log_dir)
fsid: str, mon_id: str
) -> None:
mon_c = get_container(ctx, fsid, 'mon', mon_id)
- ctx.meta_json = json.dumps({"service_name": "mon"})
+ ctx.meta_json = json.dumps({'service_name': 'mon'})
deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
config=None, keyring=None)
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
# Note: the default port of the mgr's prometheus module (9283) is opened in the firewall
- ctx.meta_json = json.dumps({"service_name": "mgr"})
+ ctx.meta_json = json.dumps({'service_name': 'mgr'})
deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
config=config, keyring=mgr_keyring, ports=[9283])
# Configure SSL port (cephadm only allows to configure dashboard SSL port)
# if the user does not want to use SSL he can change this setting once the cluster is up
- cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port", str(ctx.ssl_dashboard_port)])
+ cli(['config', 'set', 'mgr', 'mgr/dashboard/ssl_server_port', str(ctx.ssl_dashboard_port)])
# configuring dashboard parameters
logger.info('Enabling the dashboard module...')
])
if mon_network:
- logger.info(f"Setting mon public_network to {mon_network}")
+ logger.info(f'Setting mon public_network to {mon_network}')
cli(['config', 'set', 'mon', 'public_network', mon_network])
if cluster_network:
- logger.info(f"Setting cluster_network to {cluster_network}")
+ logger.info(f'Setting cluster_network to {cluster_network}')
cli(['config', 'set', 'global', 'cluster_network', cluster_network])
if ipv6 or ipv6_cluster_network:
dirname = os.path.dirname(f)
if dirname and not os.path.exists(dirname):
fname = os.path.basename(f)
- logger.info(f"Creating directory {dirname} for {fname}")
+ logger.info(f'Creating directory {dirname} for {fname}')
try:
# use makedirs to create intermediate missing dirs
os.makedirs(dirname, 0o755)
except PermissionError:
- raise Error(f"Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.")
+ raise Error(f'Unable to create {dirname} due to a permissions failure. Retry as root or with sudo, or preallocate the directory.')
if not ctx.skip_prepare_host:
command_prepare_host(ctx)
if ctx.with_exporter:
cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true'])
if ctx.exporter_config:
- logger.info("Applying custom cephadm exporter settings")
+ logger.info('Applying custom cephadm exporter settings')
# validated within the parser, so we can just apply to the store
with tempfile.NamedTemporaryFile(buffering=0) as tmp:
tmp.write(json.dumps(ctx.exporter_config).encode('utf-8'))
mounts = {
- tmp.name: "/tmp/exporter-config.json:z"
+ tmp.name: '/tmp/exporter-config.json:z'
}
- cli(["cephadm", "set-exporter-config", "-i", "/tmp/exporter-config.json"], extra_mounts=mounts)
- logger.info("-> Use ceph orch apply cephadm-exporter to deploy")
+ cli(['cephadm', 'set-exporter-config', '-i', '/tmp/exporter-config.json'], extra_mounts=mounts)
+ logger.info('-> Use ceph orch apply cephadm-exporter to deploy')
else:
# generate a default SSL configuration for the exporter(s)
- logger.info("Generating a default cephadm exporter configuration (self-signed)")
+ logger.info('Generating a default cephadm exporter configuration (self-signed)')
cli(['cephadm', 'generate-exporter-config'])
#
# deploy the service (commented out until the cephadm changes are in the ceph container build)
def command_registry_login(ctx: CephadmContext):
if ctx.registry_json:
- logger.info("Pulling custom registry login info from %s." % ctx.registry_json)
+ logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
d = get_parm(ctx.registry_json)
if d.get('url') and d.get('username') and d.get('password'):
ctx.registry_url = d.get('url')
ctx.registry_password = d.get('password')
registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password)
else:
- raise Error("json provided for custom registry login did not include all necessary fields. "
- "Please setup json file as\n"
- "{\n"
- " \"url\": \"REGISTRY_URL\",\n"
- " \"username\": \"REGISTRY_USERNAME\",\n"
- " \"password\": \"REGISTRY_PASSWORD\"\n"
- "}\n")
+ raise Error('json provided for custom registry login did not include all necessary fields. '
+ 'Please set up the json file as\n'
+ '{\n'
+ ' "url": "REGISTRY_URL",\n'
+ ' "username": "REGISTRY_USERNAME",\n'
+ ' "password": "REGISTRY_PASSWORD"\n'
+ '}\n')
elif ctx.registry_url and ctx.registry_username and ctx.registry_password:
registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password)
else:
- raise Error("Invalid custom registry arguments received. To login to a custom registry include "
- "--registry-url, --registry-username and --registry-password "
- "options or --registry-json option")
+ raise Error('Invalid custom registry arguments received. To login to a custom registry include '
+ '--registry-url, --registry-username and --registry-password '
+ 'options or --registry-json option')
return 0
def registry_login(ctx: CephadmContext, url, username, password):
- logger.info("Logging into custom registry.")
+ logger.info('Logging into custom registry.')
try:
container_path = ctx.container_path
cmd = [container_path, 'login',
if 'podman' in container_path:
os.chmod('/etc/ceph/podman-auth.json', 0o600)
except Exception:
- raise Error("Failed to login to custom registry @ %s as %s with given password" % (ctx.registry_url, ctx.registry_username))
+ raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
##################################
elif daemon_type == 'alertmanager':
uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
- raise Error("{} not implemented yet".format(daemon_type))
+ raise Error('{} not implemented yet'.format(daemon_type))
return uid, gid
required_args = Monitoring.components[daemon_type].get('config-json-args', list())
if required_files:
if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore
- raise Error("{} deployment requires config-json which must "
- "contain file content for {}".format(daemon_type.capitalize(), ', '.join(required_files)))
+ raise Error('{} deployment requires config-json which must '
+ 'contain file content for {}'.format(daemon_type.capitalize(), ', '.join(required_files)))
if required_args:
if not config or not all(c in config.keys() for c in required_args): # type: ignore
- raise Error("{} deployment requires config-json which must "
- "contain arg for {}".format(daemon_type.capitalize(), ', '.join(required_args)))
+ raise Error('{} deployment requires config-json which must '
+ 'contain arg for {}'.format(daemon_type.capitalize(), ', '.join(required_args)))
uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
gid = os.getgid()
config_js = get_parm(ctx.config_json) # type: Dict[str, str]
if not daemon_ports:
- logger.info("cephadm-exporter will use default port ({})".format(CephadmDaemon.default_port))
+ logger.info('cephadm-exporter will use default port ({})'.format(CephadmDaemon.default_port))
daemon_ports = [CephadmDaemon.default_port]
CephadmDaemon.validate_config(config_js)
container_args += [
'-it',
'-e', 'LANG=C',
- '-e', "PS1=%s" % CUSTOM_PS1,
+ '-e', 'PS1=%s' % CUSTOM_PS1,
]
if ctx.fsid:
home = os.path.join(ctx.data_dir, ctx.fsid, 'home')
container_args += [
'-it',
'-e', 'LANG=C',
- '-e', "PS1=%s" % CUSTOM_PS1,
+ '-e', 'PS1=%s' % CUSTOM_PS1,
]
c = CephContainer(
ctx,
# call this directly, without our wrapper, so that we get an unmolested
# stdout with logger prefixing.
- logger.debug("Running command: %s" % ' '.join(cmd))
+ logger.debug('Running command: %s' % ' '.join(cmd))
subprocess.call(cmd) # type: ignore
##################################
try:
with open(path, 'r') as f:
osd_fsid = f.read().strip()
- logger.info("Found online OSD at %s" % path)
+ logger.info('Found online OSD at %s' % path)
except IOError:
logger.info('Unable to read OSD fsid from %s' % path)
if os.path.exists(os.path.join(self.osd_data_dir, 'type')):
try:
js = json.loads(out)
if self.osd_id in js:
- logger.info("Found offline LVM OSD {}".format(self.osd_id))
+ logger.info('Found offline LVM OSD {}'.format(self.osd_id))
osd_fsid = js[self.osd_id][0]['tags']['ceph.osd_fsid']
for device in js[self.osd_id]:
if device['tags']['ceph.type'] == 'block':
osd_type = 'filestore'
break
except ValueError as e:
- logger.info("Invalid JSON in ceph-volume lvm list: {}".format(e))
+ logger.info('Invalid JSON in ceph-volume lvm list: {}'.format(e))
return osd_fsid, osd_type
# type: () -> Tuple[Optional[str], Optional[str]]
osd_fsid, osd_type = None, None
- osd_file = glob("/etc/ceph/osd/{}-[a-f0-9-]*.json".format(self.osd_id))
+ osd_file = glob('/etc/ceph/osd/{}-[a-f0-9-]*.json'.format(self.osd_id))
if len(osd_file) == 1:
with open(osd_file[0], 'r') as f:
try:
js = json.loads(f.read())
- logger.info("Found offline simple OSD {}".format(self.osd_id))
- osd_fsid = js["fsid"]
- osd_type = js["type"]
- if osd_type != "filestore":
+ logger.info('Found offline simple OSD {}'.format(self.osd_id))
+ osd_fsid = js['fsid']
+ osd_type = js['type']
+ if osd_type != 'filestore':
# need this to be mounted for the adopt to work, as it
# needs to move files from this directory
- call_throws(self.ctx, ['mount', js["data"]["path"], self.osd_data_dir])
+ call_throws(self.ctx, ['mount', js['data']['path'], self.osd_data_dir])
except ValueError as e:
- logger.info("Invalid JSON in {}: {}".format(osd_file, e))
+ logger.info('Invalid JSON in {}: {}'.format(osd_file, e))
return osd_fsid, osd_type
if not os.path.exists(data_dir_src):
raise Error("{}.{} data directory '{}' does not exist. "
- "Incorrect ID specified, or daemon alrady adopted?".format(
+ 'Incorrect ID specified, or daemon already adopted?'.format(
daemon_type, daemon_id, data_dir_src))
osd_fsid = None
_adjust_grafana_ini(os.path.join(config_dst, 'grafana.ini'))
else:
- logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key))
+ logger.debug('Skipping ssl, missing cert {} or key {}'.format(cert, key))
# data - possible custom dashboards/plugins
data_src = '/var/lib/grafana/'
# Update cert_file, cert_key pathnames in server section
# ConfigParser does not preserve comments
try:
- with open(filename, "r") as grafana_ini:
+ with open(filename, 'r') as grafana_ini:
lines = grafana_ini.readlines()
- with open("{}.new".format(filename), "w") as grafana_ini:
+ with open('{}.new'.format(filename), 'w') as grafana_ini:
server_section = False
for line in lines:
if line.startswith('['):
line = re.sub(r'^cert_key.*',
'cert_key = /etc/grafana/certs/cert_key', line)
grafana_ini.write(line)
- os.rename("{}.new".format(filename), filename)
+ os.rename('{}.new'.format(filename), filename)
except OSError as err:
- raise Error("Cannot update {}: {}".format(filename, err))
+ raise Error('Cannot update {}: {}'.format(filename, err))
def _stop_and_disable(ctx, unit_name):
if not check_time_sync(ctx):
errors.append('No time synchronization is active')
- if "expect_hostname" in ctx and ctx.expect_hostname:
+ if 'expect_hostname' in ctx and ctx.expect_hostname:
if get_hostname().lower() != ctx.expect_hostname.lower():
errors.append('hostname "%s" does not match expected hostname "%s"' % (
get_hostname(), ctx.expect_hostname))
# the service
check_time_sync(ctx, enabler=pkg)
- if "expect_hostname" in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname():
+ if 'expect_hostname' in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname():
logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), ctx.expect_hostname))
call_throws(ctx, ['hostname', ctx.expect_hostname])
with open('/etc/hostname', 'w') as f:
(daemon_type, daemon_id) = values.split('.', 1)
except ValueError:
raise argparse.ArgumentError(self,
- "must be of the format <type>.<id>. For example, osd.1 or prometheus.myhost.com")
+ 'must be of the format <type>.<id>. For example, osd.1 or prometheus.myhost.com')
daemons = get_supported_daemons()
if daemon_type not in daemons:
raise argparse.ArgumentError(self,
- "name must declare the type of daemon e.g. "
- "{}".format(', '.join(daemons)))
+ 'name must declare the type of daemon e.g. '
+ '{}'.format(', '.join(daemons)))
def __call__(self, parser, namespace, values, option_string=None):
- if self.dest == "name":
+ if self.dest == 'name':
self._check_name(values)
setattr(namespace, self.dest, values)
elif self.dest == 'exporter_config':
field = iface_setting.split()
if field[-1] == ifname:
ipv6_raw = field[0]
- ipv6_fmtd = ":".join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)])
+ ipv6_fmtd = ':'.join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)])
# apply naming rules using ipaddress module
ipv6 = ipaddress.ip_address(ipv6_fmtd)
- return "{}/{}".format(str(ipv6), int('0x{}'.format(field[2]), 16))
+ return '{}/{}'.format(str(ipv6), int('0x{}'.format(field[2]), 16))
return ''
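# Worked example for the /proc/net/if_inet6 parsing above: a line such as
#   'fe800000000000000042aafffe885a66 02 40 20 80 eth0'
# carries the raw address in field[0] and the prefix length, in hex, in
# field[2]. Grouping the raw address four digits at a time gives
# 'fe80:0000:0000:0000:0042:aaff:fe88:5a66', ipaddress compresses that to
# 'fe80::42:aaff:fe88:5a66', and int('0x40', 16) == 64 produces
# 'fe80::42:aaff:fe88:5a66/64'.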
"""
unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
divisor = 1000.0
- yotta = "YB"
+ yotta = 'YB'
if mode == 'binary':
unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
divisor = 1024.0
- yotta = "YiB"
+ yotta = 'YiB'
for unit in unit_list:
if abs(num) < divisor:
- return "%3.1f%s" % (num, unit)
+ return '%3.1f%s' % (num, unit)
num /= divisor
- return "%.1f%s" % (num, yotta)
+ return '%.1f%s' % (num, yotta)
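# Examples, assuming a signature like bytes_to_human(num, mode='decimal'):
#   bytes_to_human(1234000)           -> '1.2MB'
#   bytes_to_human(1048576, 'binary') -> '1.0MiB'
#   bytes_to_human(500)               -> '500.0' (bare number, empty unit)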
def read_file(path_list, file_name=''):
except OSError:
# sysfs may populate the file, but for devices like
# virtio reads can fail
- return "Unknown"
+ return 'Unknown'
else:
return content
- return "Unknown"
+ return 'Unknown'
##################################
_selinux_path_list = ['/etc/selinux/config']
_apparmor_path_list = ['/etc/apparmor']
_disk_vendor_workarounds = {
- "0x1af4": "Virtio Block Device"
+ '0x1af4': 'Virtio Block Device'
}
def __init__(self, ctx: CephadmContext):
for line in output:
field = [f.strip() for f in line.split(':')]
- if "model name" in line:
+ if 'model name' in line:
self.cpu_model = field[1]
- if "physical id" in line:
+ if 'physical id' in line:
cpu_set.add(field[1])
- if "siblings" in line:
+ if 'siblings' in line:
self.cpu_threads = int(field[1].strip())
- if "cpu cores" in line:
+ if 'cpu cores' in line:
self.cpu_cores = int(field[1].strip())
pass
self.cpu_count = len(cpu_set)
rel_dict = dict()
for line in os_release:
- if "=" in line:
+ if '=' in line:
var_name, var_value = line.split('=')
rel_dict[var_name] = var_value.strip('"')
# Would normally use PRETTY_NAME, but NAME and VERSION are more
# consistent
- if all(_v in rel_dict for _v in ["NAME", "VERSION"]):
- rel_str = "{} {}".format(rel_dict['NAME'], rel_dict['VERSION'])
+ if all(_v in rel_dict for _v in ['NAME', 'VERSION']):
+ rel_str = '{} {}'.format(rel_dict['NAME'], rel_dict['VERSION'])
return rel_str
@property
if os.path.exists(entitlements_dir):
pems = glob('{}/*.pem'.format(entitlements_dir))
if len(pems) >= 2:
- return "Yes"
+ return 'Yes'
- return "No"
+ return 'No'
os_name = self.operating_system
- if os_name.upper().startswith("RED HAT"):
+ if os_name.upper().startswith('RED HAT'):
return _red_hat()
- return "Unknown"
+ return 'Unknown'
@property
def hdd_count(self):
disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
disk_size_bytes = self._get_capacity(dev)
disk_list.append({
- "description": "{} {} ({})".format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
- "vendor": disk_vendor,
- "model": disk_model,
- "rev": disk_rev,
- "wwid": disk_wwid,
- "dev_name": dev,
- "disk_size_bytes": disk_size_bytes,
+ 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
+ 'vendor': disk_vendor,
+ 'model': disk_model,
+ 'rev': disk_rev,
+ 'wwid': disk_wwid,
+ 'dev_name': dev,
+ 'disk_size_bytes': disk_size_bytes,
})
return disk_list
raw = read_file(['/proc/loadavg']).strip()
data = raw.split()
return {
- "1min": float(data[0]),
- "5min": float(data[1]),
- "15min": float(data[2]),
+ '1min': float(data[0]),
+ '5min': float(data[1]),
+ '15min': float(data[2]),
}
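# e.g. a /proc/loadavg of '0.52 0.58 0.59 2/1234 4567' yields
# {'1min': 0.52, '5min': 0.58, '15min': 0.59}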
@property
"""Look at the NIC devices and extract network related metadata"""
# from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h
hw_lookup = {
- "1": "ethernet",
- "32": "infiniband",
- "772": "loopback",
+ '1': 'ethernet',
+ '32': 'infiniband',
+ '772': 'loopback',
}
for nic_path in HostFacts._nic_path_list:
continue
for iface in os.listdir(nic_path):
- lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))]
- upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))]
+ lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
+ upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
try:
mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
speed = -1
if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
- nic_type = "bridge"
+ nic_type = 'bridge'
elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
- nic_type = "bonding"
+ nic_type = 'bonding'
else:
- nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), "Unknown")
+ nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
dev_link = os.path.join(nic_path, iface, 'device')
if os.path.exists(dev_link):
driver = ''
self.interfaces[iface] = {
- "mtu": mtu,
- "upper_devs_list": upper_devs_list,
- "lower_devs_list": lower_devs_list,
- "operstate": operstate,
- "iftype": iftype,
- "nic_type": nic_type,
- "driver": driver,
- "speed": speed,
- "ipv4_address": get_ipv4_address(iface),
- "ipv6_address": get_ipv6_address(iface),
+ 'mtu': mtu,
+ 'upper_devs_list': upper_devs_list,
+ 'lower_devs_list': lower_devs_list,
+ 'operstate': operstate,
+ 'iftype': iftype,
+ 'nic_type': nic_type,
+ 'driver': driver,
+ 'speed': speed,
+ 'ipv4_address': get_ipv4_address(iface),
+ 'ipv6_address': get_ipv6_address(iface),
}
@property
"""Return a total count of all physical NICs detected in the host"""
phys_devs = []
for iface in self.interfaces:
- if self.interfaces[iface]["iftype"] == 'physical':
+ if self.interfaces[iface]['iftype'] == 'physical':
phys_devs.append(iface)
return len(phys_devs)
def vendor(self):
# type: () -> str
"""Determine server vendor from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, "sys_vendor")
+ return read_file(HostFacts._dmi_path_list, 'sys_vendor')
@property
def model(self):
# type: () -> str
"""Determine server model information from DMI data in sysfs"""
- family = read_file(HostFacts._dmi_path_list, "product_family")
- product = read_file(HostFacts._dmi_path_list, "product_name")
+ family = read_file(HostFacts._dmi_path_list, 'product_family')
+ product = read_file(HostFacts._dmi_path_list, 'product_name')
if family == 'Unknown' and product:
- return "{}".format(product)
+ return '{}'.format(product)
- return "{} ({})".format(family, product)
+ return '{} ({})'.format(family, product)
@property
def bios_version(self):
# type: () -> str
"""Determine server BIOS version from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, "bios_version")
+ return read_file(HostFacts._dmi_path_list, 'bios_version')
@property
def bios_date(self):
# type: () -> str
"""Determine server BIOS date from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, "bios_date")
+ return read_file(HostFacts._dmi_path_list, 'bios_date')
@property
def timestamp(self):
continue
k, v = line.split('=')
security[k] = v
- if security['SELINUX'].lower() == "disabled":
- security['description'] = "SELinux: Disabled"
+ if security['SELINUX'].lower() == 'disabled':
+ security['description'] = 'SELinux: Disabled'
else:
- security['description'] = "SELinux: Enabled({}, {})".format(security['SELINUX'], security['SELINUXTYPE'])
+ security['description'] = 'SELinux: Enabled({}, {})'.format(security['SELINUX'], security['SELINUXTYPE'])
return security
return {}
security = {}
for apparmor_path in HostFacts._apparmor_path_list:
if os.path.exists(apparmor_path):
- security['type'] = "AppArmor"
- security['description'] = "AppArmor: Enabled"
+ security['type'] = 'AppArmor'
+ security['description'] = 'AppArmor: Enabled'
try:
profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
except OSError:
summary[mode] += 1
else:
summary[mode] = 0
- summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()])
+ summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()])
security = {**security, **summary} # type: ignore
- security['description'] += "({})".format(summary_str)
+ security['description'] += '({})'.format(summary_str)
return security
return {}
ret = _fetch_apparmor()
else:
return {
- "type": "Unknown",
- "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor"
+ 'type': 'Unknown',
+ 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor'
}
if ret:
return ret
return {
- "type": "None",
- "description": "Linux Security Module framework is not available"
+ 'type': 'None',
+ 'description': 'Linux Security Module framework is not available'
}
@property
def selinux_enabled(self):
- return (self.kernel_security["type"] == "SELinux") and \
- (self.kernel_security["description"] != "SELinux: Disabled")
+ return (self.kernel_security['type'] == 'SELinux') and \
+ (self.kernel_security['description'] != 'SELinux: Disabled')
@property
def kernel_parameters(self):
out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
if out:
param_list = out.split('\n')
- param_dict = {param.split(" = ")[0]: param.split(" = ")[-1] for param in param_list}
+ param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list}
# return only desired parameters
if 'net.ipv4.ip_nonlocal_bind' in param_dict:
out, err, code = call(
ctx, ['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind']
)
- if out.strip() != "1":
+ if out.strip() != '1':
raise Error('net.ipv4.ip_nonlocal_bind not set to 1')
##################################
def __init__(self):
self.started_epoch_secs = time.time()
self.tasks = {
- "daemons": "inactive",
- "disks": "inactive",
- "host": "inactive",
- "http_server": "inactive",
+ 'daemons': 'inactive',
+ 'disks': 'inactive',
+ 'host': 'inactive',
+ 'http_server': 'inactive',
}
self.errors = []
self.disks = {}
@property
def health(self):
return {
- "started_epoch_secs": self.started_epoch_secs,
- "tasks": self.tasks,
- "errors": self.errors,
+ 'started_epoch_secs': self.started_epoch_secs,
+ 'tasks': self.tasks,
+ 'errors': self.errors,
}
def to_json(self):
return {
- "health": self.health,
- "host": self.host,
- "daemons": self.daemons,
- "disks": self.disks,
+ 'health': self.health,
+ 'host': self.host,
+ 'daemons': self.daemons,
+ 'disks': self.disks,
}
def update_health(self, task_type, task_status, error_msg=None):
ensure we only respond to callers who know our token, i.e. the mgr
"""
def wrapper(self, *args, **kwargs):
- auth = self.headers.get("Authorization", None)
- if auth != "Bearer " + self.server.token:
+ auth = self.headers.get('Authorization', None)
+ if auth != 'Bearer ' + self.server.token:
self.send_error(401)
return
f(self, *args, **kwargs)
self.wfile.write(data.encode('utf-8'))
else:
# Invalid GET URL
- bad_request_msg = "Valid URLs are: {}".format(', '.join(CephadmDaemonHandler.valid_routes))
+ bad_request_msg = 'Valid URLs are: {}'.format(', '.join(CephadmDaemonHandler.valid_routes))
self.send_response(404, message=bad_request_msg) # reason
self.send_header('Content-type', 'application/json')
self.end_headers()
- self.wfile.write(json.dumps({"message": bad_request_msg}).encode('utf-8'))
+ self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8'))
def log_message(self, format, *args):
- rqst = " ".join(str(a) for a in args)
- logger.info(f"client:{self.address_string()} [{self.log_date_time_string()}] {rqst}")
+ rqst = ' '.join(str(a) for a in args)
+ logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}')
class CephadmDaemon():
- daemon_type = "cephadm-exporter"
+ daemon_type = 'cephadm-exporter'
default_port = 9443
bin_name = 'cephadm'
- key_name = "key"
- crt_name = "crt"
- token_name = "token"
+ key_name = 'key'
+ crt_name = 'crt'
+ token_name = 'token'
config_requirements = [
key_name,
crt_name,
@classmethod
def validate_config(cls, config):
- reqs = ", ".join(CephadmDaemon.config_requirements)
+ reqs = ', '.join(CephadmDaemon.config_requirements)
errors = []
if not config or not all([k_name in config for k_name in CephadmDaemon.config_requirements]):
- raise Error(f"config must contain the following fields : {reqs}")
+ raise Error(f'config must contain the following fields : {reqs}')
if not all([isinstance(config[k_name], str) for k_name in CephadmDaemon.config_requirements]):
- errors.append(f"the following fields must be strings: {reqs}")
+ errors.append(f'the following fields must be strings: {reqs}')
crt = config[CephadmDaemon.crt_name]
key = config[CephadmDaemon.key_name]
token = config[CephadmDaemon.token_name]
if not crt.startswith('-----BEGIN CERTIFICATE-----') or not crt.endswith('-----END CERTIFICATE-----\n'):
- errors.append("crt field is not a valid SSL certificate")
+ errors.append('crt field is not a valid SSL certificate')
if not key.startswith('-----BEGIN PRIVATE KEY-----') or not key.endswith('-----END PRIVATE KEY-----\n'):
- errors.append("key is not a valid SSL private key")
+ errors.append('key is not a valid SSL private key')
if len(token) < 8:
errors.append("'token' must be more than 8 characters long")
if p <= 1024:
raise ValueError
except (TypeError, ValueError):
- errors.append("port must be an integer > 1024")
+ errors.append('port must be an integer > 1024')
if errors:
- raise Error("Parameter errors : {}".format(", ".join(errors)))
+ raise Error('Parameter errors: {}'.format(', '.join(errors)))
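# A hypothetical config that would pass validate_config; field names come
# from config_requirements above, and the values are placeholders (real
# ones come from 'cephadm generate-exporter-config'):
#
#     {
#         'crt': '-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n',
#         'key': '-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n',
#         'token': '0e280e68-9327-4c42-85c1-a9c3ce3d7d73',
#         'port': 9443,
#     }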
@property
def port_active(self):
def can_run(self):
# if port is in use
if self.port_active:
- self.errors.append(f"TCP port {self.port} already in use, unable to bind")
+ self.errors.append(f'TCP port {self.port} already in use, unable to bind')
if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.key_name)):
self.errors.append(f"Key file '{CephadmDaemon.key_name}' is missing from {self.daemon_path}")
if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.crt_name)):
self.errors.append(f"Certificate file '{CephadmDaemon.crt_name}' is missing from {self.daemon_path}")
- if self.token == "Unknown":
+ if self.token == 'Unknown':
self.errors.append(f"Authentication token '{CephadmDaemon.token_name}' is missing from {self.daemon_path}")
return len(self.errors) == 0
@staticmethod
def _unit_name(fsid, daemon_id):
- return "{}.service".format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
+ return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id))
@property
def unit_name(self):
)
def _handle_thread_exception(self, exc, thread_type):
- e_msg = f"{exc.__class__.__name__} exception: {str(exc)}"
+ e_msg = f'{exc.__class__.__name__} exception: {str(exc)}'
thread_info = getattr(self.cephadm_cache, thread_type)
errors = thread_info.get('scrape_errors', [])
errors.append(e_msg)
self.cephadm_cache.update_task(
thread_type,
{
- "scrape_errors": errors,
- "data": None,
+ 'scrape_errors': errors,
+ 'data': None,
}
)
if ctr >= refresh_interval:
ctr = 0
- logger.debug("executing host-facts scrape")
+ logger.debug('executing host-facts scrape')
errors = []
s_time = time.time()
try:
data = json.loads(facts.dump())
except json.decoder.JSONDecodeError:
- errors.append("host-facts provided invalid JSON")
+ errors.append('host-facts provided invalid JSON')
logger.warning(errors[-1])
data = {}
self.cephadm_cache.update_task(
'host',
{
- "scrape_timestamp": s_time,
- "scrape_duration_secs": elapsed,
- "scrape_errors": errors,
- "data": data,
+ 'scrape_timestamp': s_time,
+ 'scrape_duration_secs': elapsed,
+ 'scrape_errors': errors,
+ 'data': data,
}
)
- logger.debug(f"completed host-facts scrape - {elapsed}s")
+ logger.debug(f'completed host-facts scrape - {elapsed}s')
time.sleep(CephadmDaemon.loop_delay)
ctr += CephadmDaemon.loop_delay
- logger.info("host-facts thread stopped")
+ logger.info('host-facts thread stopped')
def _scrape_ceph_volume(self, refresh_interval=15):
# we're invoking the ceph_volume command, so we need to set the args that it
# expects to use
- self.ctx.command = "inventory --format=json".split()
+ self.ctx.command = 'inventory --format=json'.split()
self.ctx.fsid = self.fsid
self.ctx.log_output = False
if ctr >= refresh_interval:
ctr = 0
- logger.debug("executing ceph-volume scrape")
+ logger.debug('executing ceph-volume scrape')
errors = []
s_time = time.time()
stream = io.StringIO()
try:
data = json.loads(stdout)
except json.decoder.JSONDecodeError:
- errors.append("ceph-volume thread provided bad json data")
+ errors.append('ceph-volume thread provided bad JSON data')
logger.warning(errors[-1])
else:
- errors.append("ceph-volume didn't return any data")
+ errors.append('ceph-volume did not return any data')
logger.warning(errors[-1])
self.cephadm_cache.update_task(
'disks',
{
- "scrape_timestamp": s_time,
- "scrape_duration_secs": elapsed,
- "scrape_errors": errors,
- "data": data,
+ 'scrape_timestamp': s_time,
+ 'scrape_duration_secs': elapsed,
+ 'scrape_errors': errors,
+ 'data': data,
}
)
- logger.debug(f"completed ceph-volume scrape - {elapsed}s")
+ logger.debug(f'completed ceph-volume scrape - {elapsed}s')
time.sleep(CephadmDaemon.loop_delay)
ctr += CephadmDaemon.loop_delay
- logger.info("ceph-volume thread stopped")
+ logger.info('ceph-volume thread stopped')
def _scrape_list_daemons(self, refresh_interval=20):
ctr = 0
if ctr >= refresh_interval:
ctr = 0
- logger.debug("executing list-daemons scrape")
+ logger.debug('executing list-daemons scrape')
errors = []
s_time = time.time()
exception_encountered = True
else:
if not isinstance(data, list):
- errors.append("list-daemons didn't supply a list?")
+ errors.append('list-daemons did not supply a list?')
logger.warning(errors[-1])
data = []
elapsed = time.time() - s_time
self.cephadm_cache.update_task(
'daemons',
{
- "scrape_timestamp": s_time,
- "scrape_duration_secs": elapsed,
- "scrape_errors": errors,
- "data": data,
+ 'scrape_timestamp': s_time,
+ 'scrape_duration_secs': elapsed,
+ 'scrape_errors': errors,
+ 'data': data,
}
)
- logger.debug(f"completed list-daemons scrape - {elapsed}s")
+ logger.debug(f'completed list-daemons scrape - {elapsed}s')
time.sleep(CephadmDaemon.loop_delay)
ctr += CephadmDaemon.loop_delay
- logger.info("list-daemons thread stopped")
+ logger.info('list-daemons thread stopped')
def _create_thread(self, target, name, refresh_interval=None):
if refresh_interval:
t = Thread(target=target)
t.daemon = True
t.name = name
- self.cephadm_cache.update_health(name, "active")
+ self.cephadm_cache.update_health(name, 'active')
t.start()
- start_msg = f"Started {name} thread"
+ start_msg = f'Started {name} thread'
if refresh_interval:
- logger.info(f"{start_msg}, with a refresh interval of {refresh_interval}s")
+ logger.info(f'{start_msg}, with a refresh interval of {refresh_interval}s')
else:
- logger.info(f"{start_msg}")
+ logger.info(f'{start_msg}')
return t
def reload(self, *args):
This is a placeholder function only, and serves to provide the hook that could
be exploited later if the exporter evolves to incorporate a config file
"""
- logger.info("Reload request received - ignoring, no action needed")
+ logger.info('Reload request received - ignoring, no action needed')
def shutdown(self, *args):
- logger.info("Shutdown request received")
+ logger.info('Shutdown request received')
self.stop = True
self.http_server.shutdown()
def run(self):
logger.info(f"cephadm exporter starting for FSID '{self.fsid}'")
if not self.can_run:
- logger.error("Unable to start the exporter daemon")
+ logger.error('Unable to start the exporter daemon')
for e in self.errors:
logger.error(e)
return
signal.signal(signal.SIGTERM, self.shutdown)
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGHUP, self.reload)
- logger.debug("Signal handlers attached")
+ logger.debug('Signal handlers attached')
host_facts = self._create_thread(self._scrape_host_facts, 'host', 5)
self.workers.append(host_facts)
self.http_server.cephadm_cache = self.cephadm_cache
self.http_server.token = self.token
server_thread = self._create_thread(self.http_server.serve_forever, 'http_server')
- logger.info(f"https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}")
+ logger.info(f'https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}')
ctr = 0
while server_thread.is_alive():
if self.cephadm_cache.tasks[worker.name] == 'inactive':
continue
if not worker.is_alive():
- logger.warning(f"{worker.name} thread not running")
- stop_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
- self.cephadm_cache.update_health(worker.name, "inactive", f"{worker.name} stopped at {stop_time}")
+ logger.warning(f'{worker.name} thread not running')
+ stop_time = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S')
+ self.cephadm_cache.update_health(worker.name, 'inactive', f'{worker.name} stopped at {stop_time}')
time.sleep(CephadmDaemon.loop_delay)
ctr += CephadmDaemon.loop_delay
- logger.info("Main http server thread stopped")
+ logger.info('Main http server thread stopped')
@property
def unit_run(self):
simple service definition and add it to the fsid's target
"""
if not config:
- raise Error("Attempting to deploy cephadm daemon without a config")
+ raise Error('Attempting to deploy cephadm daemon without a config')
assert isinstance(config, dict)
# Create the required config files in the daemons dir, with restricted permissions
for filename in config:
- with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), "w") as f:
+ with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), 'w') as f:
f.write(config[filename])
# When __file__ is <stdin> we're being invoked over remoto via the orchestrator, so
shutil.copy(__file__,
self.binary_path)
- with open(os.path.join(self.daemon_path, 'unit.run'), "w") as f:
+ with open(os.path.join(self.daemon_path, 'unit.run'), 'w') as f:
f.write(self.unit_run)
with open(
os.path.join(self.ctx.unit_dir,
- f"{self.unit_name}.new"),
- "w"
+ f'{self.unit_name}.new'),
+ 'w'
) as f:
f.write(self.unit_file)
os.rename(
- os.path.join(self.ctx.unit_dir, f"{self.unit_name}.new"),
+ os.path.join(self.ctx.unit_dir, f'{self.unit_name}.new'),
os.path.join(self.ctx.unit_dir, self.unit_name))
call_throws(self.ctx, ['systemctl', 'daemon-reload'])
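# The unit file above is installed with a write-then-rename step, which is
# atomic on POSIX when source and destination share a filesystem. The same
# idiom in isolation (illustrative helper, not part of cephadm):
import os

def _atomic_write(path: str, text: str) -> None:
    tmp = path + '.new'
    with open(tmp, 'w') as f:
        f.write(text)
    os.rename(tmp, path)  # atomically replaces any existing file at path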
def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id):
unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
unit_path = os.path.join(ctx.unit_dir, unit_name)
- unit_run = os.path.join(ctx.data_dir, fsid, f"{daemon_type}.{daemon_id}", "unit.run")
+ unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run')
port = None
try:
- with open(unit_run, "r") as u:
- contents = u.read().strip(" &")
+ with open(unit_run, 'r') as u:
+ contents = u.read().strip(' &')
except OSError:
- logger.warning(f"Unable to access the unit.run file @ {unit_run}")
+ logger.warning(f'Unable to access the unit.run file @ {unit_run}')
return
port = None
try:
port = int(line.split('--port ')[-1])
except ValueError:
- logger.warning("Unexpected format in unit.run file: port is not numeric")
- logger.warning("Unable to remove the systemd file and close the port")
+ logger.warning('Unexpected format in unit.run file: port is not numeric')
+ logger.warning('Unable to remove the systemd file and close the port')
return
break
try:
fw.close_ports([port])
except RuntimeError:
- logger.error(f"Unable to close port {port}")
+ logger.error(f'Unable to close port {port}')
- stdout, stderr, rc = call(ctx, ["rm", "-f", unit_path])
+ stdout, stderr, rc = call(ctx, ['rm', '-f', unit_path])
if rc:
- logger.error(f"Unable to remove the systemd file @ {unit_path}")
+ logger.error(f'Unable to remove the systemd file @ {unit_path}')
else:
- logger.info(f"removed systemd unit file @ {unit_path}")
- stdout, stderr, rc = call(ctx, ["systemctl", "daemon-reload"])
+ logger.info(f'removed systemd unit file @ {unit_path}')
+ stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload'])
def command_exporter(ctx: CephadmContext):
return os.path.exists(
os.path.join(
UNIT_DIR,
- f"{subsystem}.target.wants",
+ f'{subsystem}.target.wants',
target_name
)
)
if not ctx.fsid:
raise Error('must pass --fsid to specify cluster')
- target = f"ceph-{ctx.fsid}.target"
+ target = f'ceph-{ctx.fsid}.target'
if ctx.maintenance_action.lower() == 'enter':
- logger.info("Requested to place host into maintenance")
+ logger.info('Requested to place host into maintenance')
if systemd_target_state(target):
_out, _err, code = call(ctx,
['systemctl', 'disable', target],
verbosity=CallVerbosity.DEBUG)
if code:
- logger.error(f"Failed to disable the {target} target")
- return "failed - to disable the target"
+ logger.error(f'Failed to disable the {target} target')
+ return 'failed - unable to disable the target'
else:
# stopping a target waits by default
_out, _err, code = call(ctx,
['systemctl', 'stop', target],
verbosity=CallVerbosity.DEBUG)
if code:
- logger.error(f"Failed to stop the {target} target")
- return "failed - to disable the target"
+ logger.error(f'Failed to stop the {target} target')
+ return 'failed - unable to stop the target'
else:
- return f"success - systemd target {target} disabled"
+ return f'success - systemd target {target} disabled'
else:
- return "skipped - target already disabled"
+ return 'skipped - target already disabled'
else:
- logger.info("Requested to exit maintenance state")
+ logger.info('Requested to exit maintenance state')
# exit maintenance request
if not systemd_target_state(target):
_out, _err, code = call(ctx,
['systemctl', 'enable', target],
verbosity=CallVerbosity.DEBUG)
if code:
- logger.error(f"Failed to enable the {target} target")
- return "failed - unable to enable the target"
+ logger.error(f'Failed to enable the {target} target')
+ return 'failed - unable to enable the target'
else:
# starting a target waits by default
_out, _err, code = call(ctx,
['systemctl', 'start', target],
verbosity=CallVerbosity.DEBUG)
if code:
- logger.error(f"Failed to start the {target} target")
- return "failed - unable to start the target"
+ logger.error(f'Failed to start the {target} target')
+ return 'failed - unable to start the target'
else:
- return f"success - systemd target {target} enabled and started"
+ return f'success - systemd target {target} enabled and started'
##################################
parser_adopt.add_argument(
'--force-start',
action='store_true',
- help="start newly adoped daemon, even if it wasn't running previously")
+ help='start newly adopted daemon, even if it was not running previously')
parser_adopt.add_argument(
'--container-init',
action='store_true',
help='ceph.keyring to pass through to the container')
parser_shell.add_argument(
'--mount', '-m',
- help=("mount a file or directory in the container. "
- "Support multiple mounts. "
- "ie: `--mount /foo /bar:/bar`. "
- "When no destination is passed, default is /mnt"),
+ help=('mount a file or directory in the container. '
+ 'Supports multiple mounts, '
+ 'e.g. `--mount /foo /bar:/bar`. '
+ 'When no destination is passed, the default is /mnt'),
nargs='+')
parser_shell.add_argument(
'--env', '-e',
'--fsid',
help='cluster FSID')
parser_maintenance.add_argument(
- "maintenance_action",
+ 'maintenance_action',
type=str,
choices=['enter', 'exit'],
- help="Maintenance action - enter maintenance, or exit maintenance")
+ help='Maintenance action - enter maintenance, or exit maintenance')
parser_maintenance.set_defaults(func=command_maintenance)
parser_verify_prereqs = subparsers.add_parser(
parser = _get_parser()
args = parser.parse_args(av)
- if 'command' in args and args.command and args.command[0] == "--":
+ if 'command' in args and args.command and args.command[0] == '--':
args.command.pop(0)
# workaround argparse to deprecate the subparser `--container-init` flag
if ctx.verbose:
for handler in logger.handlers:
- if handler.name == "console":
+ if handler.name == 'console':
handler.setLevel(logging.DEBUG)
if not ctx.has_function():
- sys.stderr.write("No command specified; pass -h or --help for usage\n")
+ sys.stderr.write('No command specified; pass -h or --help for usage\n')
return None
return ctx
sys.exit(r)
-if __name__ == "__main__":
+if __name__ == '__main__':
main()