%endif
Requires: ceph-mgr = %{_epoch_prefix}%{version}-%{release}
Requires: python%{python3_pkgversion}-asyncssh
+Requires: python%{python3_pkgversion}-cherrypy
Requires: cephadm = %{_epoch_prefix}%{version}-%{release}
%if 0%{?suse_version}
Requires: openssh
${misc:Depends},
${python:Depends},
openssh-client,
- python3-jinja2
+ python3-jinja2,
+ python3-cherrypy
Description: cephadm orchestrator module for ceph-mgr
Ceph is a massively scalable, open-source, distributed
storage system that runs on commodity hardware and delivers object,
from io import StringIO
from threading import Thread, RLock
from urllib.error import HTTPError
-from urllib.request import urlopen
+from urllib.request import urlopen, Request
from pathlib import Path
FuncT = TypeVar('FuncT', bound=Callable)
supported_daemons.append(CephadmDaemon.daemon_type)
supported_daemons.append(HAproxy.daemon_type)
supported_daemons.append(Keepalived.daemon_type)
+ supported_daemons.append(CephadmAgent.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
def is_container_running(ctx: CephadmContext, c: 'CephContainer') -> bool:
+ if ctx.name.split('.', 1)[0] in ['agent', 'cephadm-exporter']:
+ # these are non-containerized daemon types
+ return False
return bool(get_running_container_name(ctx, c))
cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
cephadm_exporter.deploy_daemon_unit(config_js)
+ elif daemon_type == CephadmAgent.daemon_type:
+ if ctx.config_json == '-':
+ config_js = get_parm('-')
+ else:
+ config_js = get_parm(ctx.config_json)
+ assert isinstance(config_js, dict)
+
+ cephadm_agent = CephadmAgent(ctx, fsid, daemon_id)
+ cephadm_agent.deploy_daemon_unit(config_js)
else:
if c:
deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
desc=self.entrypoint, timeout=timeout)
return out
+
+#####################################
+
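+# MgrListener runs inside the agent: it accepts counter messages from the
+# mgr, records them as the agent's latest ack, and wakes the agent's main
+# loop so fresh metadata is pushed immediately.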
+class MgrListener(Thread):
+ def __init__(self, agent: 'CephadmAgent') -> None:
+ self.agent = agent
+ self.stop = False
+ super(MgrListener, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ listenSocket.bind(('0.0.0.0', int(self.agent.listener_port)))
+ listenSocket.settimeout(60)
+ listenSocket.listen(1)
+ while not self.stop:
+ try:
+ conn, _ = listenSocket.accept()
+ except socket.timeout:
+ continue
+ while True:
+ data = conn.recv(self.agent.socket_buffer_size).decode()
+ if not data:
+ break
+ try:
+ self.agent.ack = int(data)
+ except Exception as e:
+ err_str = f'Failed to extract counter value from message: {e}'
+ conn.send(err_str.encode())
+ self.agent.ack = -1
+ else:
+ conn.send(b'ACK')
+ self.agent.wakeup()
+ logger.debug(f'Got mgr message {data}')
+
+ def shutdown(self) -> None:
+ self.stop = True
+
+
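+# The agent is a small, non-containerized daemon deployed on every host. Its
+# main loop POSTs `ls`, network, and facts metadata to the mgr's HTTPS
+# endpoint, and a MgrListener thread lets the mgr request an immediate refresh.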
+class CephadmAgent():
+
+ daemon_type = 'agent'
+ default_port = 8498
+ loop_interval = 30
+ socket_buffer_size = 256
+ stop = False
+
+ required_files = ['cephadm', 'agent.json', 'keyring']
+
+ def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
+ self.ctx = ctx
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.target_ip = ''
+ self.target_port = ''
+ self.host = ''
+ self.daemon_dir = os.path.join(ctx.data_dir, self.fsid, f'{self.daemon_type}.{self.daemon_id}')
+ self.binary_path = os.path.join(self.daemon_dir, 'cephadm')
+ self.config_path = os.path.join(self.daemon_dir, 'agent.json')
+ self.keyring_path = os.path.join(self.daemon_dir, 'keyring')
+ self.ca_path = os.path.join(self.daemon_dir, 'root_cert.pem')
+ self.listener_port = ''
+ self.ack = -1
+ self.event = threading.Event()
+ self.mgr_listener = MgrListener(self)
+
+ def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
+ if not config:
+ raise Error('Agent needs a config')
+ assert isinstance(config, dict)
+
+ # Write the config files (cephadm binary, agent.json, keyring, root cert) into the daemon's data dir
+ for filename in config:
+ with open(os.path.join(self.daemon_dir, filename), 'w') as f:
+ f.write(config[filename])
+
+ with open(os.path.join(self.daemon_dir, 'unit.run'), 'w') as f:
+ f.write(self.unit_run())
+
+ unit_file_path = os.path.join(self.ctx.unit_dir, self.unit_name())
+ with open(unit_file_path + '.new', 'w') as f:
+ f.write(self.unit_file())
+ os.rename(unit_file_path + '.new', unit_file_path)
+
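+ # install the unit: reload systemd, clear any failed state left over from
+ # a previous deployment, then enable and start the service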
+ call_throws(self.ctx, ['systemctl', 'daemon-reload'])
+ call(self.ctx, ['systemctl', 'stop', self.unit_name()],
+ verbosity=CallVerbosity.DEBUG)
+ call(self.ctx, ['systemctl', 'reset-failed', self.unit_name()],
+ verbosity=CallVerbosity.DEBUG)
+ call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name()])
+
+ def unit_name(self) -> str:
+ return '{}.service'.format(get_unit_name(self.fsid, self.daemon_type, self.daemon_id))
+
+ def unit_run(self) -> str:
+ py3 = shutil.which('python3')
+ return ('set -e\n' + f'{py3} {self.binary_path} agent --fsid {self.fsid} --daemon-id {self.daemon_id} &\n')
+
+ def unit_file(self) -> str:
+ return """#generated by cephadm
+[Unit]
+Description=cephadm agent for cluster {fsid}
+
+PartOf=ceph-{fsid}.target
+Before=ceph-{fsid}.target
+
+[Service]
+Type=forking
+ExecStart=/bin/bash {data_dir}/unit.run
+Restart=on-failure
+RestartSec=10s
+
+[Install]
+WantedBy=ceph-{fsid}.target
+""".format(
+ fsid=self.fsid,
+ data_dir=self.daemon_dir
+ )
+
+ def shutdown(self) -> None:
+ self.stop = True
+ if self.mgr_listener.is_alive():
+ self.mgr_listener.shutdown()
+
+ def wakeup(self) -> None:
+ self.event.set()
+
+ def run(self) -> None:
+ try:
+ with open(self.config_path, 'r') as f:
+ config = json.load(f)
+ self.target_ip = config['target_ip']
+ self.target_port = config['target_port']
+ self.loop_interval = int(config['refresh_period'])
+ starting_port = int(config['listener_port'])
+ self.host = config['host']
+ except Exception as e:
+ self.shutdown()
+ raise Error(f'Failed to load agent config: {e}')
+
+ try:
+ with open(self.keyring_path, 'r') as f:
+ self.keyring = f.read()
+ except Exception as e:
+ self.shutdown()
+ raise Error(f'Failed to get agent keyring: {e}')
+
+ assert self.target_ip and self.target_port
+
+ try:
+ for _ in range(1001):
+ if not port_in_use(self.ctx, starting_port):
+ self.listener_port = str(starting_port)
+ break
+ starting_port += 1
+ if not self.listener_port:
+ raise Error(f'All 1001 ports starting at {starting_port - 1001} taken.')
+ except Exception as e:
+ raise Error(f'Failed to pick port for agent to listen on: {e}')
+
+ if not self.mgr_listener.is_alive():
+ self.mgr_listener.start()
+
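+ # require TLS and pin verification to the cluster root cert generated by
+ # the mgr, so the agent only talks to the genuine cephadm endpoint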
+ ssl_ctx = ssl.create_default_context()
+ ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+ ssl_ctx.check_hostname = True
+ ssl_ctx.load_verify_locations(self.ca_path)
+
+ while not self.stop:
+ ack = self.ack
+
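+ # the payload carries the agent's keyring (so the mgr can verify the
+ # sender), its listener port, and the last counter value seen ('ack')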
+ data = json.dumps({'host': self.host,
+ 'ls': list_daemons(self.ctx),
+ 'networks': list_networks(self.ctx),
+ 'facts': HostFacts(self.ctx).dump(),
+ 'ack': str(ack),
+ 'keyring': self.keyring,
+ 'port': self.listener_port})
+ data = data.encode('ascii')
+
+ url = f'https://{self.target_ip}:{self.target_port}/data'
+ try:
+ req = Request(url, data, {'Content-Type': 'application/json'})
+ urlopen(req, context=ssl_ctx)
+ except Exception as e:
+ logger.error(f'Failed to send metadata to mgr: {e}')
+
+ self.event.wait(self.loop_interval)
+ self.event.clear()
+
+
+def command_agent(ctx: CephadmContext) -> None:
+ agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)
+
+ if not os.path.isdir(agent.daemon_dir):
+ raise Error(f'Agent daemon directory {agent.daemon_dir} does not exist. Perhaps agent was never deployed?')
+
+ agent.run()
+
+
##################################
'image_id': normalize_container_id(image_id)
} # type: Dict[str, Union[str,List[str]]]
if digests:
r['repo_digests'] = list(map(normalize_image_digest, digests[1:-1].split(' ')))
return r
##################################
def unwrap_ipv6(address):
# type: (str) -> str
if address.startswith('[') and address.endswith(']'):
return address[1:-1]
return address
raise Error('--mon-addrv value %s must use square brackets' %
addr_arg)
ipv6 = addr_arg.count('[') > 1
for addr in addr_arg[1:-1].split(','):
hasport = r.findall(addr)
if not hasport:
raise Error('--mon-addrv value %s must include port number' %
(uid, gid) = extract_uid_gid(ctx)
# create some initial keys
- (mon_key, mgr_key, admin_key, bootstrap_keyring, admin_keyring) = \
- create_initial_keys(ctx, uid, gid, mgr_id)
+ (mon_key, mgr_key, admin_key, bootstrap_keyring, admin_keyring) = create_initial_keys(ctx, uid, gid, mgr_id)
monmap = create_initial_monmap(ctx, uid, gid, fsid, mon_id, addr_arg)
- (mon_dir, log_dir) = \
- prepare_create_mon(ctx, uid, gid, fsid, mon_id,
- bootstrap_keyring.name, monmap.name)
+ (mon_dir, log_dir) = prepare_create_mon(ctx, uid, gid, fsid, mon_id,
+ bootstrap_keyring.name, monmap.name)
with open(mon_dir + '/config', 'w') as f:
os.fchown(f.fileno(), uid, gid)
deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
uid, gid, ports=daemon_ports)
+ elif daemon_type == CephadmAgent.daemon_type:
+ # get current user gid and uid
+ uid = os.getuid()
+ gid = os.getgid()
+ deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
+ uid, gid, ports=daemon_ports)
+
else:
raise Error('daemon type {} not implemented in command_deploy function'
.format(daemon_type))
'systemd_unit': legacy_unit_name,
}
if detail:
- (val['enabled'], val['state'], _) = \
- check_unit(ctx, legacy_unit_name)
+ (val['enabled'], val['state'], _) = check_unit(ctx, legacy_unit_name)
if not host_version:
try:
out, err, code = call(ctx,
}
if detail:
# get container id
- (val['enabled'], val['state'], _) = \
- check_unit(ctx, unit_name)
+ (val['enabled'], val['state'], _) = check_unit(ctx, unit_name)
container_id = None
image_name = None
image_id = None
help='Maintenance action - enter maintenance, or exit maintenance')
parser_maintenance.set_defaults(func=command_maintenance)
+ parser_agent = subparsers.add_parser(
+ 'agent', help='start cephadm agent')
+ parser_agent.set_defaults(func=command_agent)
+ parser_agent.add_argument(
+ '--fsid',
+ required=True,
+ help='cluster FSID')
+ parser_agent.add_argument(
+ '--daemon-id',
+ help='daemon id for agent')
+
return parser
--- /dev/null
+import cherrypy
+import json
+import socket
+import tempfile
+import threading
+import time
+
+from mgr_util import verify_tls_files
+from ceph.utils import datetime_now
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+
+from OpenSSL import crypto
+from typing import Any, Dict, Set, Tuple, TYPE_CHECKING
+from uuid import uuid4
+
+if TYPE_CHECKING:
+ from cephadm.module import CephadmOrchestrator
+
+
+class CherryPyThread(threading.Thread):
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.cherrypy_shutdown_event = threading.Event()
+ self.ssl_certs = SSLCerts(self.mgr)
+ super(CherryPyThread, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ try:
+ server_addr = self.mgr.get_mgr_ip()
+ server_port = self.mgr.endpoint_port
+
+ self.ssl_certs.generate_root_cert()
+ cert, key = self.ssl_certs.generate_cert()
+
+ self.key_tmp = tempfile.NamedTemporaryFile()
+ self.key_tmp.write(key.encode('utf-8'))
+ self.key_tmp.flush() # key_tmp must not be gc'ed
+ key_fname = self.key_tmp.name
+
+ self.cert_tmp = tempfile.NamedTemporaryFile()
+ self.cert_tmp.write(cert.encode('utf-8'))
+ self.cert_tmp.flush() # cert_tmp must not be gc'ed
+ cert_fname = self.cert_tmp.name
+
+ verify_tls_files(cert_fname, key_fname)
+
+ self.mgr.set_uri('https://{0}:{1}/'.format(server_addr, server_port))
+
+ cherrypy.config.update({
+ 'server.socket_host': server_addr,
+ 'server.socket_port': server_port,
+ 'engine.autoreload.on': False,
+ 'server.ssl_module': 'builtin',
+ 'server.ssl_certificate': cert_fname,
+ 'server.ssl_private_key': key_fname,
+ })
+ root_conf = {'/': {'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
+ 'tools.response_headers.on': True}}
+ cherrypy.tree.mount(Root(self.mgr), '/', root_conf)
+ self.mgr.log.info('Starting cherrypy engine...')
+ cherrypy.engine.start()
+ self.mgr.log.info('Cherrypy engine started.')
+ # wait for the shutdown event
+ self.cherrypy_shutdown_event.wait()
+ self.cherrypy_shutdown_event.clear()
+ cherrypy.engine.stop()
+ self.mgr.log.info('Cherrypy engine stopped.')
+ except Exception as e:
+ self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')
+
+ def shutdown(self) -> None:
+ self.mgr.log.info('Stopping cherrypy engine...')
+ self.cherrypy_shutdown_event.set()
+
+
+class Root:
+ exposed = True
+
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr = mgr
+ self.data = HostData(self.mgr)
+
+ def GET(self) -> str:
+ return '''<!DOCTYPE html>
+<html>
+<head><title>Cephadm HTTP Endpoint</title></head>
+<body>
+<p>Cephadm HTTP Endpoint is up and running</p>
+</body>
+</html>'''
+
+
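+# HostData is the REST resource behind /data. Agents POST their metadata
+# here; requests are validated (known host, matching keyring) before the
+# metadata is applied to the mgr's host cache.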
+class HostData:
+ exposed = True
+
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr = mgr
+
+ @cherrypy.tools.json_in()
+ def POST(self) -> None:
+ data: Dict[str, Any] = cherrypy.request.json
+ try:
+ self.check_request_fields(data)
+ except Exception as e:
+ self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
+ else:
+ self.handle_metadata(data)
+
+ def check_request_fields(self, data: Dict[str, Any]) -> None:
+ fields = '{' + ', '.join(data.keys()) + '}'
+ if 'host' not in data:
+ raise Exception(
+ f'No host in metadata from agent ("host" field). Only received fields {fields}')
+ host = data['host']
+ if host not in self.mgr.cache.get_hosts():
+ raise Exception(f'Received metadata from agent on unknown hostname {host}')
+ if 'keyring' not in data:
+ raise Exception(
+ f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
+ if host not in self.mgr.cache.agent_keys:
+ raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
+ if data['keyring'] != self.mgr.cache.agent_keys[host]:
+ raise Exception(f'Got wrong keyring from agent on host {host}.')
+ if 'port' not in data:
+ raise Exception(
+ f'Agent on host {host} not reporting its listener port ("port" field). Only received fields {fields}')
+ if 'ack' not in data:
+ raise Exception(
+ f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
+ try:
+ int(data['ack'])
+ except Exception as e:
+ raise Exception(
+ f'Counter value from agent on host {host} could not be converted to an integer: {e}')
+ metadata_types = ['ls', 'networks', 'facts']
+ metadata_types_str = '{' + ', '.join(metadata_types) + '}'
+ if not all(item in data.keys() for item in metadata_types):
+ self.mgr.log.warning(
+ f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')
+
+ def handle_metadata(self, data: Dict[str, Any]) -> None:
+ try:
+ host = data['host']
+ if host not in self.mgr.cache.agent_counter:
+ self.mgr.log.debug(
+ f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata')
+ self.mgr.cache.agent_counter[host] = 1
+ self.mgr.agent_helpers._request_agent_acks({host})
+ return
+
+ self.mgr.cache.agent_ports[host] = int(data['port'])
+ self.mgr.cache.agent_timestamp[host] = datetime_now()
+ up_to_date = False
+
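+ # an ack equal to the stored counter means this POST reflects the most
+ # recent refresh the mgr requested; any older value is stale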
+ int_ack = int(data['ack'])
+ if int_ack == self.mgr.cache.agent_counter[host]:
+ up_to_date = True
+ else:
+ # we got an old counter value with the message; ask the agent to resend with the current counter
+ self.mgr.agent_helpers._request_agent_acks({host})
+ self.mgr.log.debug(
+ f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')
+
+ if up_to_date:
+ if 'ls' in data:
+ self.mgr._process_ls_output(host, data['ls'])
+ if 'networks' in data:
+ self.mgr.cache.update_host_networks(host, data['networks'])
+ if 'facts' in data:
+ self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
+
+ # mark this host's agent metadata as up to date
+ self.mgr.cache.metadata_up_to_date[host] = True
+ self.mgr.log.debug(
+ f'Received up-to-date metadata from agent on host {host}.')
+
+ except Exception as e:
+ self.mgr.log.warning(
+ f'Failed to update metadata with metadata from agent on host {host}: {e}')
+
+
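+# AgentMessageThread is a one-shot worker: it connects to a single agent's
+# listener socket, delivers the mgr's current counter value, and retries
+# briefly on connection errors (the agent may still be starting up).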
+class AgentMessageThread(threading.Thread):
+ def __init__(self, host: str, port: int, counter: int, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.host = host
+ self.port = port
+ self.counter = counter
+ super(AgentMessageThread, self).__init__(target=self.run)
+
+ def run(self) -> None:
+ for retry_wait in [3, 5]:
+ try:
+ agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port))
+ agent_socket.send(str(self.counter).encode())
+ agent_response = agent_socket.recv(1024).decode()
+ self.mgr.log.debug(f'Received {agent_response} from agent on host {self.host}')
+ return
+ except ConnectionError as e:
+ # on a connection error, retry after a short wait; we may have
+ # only just deployed the agent and it might not be ready yet
+ self.mgr.log.debug(
+ f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
+ time.sleep(retry_wait)
+ except Exception as e:
+ # if it's not a connection error, something has gone wrong. Give up.
+ self.mgr.log.debug(f'Failed to contact agent on host {self.host}: {e}')
+ return
+ return
+
+
+class CephadmAgentHelpers:
+ def __init__(self, mgr: "CephadmOrchestrator"):
+ self.mgr: "CephadmOrchestrator" = mgr
+
+ def _request_agent_acks(self, hosts: Set[str]) -> None:
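+ # bumping the per-host counter invalidates the agent's last ack; the
+ # message thread delivers the new value and the agent answers with a
+ # fresh metadata POST carrying it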
+ for host in hosts:
+ self.mgr.cache.metadata_up_to_date[host] = False
+ if host not in self.mgr.cache.agent_counter:
+ self.mgr.cache.agent_counter[host] = 1
+ else:
+ self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
+ message_thread = AgentMessageThread(
+ host, self.mgr.cache.agent_ports[host], self.mgr.cache.agent_counter[host], self.mgr)
+ message_thread.start()
+
+ def _agent_down(self, host: str) -> bool:
+ # if we don't have a timestamp, it's likely because of a mgr failover;
+ # just set the timestamp to now
+ if host not in self.mgr.cache.agent_timestamp:
+ self.mgr.cache.agent_timestamp[host] = datetime_now()
+ # the agent hasn't reported in 2.5x its refresh rate; something is likely wrong with it
+ time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
+ if time_diff.total_seconds() > 2.5 * float(self.mgr.agent_refresh_rate):
+ return True
+ return False
+
+ # this function probably seems unnecessary, but it makes the unit tests much
+ # easier to maintain: every test that checks which daemons or services were
+ # deployed would otherwise have to be changed individually to expect an agent
+ # service and daemons. Keeping this in its own function lets tests mock it out.
+ def _apply_agent(self) -> None:
+ spec = ServiceSpec(
+ service_type='agent',
+ placement=PlacementSpec(host_pattern='*')
+ )
+ self.mgr.spec_store.save(spec)
+
+
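+# SSLCerts generates a self-signed root CA plus a server cert for the
+# cephadm endpoint; the root cert is shipped to each agent, which pins it
+# for TLS verification.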
+class SSLCerts:
+ def __init__(self, mgr: "CephadmOrchestrator") -> None:
+ self.mgr = mgr
+ self.root_cert: Any
+ self.root_key: Any
+ self.root_subj: Any
+
+ def generate_root_cert(self) -> Tuple[str, str]:
+ self.root_key = crypto.PKey()
+ self.root_key.generate_key(crypto.TYPE_RSA, 2048)
+
+ self.root_cert = crypto.X509()
+ self.root_cert.set_serial_number(int(uuid4()))
+
+ self.root_subj = self.root_cert.get_subject()
+ self.root_subj.commonName = "cephadm-root"
+
+ self.root_cert.set_issuer(self.root_subj)
+ self.root_cert.set_pubkey(self.root_key)
+ self.root_cert.gmtime_adj_notBefore(0)
+ self.root_cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
+ self.root_cert.sign(self.root_key, 'sha256')
+
+ cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+ key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
+ return (cert_str, key_str)
+
+ def generate_cert(self) -> Tuple[str, str]:
+ key = crypto.PKey()
+ key.generate_key(crypto.TYPE_RSA, 2048)
+
+ cert = crypto.X509()
+ cert.set_serial_number(int(uuid4()))
+
+ subj = cert.get_subject()
+ subj.commonName = str(self.mgr.get_mgr_ip())
+
+ cert.set_issuer(self.root_subj)
+ cert.set_pubkey(key)
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
+ cert.sign(self.root_key, 'sha256')
+
+ cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8')
+ key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, key).decode('utf-8')
+ return (cert_str, key_str)
+
+ def get_root_cert(self) -> str:
+ return crypto.dump_certificate(crypto.FILETYPE_PEM, self.root_cert).decode('utf-8')
+
+ def get_root_key(self) -> str:
+ return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.root_key).decode('utf-8')
self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]]
self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]]
self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]]
+ self.last_network_update = {} # type: Dict[str, datetime.datetime]
self.last_device_update = {} # type: Dict[str, datetime.datetime]
self.last_device_change = {} # type: Dict[str, datetime.datetime]
self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
+ self.network_refresh_queue = [] # type: List[str]
self.osdspec_previews_refresh_queue = [] # type: List[str]
# host -> daemon name -> dict
self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
+ self.agent_counter = {} # type: Dict[str, int]
+ self.agent_timestamp = {} # type: Dict[str, datetime.datetime]
+ self.metadata_up_to_date = {} # type: Dict[str, bool]
+ self.agent_keys = {} # type: Dict[str, str]
+ self.agent_ports = {} # type: Dict[str, int]
+
def load(self):
# type: () -> None
for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
# for services, we ignore the persisted last_*_update
# and always trigger a new scrape on mgr restart.
self.daemon_refresh_queue.append(host)
+ self.network_refresh_queue.append(host)
self.daemons[host] = {}
self.osdspec_previews[host] = []
self.osdspec_last_applied[host] = {}
self.registry_login_queue.add(host)
self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
+ self.agent_counter[host] = int(j.get('agent_counter', 1))
+ self.metadata_up_to_date[host] = False
+ self.agent_keys[host] = str(j.get('agent_keys', ''))
+
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
'%d devices, %d networks' % (
return True
return False
- def update_host_devices_networks(
+ def update_host_devices(
self,
host: str,
dls: List[inventory.Device],
- nets: Dict[str, Dict[str, List[str]]]
) -> None:
if (
host not in self.devices
self.last_device_change[host] = datetime_now()
self.last_device_update[host] = datetime_now()
self.devices[host] = dls
+
+ def update_host_networks(
+ self,
+ host: str,
+ nets: Dict[str, Dict[str, List[str]]]
+ ) -> None:
self.networks[host] = nets
+ self.last_network_update[host] = datetime_now()
def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
self.daemon_config_deps[host][name] = {
self.daemon_config_deps[host] = {}
self.daemon_refresh_queue.append(host)
self.device_refresh_queue.append(host)
+ self.network_refresh_queue.append(host)
self.osdspec_previews_refresh_queue.append(host)
self.registry_login_queue.add(host)
self.last_client_files[host] = {}
del self.last_device_update[host]
self.mgr.event.set()
+ def invalidate_host_networks(self, host):
+ # type: (str) -> None
+ self.network_refresh_queue.append(host)
+ if host in self.last_network_update:
+ del self.last_network_update[host]
+ self.mgr.event.set()
+
def distribute_new_registry_login_info(self) -> None:
self.registry_login_queue = set(self.mgr.inventory.keys())
j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
if host in self.last_device_update:
j['last_device_update'] = datetime_to_str(self.last_device_update[host])
+ if host in self.last_network_update:
+ j['last_network_update'] = datetime_to_str(self.last_network_update[host])
if host in self.last_device_change:
j['last_device_change'] = datetime_to_str(self.last_device_change[host])
if host in self.daemons:
if host in self.scheduled_daemon_actions:
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
+ if host in self.agent_counter:
+ j['agent_counter'] = self.agent_counter[host]
+ if host in self.agent_keys:
+ j['agent_keys'] = self.agent_keys[host]
+
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
del self.last_daemon_update[host]
if host in self.last_device_update:
del self.last_device_update[host]
+ if host in self.last_network_update:
+ del self.last_network_update[host]
if host in self.last_device_change:
del self.last_device_change[host]
if host in self.daemon_config_deps:
return True
return False
+ def host_needs_network_refresh(self, host):
+ # type: (str) -> bool
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
+ return False
+ if host in self.network_refresh_queue:
+ self.network_refresh_queue.remove(host)
+ return True
+ cutoff = datetime_now() - datetime.timedelta(
+ seconds=self.mgr.device_cache_timeout)
+ if host not in self.last_network_update or self.last_network_update[host] < cutoff:
+ return True
+ return False
+
def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
return True
return False
+ def host_metadata_up_to_date(self, host: str) -> bool:
+ return self.metadata_up_to_date.get(host, False)
+
+ def all_host_metadata_up_to_date(self) -> bool:
+ return all(self.host_metadata_up_to_date(h) for h in self.get_hosts())
+
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
+from cephadm.agent import CherryPyThread, CephadmAgentHelpers
+
from mgr_module import MgrModule, HandleCommandResult, Option
from mgr_util import create_self_signed_cert
from . import ssh
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
+ RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
ConnectTimeout=30
"""
+# cherrypy likes to exit the process (via os._exit) on error. don't let it take the mgr down too!
+
+
+def os_exit_noop(status: int) -> None:
+ pass
+
+
+os._exit = os_exit_noop # type: ignore
+
+
# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.18.1'
default=10 * 60,
desc='how frequently to autotune daemon memory'
),
+ Option(
+ 'use_agent',
+ type='bool',
+ default=True,
+ desc='Use cephadm agent on each host to gather and send metadata'
+ ),
+ Option(
+ 'agent_refresh_rate',
+ type='secs',
+ default=20,
+ desc='How often agent on each host will try to gather and send metadata'
+ ),
+ Option(
+ 'endpoint_port',
+ type='int',
+ default=8499,
+ desc='Which port cephadm http endpoint will listen on'
+ ),
+ Option(
+ 'agent_starting_port',
+ type='int',
+ default=4721,
+ desc='First port agent will try to bind to (will also try the next 1000 ports if blocked)'
+ ),
]
def __init__(self, *args: Any, **kwargs: Any):
self._temp_files: List = []
self.ssh_key: Optional[str] = None
self.ssh_pub: Optional[str] = None
+ self.use_agent = False
+ self.agent_refresh_rate = 0
+ self.endpoint_port = 0
+ self.agent_starting_port = 0
self.notify('mon_map', None)
self.config_notify()
OSDService, NFSService, MonService, MgrService, MdsService,
RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
PrometheusService, NodeExporterService, CrashService, IscsiService,
- IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService
+ IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService,
+ CephadmAgent
]
# https://github.com/python/mypy/issues/8993
self.config_checker = CephadmConfigChecks(self)
+ self.cherrypy_thread = None
+ self.agent_helpers = CephadmAgentHelpers(self)
+
+ if self.use_agent:
+ try:
+ if not self.cherrypy_thread:
+ self.cherrypy_thread = CherryPyThread(self)
+ self.cherrypy_thread.start()
+ if 'agent' not in self.spec_store:
+ self.agent_helpers._apply_agent()
+ except Exception as e:
+ self.log.error(f'Failed to initialize agent spec and cherrypy server: {e}')
+
def shutdown(self) -> None:
self.log.debug('shutdown')
self._worker_pool.close()
self._worker_pool.join()
+ if self.cherrypy_thread:
+ self.cherrypy_thread.shutdown()
+ self.cherrypy_thread = None
self.run = False
self.event.set()
suffix = daemon_type not in [
'mon', 'crash',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
- 'container', 'cephadm-exporter',
+ 'container', 'cephadm-exporter', 'agent'
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
ssh_config_fname))
+ def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
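+ # convert raw `cephadm ls` output from a host into DaemonDescriptions
+ # and store them in the host cache. Moved here from serve.py so that
+ # metadata POSTed by agents flows through the same path as polled data.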
+ dm = {}
+ for d in ls:
+ if not d['style'].startswith('cephadm'):
+ continue
+ if d['fsid'] != self._cluster_fsid:
+ continue
+ if '.' not in d['name']:
+ continue
+ sd = orchestrator.DaemonDescription()
+ sd.last_refresh = datetime_now()
+ for k in ['created', 'started', 'last_configured', 'last_deployed']:
+ v = d.get(k, None)
+ if v:
+ setattr(sd, k, str_to_datetime(d[k]))
+ sd.daemon_type = d['name'].split('.')[0]
+ if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
+ logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
+ continue
+
+ sd.daemon_id = '.'.join(d['name'].split('.')[1:])
+ sd.hostname = host
+ sd.container_id = d.get('container_id')
+ if sd.container_id:
+ # shorten the hash
+ sd.container_id = sd.container_id[0:12]
+ sd.container_image_name = d.get('container_image_name')
+ sd.container_image_id = d.get('container_image_id')
+ sd.container_image_digests = d.get('container_image_digests')
+ sd.memory_usage = d.get('memory_usage')
+ sd.memory_request = d.get('memory_request')
+ sd.memory_limit = d.get('memory_limit')
+ sd._service_name = d.get('service_name')
+ sd.deployed_by = d.get('deployed_by')
+ sd.version = d.get('version')
+ sd.ports = d.get('ports')
+ sd.ip = d.get('ip')
+ sd.rank = int(d['rank']) if d.get('rank') is not None else None
+ sd.rank_generation = int(d['rank_generation']) if d.get(
+ 'rank_generation') is not None else None
+ if sd.daemon_type == 'osd':
+ sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
+ if 'state' in d:
+ sd.status_desc = d['state']
+ sd.status = {
+ 'running': DaemonDescriptionStatus.running,
+ 'stopped': DaemonDescriptionStatus.stopped,
+ 'error': DaemonDescriptionStatus.error,
+ 'unknown': DaemonDescriptionStatus.error,
+ }[d['state']]
+ else:
+ sd.status_desc = 'unknown'
+ sd.status = None
+ dm[sd.name()] = sd
+ self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
+ self.cache.update_host_daemons(host, dm)
+ self.cache.save_host(host)
+ return None
+
def offline_hosts_remove(self, host: str) -> None:
if host in self.offline_hosts:
self.offline_hosts.remove(host)
for h in host_filter.hosts:
self.log.debug(f'will refresh {h} devs')
self.cache.invalidate_host_devices(h)
+ self.cache.invalidate_host_networks(h)
else:
for h in self.cache.get_hosts():
self.log.debug(f'will refresh {h} devs')
self.cache.invalidate_host_devices(h)
+ self.cache.invalidate_host_networks(h)
self.event.set()
self.log.debug('Kicked serve() loop to refresh devices')
['--', 'lvm', 'zap', '--destroy', path],
error_ok=True)
self.cache.invalidate_host_devices(host)
+ self.cache.invalidate_host_networks(host)
if code:
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
return []
daemons = self.cache.get_daemons_by_service(spec.service_name())
deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
+ elif daemon_type == 'agent':
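+ # agents must be reconfigured whenever the mgr IP, the endpoint port,
+ # or the root cert changes, so those three make up the dependency list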
+ root_cert = ''
+ if self.cherrypy_thread and self.cherrypy_thread.ssl_certs.root_cert:
+ root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
+ deps = [self.get_mgr_ip(), str(self.endpoint_port), root_cert]
else:
need = {
'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
import logging
import uuid
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, Set
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
-from ceph.utils import str_to_datetime, datetime_now
+from ceph.utils import datetime_now
import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
DaemonDescriptionStatus, daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.schedule import HostAssignment
+from cephadm.agent import CherryPyThread
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
self._purge_deleted_services()
+ if self.mgr.use_agent:
+ if not self.mgr.cherrypy_thread:
+ self.mgr.cherrypy_thread = CherryPyThread(self.mgr)
+ self.mgr.cherrypy_thread.start()
+ if 'agent' not in self.mgr.spec_store:
+ self.mgr.agent_helpers._apply_agent()
+ else:
+ if self.mgr.cherrypy_thread:
+ self.mgr.cherrypy_thread.shutdown()
+ self.mgr.cherrypy_thread = None
+ if 'agent' in self.mgr.spec_store:
+ self.mgr.spec_store.rm('agent')
+
if self.mgr.upgrade.continue_upgrade():
continue
self.log.warning(
f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
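+ # hosts whose agent has stopped reporting; collected so we can raise
+ # the CEPHADM_AGENT_DOWN health warning after the refresh completes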
+ agents_down: List[str] = []
+
@forall_hosts
def refresh(host: str) -> None:
if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
return
+ if self.mgr.use_agent and self.mgr.agent_helpers._agent_down(host):
+ if host in self.mgr.offline_hosts:
+ return
+ self.mgr.offline_hosts.add(host)
+ self.mgr._reset_con(host)
+ agents_down.append(host)
+ # try to schedule redeploy of agent in case it is individually down
+ try:
+ agent = [a for a in self.mgr.cache.get_daemons_by_host(
+ host) if a.daemon_type == 'agent'][0]
+ self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+ except Exception as e:
+ self.log.debug(
+ f'Failed to find entry for agent deployed on host {host}. Agent possibly never deployed: {e}')
+ return
+ else:
+ self.mgr.offline_hosts_remove(host)
+
if self.mgr.cache.host_needs_check(host):
r = self._check_host(host)
if r is not None:
bad_hosts.append(r)
- if self.mgr.cache.host_needs_daemon_refresh(host):
- self.log.debug('refreshing %s daemons' % host)
- r = self._refresh_host_daemons(host)
- if r:
- failures.append(r)
+
+ if not self.mgr.use_agent:
+ if self.mgr.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_facts_refresh(host):
+ self.log.debug(('Refreshing %s facts' % host))
+ r = self._refresh_facts(host)
+ if r:
+ failures.append(r)
+
+ if self.mgr.cache.host_needs_network_refresh(host):
+ self.log.debug(('Refreshing %s networks' % host))
+ r = self._refresh_host_networks(host)
+ if r:
+ failures.append(r)
+ self.mgr.cache.metadata_up_to_date[host] = True
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
self.log.debug(f"Logging `{host}` into custom registry")
if r:
failures.append(r)
- if self.mgr.cache.host_needs_facts_refresh(host):
- self.log.debug(('Refreshing %s facts' % host))
- r = self._refresh_facts(host)
- if r:
- failures.append(r)
-
if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
self.log.debug(f"refreshing OSDSpec previews for {host}")
r = self._refresh_host_osdspec_previews(host)
refresh(self.mgr.cache.get_hosts())
+ if 'CEPHADM_AGENT_DOWN' in self.mgr.health_checks:
+ del self.mgr.health_checks['CEPHADM_AGENT_DOWN']
+ if agents_down:
+ detail: List[str] = []
+ for agent in agents_down:
+ detail.append((f'Cephadm agent on host {agent} has not reported in '
+ f'{2.5 * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
+ 'down and host has been marked offline.'))
+ self.mgr.health_checks['CEPHADM_AGENT_DOWN'] = {
+ 'severity': 'warning',
+ 'summary': '%d Cephadm Agent(s) are not reporting. '
+ 'Hosts marked offline' % (len(agents_down)),
+ 'count': len(agents_down),
+ 'detail': detail,
+ }
+ self.mgr.set_health_checks(self.mgr.health_checks)
+
self.mgr.config_checker.run_checks()
health_changed = False
ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
except OrchestratorError as e:
return str(e)
- dm = {}
- for d in ls:
- if not d['style'].startswith('cephadm'):
- continue
- if d['fsid'] != self.mgr._cluster_fsid:
- continue
- if '.' not in d['name']:
- continue
- sd = orchestrator.DaemonDescription()
- sd.last_refresh = datetime_now()
- for k in ['created', 'started', 'last_configured', 'last_deployed']:
- v = d.get(k, None)
- if v:
- setattr(sd, k, str_to_datetime(d[k]))
- sd.daemon_type = d['name'].split('.')[0]
- if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
- logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
- continue
-
- sd.daemon_id = '.'.join(d['name'].split('.')[1:])
- sd.hostname = host
- sd.container_id = d.get('container_id')
- if sd.container_id:
- # shorten the hash
- sd.container_id = sd.container_id[0:12]
- sd.container_image_name = d.get('container_image_name')
- sd.container_image_id = d.get('container_image_id')
- sd.container_image_digests = d.get('container_image_digests')
- sd.memory_usage = d.get('memory_usage')
- sd.memory_request = d.get('memory_request')
- sd.memory_limit = d.get('memory_limit')
- sd._service_name = d.get('service_name')
- sd.deployed_by = d.get('deployed_by')
- sd.version = d.get('version')
- sd.ports = d.get('ports')
- sd.ip = d.get('ip')
- sd.rank = int(d['rank']) if d.get('rank') is not None else None
- sd.rank_generation = int(d['rank_generation']) if d.get(
- 'rank_generation') is not None else None
- if sd.daemon_type == 'osd':
- sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
- if 'state' in d:
- sd.status_desc = d['state']
- sd.status = {
- 'running': DaemonDescriptionStatus.running,
- 'stopped': DaemonDescriptionStatus.stopped,
- 'error': DaemonDescriptionStatus.error,
- 'unknown': DaemonDescriptionStatus.error,
- }[d['state']]
- else:
- sd.status_desc = 'unknown'
- sd.status = None
- dm[sd.name()] = sd
- self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
- self.mgr.cache.update_host_daemons(host, dm)
- self.mgr.cache.save_host(host)
+ self.mgr._process_ls_output(host, ls)
return None
def _refresh_facts(self, host: str) -> Optional[str]:
return None
def _refresh_host_devices(self, host: str) -> Optional[str]:
-
with_lsm = self.mgr.get_module_option('device_enhanced_scan')
inventory_args = ['--', 'inventory',
'--format=json',
else:
raise
- networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
except OrchestratorError as e:
return str(e)
- self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
- host, len(devices), len(networks)))
+ self.log.debug('Refreshed host %s devices (%d)' % (
+ host, len(devices)))
ret = inventory.Devices.from_json(devices)
- self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
+ self.mgr.cache.update_host_devices(host, ret.devices)
self.update_osdspec_previews(host)
self.mgr.cache.save_host(host)
return None
+ def _refresh_host_networks(self, host: str) -> Optional[str]:
+ try:
+ networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
+ except OrchestratorError as e:
+ return str(e)
+
+ self.log.debug('Refreshed host %s networks (%s)' % (
+ host, len(networks)))
+ self.mgr.cache.update_host_networks(host, networks)
+ self.mgr.cache.save_host(host)
+ return None
+
def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
self.update_osdspec_previews(host)
self.mgr.cache.save_host(host)
def _apply_all_services(self) -> bool:
r = False
specs = [] # type: List[ServiceSpec]
- for sn, spec in self.mgr.spec_store.active_specs.items():
- specs.append(spec)
+ # if metadata is not up to date, we still need to apply the agent spec,
+ # since the agent is the one who gathers the metadata. If we don't, we
+ # end up stuck between wanting metadata to be up to date before applying
+ # specs and needing to apply the agent spec to get up-to-date metadata
+ if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+ try:
+ specs.append(self.mgr.spec_store['agent'].spec)
+ except Exception as e:
+ self.log.debug(f'Failed to find agent spec: {e}')
+ self.mgr.agent_helpers._apply_agent()
+ return r
+ else:
+ for sn, spec in self.mgr.spec_store.active_specs.items():
+ specs.append(spec)
for spec in specs:
try:
if self._apply_service(spec):
rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
ha = HostAssignment(
spec=spec,
- hosts=self.mgr._schedulable_hosts(),
+ hosts=self.mgr.inventory.all_specs() if spec.service_name() == 'agent' else self.mgr._schedulable_hosts(),
unreachable_hosts=self.mgr._unreachable_hosts(),
daemons=daemons,
networks=self.mgr.cache.networks,
self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
+ hosts_altered: Set[str] = set()
+
try:
# assign names
for i in range(len(slots_to_add)):
r = True
progress_done += 1
update_progress()
+ hosts_altered.add(daemon_spec.host)
except (RuntimeError, OrchestratorError) as e:
msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
f"on {slot.hostname}: {e}")
progress_done += 1
update_progress()
+ hosts_altered.add(d.hostname)
self.mgr.remote('progress', 'complete', progress_id)
except Exception as e:
self.mgr.remote('progress', 'fail', progress_id, str(e))
raise
+ finally:
+ if self.mgr.use_agent:
+ # we can only send acks to agents whose bound port we know for sure
+ hosts_altered = set([h for h in hosts_altered if h in self.mgr.cache.agent_ports])
+ self.mgr.agent_helpers._request_agent_acks(hosts_altered)
if r is None:
r = False
assert dd.hostname is not None
assert dd.daemon_type is not None
assert dd.daemon_id is not None
- if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
+ if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd', 'agent']:
# (mon and mgr specs should always exist; osds aren't matched
# to a service spec)
self.log.info('Removing orphan daemon %s...' % dd.name())
stdin=json.dumps(daemon_spec.final_config),
image=image)
+ if daemon_spec.daemon_type == 'agent':
+ self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
+
# refresh daemon state? (ceph daemon reconfig does not need it)
if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
if not code and daemon_spec.host in self.mgr.cache.daemons:
self.log.debug(f"_run_cephadm : command = {command}")
self.log.debug(f"_run_cephadm : args = {args}")
- bypass_image = ('cephadm-exporter',)
+ bypass_image = ('cephadm-exporter', 'agent')
assert image or entity
# Skip the image check for daemons deployed that are not ceph containers
# exec
self.log.debug('args: %s' % (' '.join(final_args)))
if self.mgr.mode == 'root':
- if stdin:
+ # the agent config includes the cephadm binary as an extra file passed
+ # over stdin, which is far too large to include even in debug logs
+ if stdin and 'agent' not in str(entity):
self.log.debug('stdin: %s' % stdin)
cmd = ['which', 'python3']
# the CephService class refers to service types, not daemon types
if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
- elif self.TYPE == 'crash':
+ elif self.TYPE in ['crash', 'agent']:
if host == "":
- raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ raise OrchestratorError(
+ f'Host not provided to generate <{self.TYPE}> auth entity name')
return AuthEntity(f'client.{self.TYPE}.{host}')
elif self.TYPE == 'mon':
return AuthEntity('mon.')
daemon_spec.keyring = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
+
+
+class CephadmAgent(CephService):
+ TYPE = 'agent'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+ if not self.mgr.cherrypy_thread:
+ raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+ daemon_spec.keyring = keyring
+ self.mgr.cache.agent_keys[host] = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ cfg = {'target_ip': self.mgr.get_mgr_ip(),
+ 'target_port': self.mgr.endpoint_port,
+ 'refresh_period': self.mgr.agent_refresh_rate,
+ 'listener_port': self.mgr.agent_starting_port,
+ 'host': daemon_spec.host}
+
+ assert self.mgr.cherrypy_thread
+ config = {
+ 'agent.json': json.dumps(cfg),
+ 'cephadm': self.mgr._cephadm,
+ 'keyring': daemon_spec.keyring,
+ 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ }
+
+ return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.endpoint_port), self.mgr.cherrypy_thread.ssl_certs.get_root_cert()])
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_to_str, datetime_now
-from cephadm.serve import CephadmServe
+from cephadm.serve import CephadmServe, cephadmNoImage
try:
- from typing import Any, Iterator, List
+ from typing import Any, Iterator, List, Callable, Dict, Optional
except ImportError:
pass
loop.close()
asyncio.set_event_loop(None)
+def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: Optional[List[str]] = None) -> None:
+ to_update: Dict[str, Callable[[str, Any], None]] = {
+ 'ls': m._process_ls_output,
+ 'gather-facts': m.cache.update_host_facts,
+ 'list-networks': m.cache.update_host_networks,
+ }
+ if ops:
+ for op in ops:
+ out = CephadmServe(m)._run_cephadm_json(host, cephadmNoImage, op, [])
+ to_update[op](host, out)
+ m.cache.last_daemon_update[host] = datetime_now()
+ m.cache.last_facts_update[host] = datetime_now()
+ m.cache.last_network_update[host] = datetime_now()
+ m.cache.metadata_up_to_date[host] = True
+
+
+def receive_agent_metadata_all_hosts(m: CephadmOrchestrator) -> None:
+ for host in m.cache.get_hosts():
+ receive_agent_metadata(m, host)
@contextmanager
def with_cephadm_module(module_options=None, store=None):
mock.patch("cephadm.services.osd.RemoveUtil._run_mon_cmd"), \
mock.patch("cephadm.module.CephadmOrchestrator.get_osdmap"), \
mock.patch("cephadm.services.osd.OSDService.get_osdspec_affinity", return_value='test_spec'), \
- mock.patch("cephadm.module.CephadmOrchestrator.remote"):
+ mock.patch("cephadm.module.CephadmOrchestrator.remote"), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._request_agent_acks"), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._apply_agent", return_value=False), \
+ mock.patch("cephadm.agent.CephadmAgentHelpers._agent_down", return_value=False), \
+ mock.patch('cephadm.agent.CherryPyThread.run'):
m = CephadmOrchestrator.__new__(CephadmOrchestrator)
if module_options is not None:
wait(m, m.add_host(HostSpec(hostname=name)))
if refresh_hosts:
CephadmServe(m)._refresh_hosts_and_daemons()
+ receive_agent_metadata(m, name)
yield
wait(m, m.remove_host(name))
'value': '127.0.0.0/8'
})
- cephadm_module.cache.update_host_devices_networks(
+ cephadm_module.cache.update_host_networks(
'test',
- [],
{
"127.0.0.0/8": [
"127.0.0.1"
),
])
- cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+ cephadm_module.cache.update_host_devices('test', inventory.devices)
_run_cephadm.return_value = (['{}'], '', 0)
Device('/dev/sdd', available=True)
])
- cephadm_module.cache.update_host_devices_networks('test', inventory.devices, {})
+ cephadm_module.cache.update_host_devices('test', inventory.devices)
_run_cephadm.return_value = (['{}'], '', 0)
from ceph.utils import datetime_to_str, datetime_now
from cephadm import CephadmOrchestrator
from cephadm.inventory import SPEC_STORE_PREFIX
-from cephadm.tests.fixtures import _run_cephadm, wait, with_host
+from cephadm.tests.fixtures import _run_cephadm, wait, with_host, receive_agent_metadata_all_hosts
from cephadm.serve import CephadmServe
from tests import mock
assert cephadm_module.migration_current == 0
CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ receive_agent_metadata_all_hosts(cephadm_module)
cephadm_module.migration.migrate()
CephadmServe(cephadm_module)._apply_all_services()
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from cephadm import CephadmOrchestrator
from cephadm.upgrade import CephadmUpgrade
-from cephadm.serve import CephadmServe
from orchestrator import OrchestratorError, DaemonDescription
-from .fixtures import _run_cephadm, wait, with_host, with_service
+from .fixtures import _run_cephadm, wait, with_host, with_service, \
+ receive_agent_metadata
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
)
])
)):
- CephadmServe(cephadm_module)._refresh_hosts_and_daemons()
+ receive_agent_metadata(cephadm_module, 'host1', ['ls'])
+ receive_agent_metadata(cephadm_module, 'host2', ['ls'])
with mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm(json.dumps({
'image_id': 'image_id',
target_digests = self.upgrade_state.target_digests
target_version = self.upgrade_state.target_version
+ if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+ # need to wait for metadata to come in
+ self.mgr.agent_helpers._request_agent_acks(
+ set([h for h in self.mgr.cache.get_hosts() if not self.mgr.cache.host_metadata_up_to_date(h)]))
+ return
+
first = False
if not target_id or not target_version or not target_digests:
# need to learn the container hash
logger.info('Upgrade: First pull of %s' % target_image)
- self.upgrade_info_str = 'Doing first pull of %s image' % (target_image)
+ self.upgrade_info_str: str = 'Doing first pull of %s image' % (target_image)
try:
target_id, target_version, target_digests = CephadmServe(self.mgr)._get_container_image_info(
target_image)
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
'cephadm-exporter': 'cephadm-exporter',
+ 'agent': 'agent'
}
return mapping[dtype]
'crash': ['crash'],
'container': ['container'],
'cephadm-exporter': ['cephadm-exporter'],
+ 'agent': ['agent']
}
return mapping[stype]
"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi mds mgr mon nfs ' \
- 'node-exporter osd prometheus rbd-mirror rgw ' \
+ 'node-exporter osd prometheus rbd-mirror rgw agent ' \
'container cephadm-exporter ingress cephfs-mirror'.split()
REQUIRES_SERVICE_ID = 'iscsi mds nfs osd rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [