from socketserver import ThreadingMixIn
from http.server import BaseHTTPRequestHandler, HTTPServer
import signal
+import io
+from contextlib import redirect_stdout
+
try:
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
"health": {},
"host": {},
"daemons": {},
+ "disks": {},
}
cephadm_cache_lock = RLock()
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
+ supported_daemons.append(CephadmDaemon.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
config, keyring)
if not reconfig:
- deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
- osd_fsid=osd_fsid)
+ if daemon_type == CephadmDaemon.daemon_type:
+ if ports:
+ port = ports[0]
+ else:
+ port = CephadmDaemon.default_port
+ ports = [port]
+ cephadmd = CephadmDaemon(fsid, daemon_id, port)
+ cephadmd.deploy_daemon_unit()
+ else:
+ deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
+ osd_fsid=osd_fsid)
if not os.path.exists(data_dir + '/unit.created'):
with open(data_dir + '/unit.created', 'w') as f:
os.rename(data_dir + '/unit.poststop.new',
data_dir + '/unit.poststop')
- with open(data_dir + '/unit.image.new', 'w') as f:
- f.write(c.image + '\n')
- os.fchmod(f.fileno(), 0o600)
- os.rename(data_dir + '/unit.image.new',
- data_dir + '/unit.image')
+ if c:
+ with open(data_dir + '/unit.image.new', 'w') as f:
+ f.write(c.image + '\n')
+ os.fchmod(f.fileno(), 0o600)
+ os.rename(data_dir + '/unit.image.new',
+ data_dir + '/unit.image')
# systemd
install_base_units(fsid)
else:
logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
def close_ports(self, fw_ports):
    # type: (List[int]) -> None
    """Remove TCP ports from the permanent firewalld configuration.

    When firewalld is not available only a debug message is logged. Each
    port is queried first and is removed only if the query succeeds.

    :param fw_ports: port numbers to close
    :raises RuntimeError: if the remove command returns a non-zero status
    """
    if not self.available:
        logger.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports)
        return

    for port_number in fw_ports:
        tcp_port = str(port_number) + '/tcp'
        query_cmd = [self.cmd, '--permanent', '--query-port', tcp_port]
        _, _, query_rc = call(query_cmd, verbose_on_failure=False)
        if query_rc:
            # a failed query is treated as "nothing to remove"
            logger.info(f"firewalld port {tcp_port} already closed")
            continue

        logger.info('Disabling port %s in current zone...' % tcp_port)
        remove_cmd = [self.cmd, '--permanent', '--remove-port', tcp_port]
        _, remove_err, remove_rc = call(remove_cmd)
        if remove_rc:
            raise RuntimeError('unable to remove port %s from current zone: %s' %
                               (tcp_port, remove_err))
        logger.info(f"Port {tcp_port} disabled")
+
def apply_rules(self):
# type: () -> None
if not self.available:
config=config, keyring=keyring,
reconfig=args.reconfig,
ports=daemon_ports)
+
+ elif daemon_type == CephadmDaemon.daemon_type:
+ # get current user gid and uid
+ uid = os.getuid()
+ gid = os.getgid()
+ deploy_daemon(args.fsid, daemon_type, daemon_id, None,
+ uid, gid, ports=daemon_ports)
+
else:
raise Error("{} not implemented in command_deploy function".format(daemon_type))
privileged=True,
volume_mounts=mounts,
)
- out, err, code = call_throws(c.run_cmd(), verbose=True)
+ out, err, code = call_throws(c.run_cmd(), verbose=not args.no_log_output)
if not code:
print(out)
l = FileLock(args.fsid)
l.acquire()
+ (daemon_type, daemon_id) = args.name.split('.', 1)
+ if daemon_type == CephadmDaemon.daemon_type:
+ unit_name = f'ceph-{args.fsid}-cephadm.{daemon_id}.service'
+ else:
+ unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
- unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
- (daemon_type, daemon_id) = args.name.split('.', 1)
if daemon_type in ['mon', 'osd'] and not args.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
os.path.join(backup_dir, dirname))
else:
call_throws(['rm', '-rf', data_dir])
+ if daemon_type == CephadmDaemon.daemon_type:
+ CephadmDaemon.uninstall(args.fsid, daemon_type, daemon_id)
##################################
daemon_threads = True
-def cephadmd_install():
- # open firewall port
- # copy self to /usr/bin somewhere
- # create the systemd unit file
- # enable the systemd unit file
- # start the systemd unit
- pass
-
-def cephadmd_uninstall():
- # shutdown the systemd unit
- # disable it and remove it
- # close the firewall port
- pass
-
-def cephadmd_upgrade():
- # copy self to the /usr/bin
- # restart the systemd unit
- pass
-
-
class CephadmDaemon():
- def __init__(self, fsid, port=None):
+ daemon_type = "cephadm"
+ default_port = 5003
+ bin_name = 'cephadm'
+ loop_delay = 0.1
+ thread_check_interval = 5
+
+ def __init__(self, fsid, daemon_id, port=5003):
self.fsid = fsid
+ self.daemon_id = daemon_id
self.port = port
self.workers = []
self.http_server = None
@property
def can_run(self):
return not self.port_active
-
+
+ @staticmethod
+ def _unit_name(fsid, daemon_id):
+ return f"ceph-{fsid}-cephadm.{daemon_id}.service"
+
+ @property
+ def unit_name(self):
+ # Our name here will cause a problem with the the old list_daemons code,
+ # since that only expects daemons to be containers and cephadmd is not!
+ return CephadmDaemon._unit_name(self.fsid, self.daemon_id)
+
@property
def binary_path(self):
    """Path of this daemon's cephadm binary below args.data_dir.

    The path is <data_dir>/<fsid>/cephadm.<daemon_id>/<bin_name>.
    """
    daemon_dir = f'cephadm.{self.daemon_id}'
    path_parts = (args.data_dir, self.fsid, daemon_dir, CephadmDaemon.bin_name)
    return os.path.join(*path_parts)
+
def _scrape_host_facts(self, refresh_interval=10):
- loop_delay = 0.1
ctr = 0
while True:
if ctr >= refresh_interval:
ctr = 0
- logger.debug("executing host_facts scrape")
+ logger.debug("executing host-facts scrape")
+ errors = []
s_time = time.time()
facts = HostFacts()
elapsed = time.time() - s_time
- data = json.loads(facts.dump())
+ try:
+ data = json.loads(facts.dump())
+ except json.decoder.JSONDecodeError:
+ errors.append("host-facts received invalid JSON")
+ logger.warning(errors[-1])
+ data = {}
with cephadm_cache_lock:
cephadm_cache['host'] = {
"scrape_timestamp": s_time,
"scrape_duration_secs": elapsed,
+ "scrape_errors": errors,
"data": data,
}
logger.debug(f"completed host-facts scrape - {elapsed}s")
- time.sleep(loop_delay)
- ctr += loop_delay
+ time.sleep(CephadmDaemon.loop_delay)
+ ctr += CephadmDaemon.loop_delay
logger.info("host-facts thread stopped")
def _scrape_ceph_volume(self, refresh_interval=15):
    """Worker loop that periodically caches the ceph-volume inventory.

    Each scrape calls command_ceph_volume() with the command
    ``inventory --format=json``, captures what it writes to stdout, parses
    it as JSON and stores the result in cephadm_cache['disks'] along with
    the scrape timestamp, duration and any errors. The loop ends once
    self.stop is set.

    :param refresh_interval: seconds to accumulate (in loop_delay steps)
        between scrapes
    """
    # NOTE(review): `args` is shared module state, so these assignments are
    # visible to everything else in the process that reads `args`.
    args.command = "inventory --format=json".split()
    args.fsid = self.fsid
    args.no_log_output = True

    ctr = 0
    while True:
        if self.stop:
            break

        if ctr >= refresh_interval:
            ctr = 0
            logger.debug("executing ceph-volume scrape")
            errors = []
            # Start every scrape with an empty result. Without this, a scrape
            # that produced no output left `data` unbound (first pass) or
            # republished the previous inventory (later passes).
            data = []
            s_time = time.time()
            stream = io.StringIO()

            # NOTE(review): redirect_stdout replaces sys.stdout for the whole
            # process, not only for this thread.
            with redirect_stdout(stream):
                command_ceph_volume()
            elapsed = time.time() - s_time

            # if the call to ceph-volume returns junk with the
            # json, it won't parse
            stdout = stream.getvalue()

            if stdout:
                try:
                    data = json.loads(stdout)
                except json.decoder.JSONDecodeError:
                    errors.append("ceph-volume thread received bad json data")
                    logger.warning(errors[-1])
                    data = []
            else:
                errors.append("ceph-volume didn't return any data")
                logger.warning(errors[-1])

            with cephadm_cache_lock:
                cephadm_cache['disks'] = {
                    "scrape_timestamp": s_time,
                    "scrape_duration_secs": elapsed,
                    "scrape_errors": errors,
                    "data": data,
                }

            logger.debug(f"completed ceph-volume scrape - {elapsed}s")

        time.sleep(CephadmDaemon.loop_delay)
        ctr += CephadmDaemon.loop_delay

    logger.info("ceph-volume thread stopped")
+
def _scrape_list_daemons(self, refresh_interval=20):
- loop_delay = 0.1
ctr = 0
while True:
if self.stop:
if ctr >= refresh_interval:
ctr = 0
- logger.debug("executing list_daemons scrape")
+ logger.debug("executing list-daemons scrape")
+ errors = []
s_time = time.time()
ld = list_daemons()
elapsed = time.time() - s_time
cephadm_cache['daemons'] = {
"scrape_timestamp": s_time,
"scrape_duration_secs": elapsed,
+ "scrape_errors": errors,
"data": ld,
}
- logger.debug(f"completed list_daemons scrape - {elapsed}s")
+ logger.debug(f"completed list-daemons scrape - {elapsed}s")
- time.sleep(loop_delay)
- ctr += loop_delay
+ time.sleep(CephadmDaemon.loop_delay)
+ ctr += CephadmDaemon.loop_delay
logger.info("list-daemons thread stopped")
def _create_thread(self, target, name, refresh_interval=None):
    """Start `target` in a named daemon thread and flag it active in the cache.

    :param target: callable the thread runs
    :param name: thread name, also used as the key in cephadm_cache['health']
    :param refresh_interval: when truthy, handed to `target` as its single
        positional argument and mentioned in the start-up log message
    :return: the started Thread
    """
    thread_args = (refresh_interval,) if refresh_interval else ()
    worker = Thread(target=target, args=thread_args)
    worker.daemon = True
    worker.name = name

    # the health entry is written before the thread is started
    with cephadm_cache_lock:
        cephadm_cache['health'][name] = "active"
    worker.start()

    start_msg = f"Started {name} thread"
    if refresh_interval:
        start_msg = f"{start_msg}, with a refresh interval of {refresh_interval}s"
    logger.info(start_msg)
    return worker
def reload(self, *args):
- logger.info("caught a reload")
+ logger.info("Reload request received")
def shutdown(self, *args):
- logger.info("Shutting down")
+ logger.info("Shutdown request received")
self.stop = True
self.http_server.shutdown()
signal.signal(signal.SIGHUP, self.reload)
logger.debug("Signal handlers attached")
- host_facts = self._create_thread(self._scrape_host_facts, 'host_facts')
+ host_facts = self._create_thread(self._scrape_host_facts, 'host_facts', 5)
self.workers.append(host_facts)
- daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons')
+ daemons = self._create_thread(self._scrape_list_daemons, 'list_daemons', 20)
self.workers.append(daemons)
+ disks = self._create_thread(self._scrape_ceph_volume, 'ceph_volume', 20)
+ self.workers.append(disks)
+
self.http_server = CephadmHTTPServer(('0.0.0.0', self.port), CephadmDaemonHandler) # IPv4 only
server_thread = self._create_thread(self.http_server.serve_forever, 'http_server')
logger.info(f"http server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}")
- # run an event driven loop to check the scrape thread is active
- # if not, start it again
- # if started >5 times in 10 seconds, abort with an error
+
+ ctr = 0
while server_thread.is_alive():
+ if self.stop:
+ break
- for worker in self.workers:
- if not worker.is_alive():
- logger.warning(f"{worker.name} thread not running")
- with cephadm_cache_lock:
- # update health in the cache
- cephadm_cache['health'][worker.name] = "inactive"
- else:
- pass
- time.sleep(10)
+ if ctr >= CephadmDaemon.thread_check_interval:
+ ctr = 0
+ for worker in self.workers:
+ if not worker.is_alive():
+ logger.warning(f"{worker.name} thread not running")
+ with cephadm_cache_lock:
+ # update health in the cache
+ cephadm_cache['health'][worker.name] = "inactive"
+
+ time.sleep(CephadmDaemon.loop_delay)
+ ctr += CephadmDaemon.loop_delay
+
+ logger.info("Main http server thread stopped")
+
@property
def unit_file(self):
    """Render the systemd unit file text for this cephadm daemon.

    The unit is tied to the cluster target ``ceph-<fsid>.target`` and runs
    ``<python3> <binary_path> cephadmd --fsid <fsid> --port <port>``.

    :return: the unit file contents as a string
    """
    import sys  # local import: only needed for the interpreter fallback

    # shutil.which() returns None when python3 is not on PATH; fall back to
    # the interpreter running this script so ExecStart never begins with
    # the literal text "None".
    py3 = shutil.which('python3') or sys.executable
    return """#generated by cephadm
[Unit]
Description=cephadm data service (web interface) for cluster {fsid}
After=network-online.target
Wants=network-online.target

PartOf=ceph-{fsid}.target
Before=ceph-{fsid}.target

[Service]
Type=simple
ExecStart={py3} {bin_path} cephadmd --fsid {fsid} --port {port}
Restart=on-failure
RestartSec=10s

[Install]
WantedBy=ceph-{fsid}.target
""".format(
        py3=py3,
        fsid=self.fsid,
        bin_path=self.binary_path,
        port=self.port)
+
def deploy_daemon_unit(self):
    """Install, enable and start the systemd unit for the cephadm daemon.

    The regular deploy_daemon_units() does not apply here because this
    daemon is not a container; instead a simple service definition is
    written and attached to the fsid's target.
    """
    # place a copy of this script at the daemon's binary path
    shutil.copy(__file__, self.binary_path)

    unit = self.unit_name
    unit_path = os.path.join(args.unit_dir, unit)
    staging_path = os.path.join(args.unit_dir, f"{unit}.new")

    # write under a temporary name, then rename onto the final unit path
    with open(staging_path, "w") as unit_fh:
        unit_fh.write(self.unit_file)
    os.rename(staging_path, unit_path)

    call_throws(['systemctl', 'daemon-reload'])
    # stop / reset-failed are best effort: their results are not checked
    for action in ('stop', 'reset-failed'):
        call(['systemctl', action, unit],
             verbose_on_failure=False)
    call_throws(['systemctl', 'enable', '--now', unit])
+
@classmethod
def uninstall(cls, fsid, daemon_type, daemon_id):
    """Close the daemon's firewall port and remove its systemd unit file.

    The port is recovered from the ``--port`` option found in the unit
    file. If the unit file cannot be read, or the port value is not
    numeric, a warning is logged and nothing is removed.

    :param fsid: cluster fsid
    :param daemon_type: accepted for the caller's convenience; not used here
    :param daemon_id: id of the cephadm daemon
    """
    unit_name = cls._unit_name(fsid, daemon_id)
    unit_path = os.path.join(args.unit_dir, unit_name)
    try:
        with open(unit_path, "r") as u:
            contents = u.read().strip()
    except OSError:
        logger.warning(f"Unable to access the systemd file @ {unit_path}")
        return

    # Default to "no port found". Previously a unit file without a --port
    # option left `port` unbound and the check below raised.
    port = None
    for line in contents.split('\n'):
        if '--port ' in line:
            try:
                # take only the first token after --port so that any
                # option following it does not break the conversion
                port = int(line.split('--port ')[-1].split()[0])
            except (ValueError, IndexError):
                logger.warning("Unexpected format in unit file: port is not numeric")
                logger.warning("Unable to remove the systemd file and close the port")
                return
            break

    if port:
        fw = Firewalld()
        try:
            fw.close_ports([port])
        except RuntimeError:
            # best effort: still go on to remove the unit file
            logger.error(f"Unable to close port {port}")

    stdout, stderr, rc = call(["rm", "-f", unit_path])
    if rc:
        logger.error(f"Unable to remove the systemd file @ {unit_path}")
    else:
        logger.info(f"removed systemd unit file @ {unit_path}")
        call(["systemctl", "daemon-reload"])
def command_cephadmd():
    """Entry point of the ``cephadmd`` sub-command: run the cephadm web service.

    :raises Error: when args.fsid is not an entry of args.data_dir
    """
    # CephadmDaemon.__init__ takes (fsid, daemon_id, port=5003). Passing
    # args.port positionally put it in daemon_id and left the port at its
    # default, so --port was ignored; pass it by keyword instead.
    # NOTE(review): this sub-command defines no daemon id option, so None is
    # supplied - confirm nothing on the run() path needs a real daemon id.
    cephadmd = CephadmDaemon(args.fsid, daemon_id=None, port=args.port)

    if args.fsid not in os.listdir(args.data_dir):
        raise Error(f"cluster fsid '{args.fsid}' not found in '{args.data_dir}'")

    cephadmd.run()
parser_ceph_volume.add_argument(
'--keyring', '-k',
help='ceph.keyring to pass through to the container')
+ parser_ceph_volume.add_argument(
+ '--no-log-output',
+ action='store_true',
+ help='suppress ceph volume output from the log')
parser_ceph_volume.add_argument(
'command', nargs=argparse.REMAINDER,
help='command')
parser_gather_facts.set_defaults(func=command_gather_facts)
parser_cephadmd = subparsers.add_parser(
- 'cephadmd', help='Manage cephadm running as a service')
- parser_cephadmd.add_argument(
- '--install',
- action='store_true',
- help='install the cephadmd exporter service')
- parser_cephadmd.add_argument(
- '--start',
- default=False,
- action='store_true',
- help='start the cephadmd service')
+ 'cephadmd', help='Start cephadm as a web service, providing host/daemon metadata')
parser_cephadmd.add_argument(
'--fsid',
required=True,
type=str,
- default=5003,
help='fsid of the cephadmd to run against')
parser_cephadmd.add_argument(
'--port',
type=int,
default=5003,
help='port number for the cephadmd service')
- parser_cephadmd.add_argument(
- '--uninstall',
- action='store_true',
- help='uninstall the cephadmd service')
parser_cephadmd.set_defaults(func=command_cephadmd)
return parser