self.ack = -1
self.event = threading.Event()
self.mgr_listener = MgrListener(self)
+ self.device_enhanced_scan = False
def deploy_daemon_unit(self, config: Dict[str, str] = {}) -> None:
if not config:
self.loop_interval = int(config['refresh_period'])
starting_port = int(config['listener_port'])
self.host = config['host']
+ use_lsm = config['device_enhanced_scan']
except Exception as e:
self.shutdown()
raise Error(f'Failed to get agent target ip and port from config: {e}')
assert self.target_ip and self.target_port
+ self.device_enhanced_scan = False
+ if use_lsm.lower() == 'true':
+ self.device_enhanced_scan = True
+
try:
for _ in range(1001):
if not port_in_use(self.ctx, starting_port):
while not self.stop:
ack = self.ack
+ try:
+ volume = self._ceph_volume(self.device_enhanced_scan)
+ except Exception as e:
+ logger.error(f'Failed to get ceph-volume metadata: {e}')
+ volume = ''
+
data = json.dumps({'host': self.host,
'ls': list_daemons(self.ctx),
'networks': list_networks(self.ctx),
'facts': HostFacts(self.ctx).dump(),
+ 'volume': volume,
'ack': str(ack),
'keyring': self.keyring,
'port': self.listener_port})
self.event.wait(self.loop_interval)
self.event.clear()
+ def _ceph_volume(self, enhanced: bool = False) -> str:
+ self.ctx.command = 'inventory --format=json'.split()
+ if enhanced:
+ self.ctx.command.append('--with-lsm')
+ self.ctx.fsid = self.fsid
+
+ stream = io.StringIO()
+ with redirect_stdout(stream):
+ command_ceph_volume(self.ctx)
+
+ stdout = stream.getvalue()
+
+ if stdout:
+ return stdout
+ else:
+ raise Exception('ceph-volume returned empty value')
+
def command_agent(ctx: CephadmContext) -> None:
agent = CephadmAgent(ctx, ctx.fsid, ctx.daemon_id)
from mgr_util import verify_tls_files
from ceph.utils import datetime_now
+from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from OpenSSL import crypto
except Exception as e:
raise Exception(
f'Counter value from agent on host {host} could not be converted to an integer: {e}')
- metadata_types = ['ls', 'networks', 'facts']
+ metadata_types = ['ls', 'networks', 'facts', 'volume']
metadata_types_str = '{' + ', '.join(metadata_types) + '}'
if not all(item in data.keys() for item in metadata_types):
self.mgr.log.warning(
self.mgr.log.debug(
f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')
-
- if 'ls' in data:
+ if 'ls' in data and data['ls']:
self.mgr._process_ls_output(host, data['ls'])
- if 'networks' in data:
+ if 'networks' in data and data['networks']:
self.mgr.cache.update_host_networks(host, data['networks'])
- if 'facts' in data:
+ if 'facts' in data and data['facts']:
self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
+ if 'volume' in data and data['volume']:
+ self.mgr.log.error(data['volume'])
+ ret = Devices.from_json(json.loads(data['volume']))
+ self.mgr.cache.update_host_devices(host, ret.devices)
if up_to_date:
self.mgr.cache.metadata_up_to_date[host] = True
def run(self) -> None:
try:
assert self.mgr.cherrypy_thread
- root_cert= self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
root_cert_tmp = tempfile.NamedTemporaryFile()
root_cert_tmp.write(root_cert.encode('utf-8'))
root_cert_tmp.flush()
try:
bytes_len: str = str(len(self.data.encode('utf-8')))
if len(bytes_len.encode('utf-8')) > 10:
- raise Exception(f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
+ raise Exception(
+ f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
while len(bytes_len.encode('utf-8')) < 10:
bytes_len = '0' + bytes_len
except Exception as e:
root_cert = ''
if self.cherrypy_thread and self.cherrypy_thread.ssl_certs.root_cert:
root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
- deps = [self.get_mgr_ip(), str(self.endpoint_port), root_cert]
+ deps = sorted([self.get_mgr_ip(), str(self.endpoint_port), root_cert,
+ str(self.get_module_option('device_enhanced_scan'))])
else:
need = {
'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
r = self._refresh_host_networks(host)
if r:
failures.append(r)
+
+ if self.mgr.cache.host_needs_device_refresh(host):
+ self.log.debug('refreshing %s devices' % host)
+ r = self._refresh_host_devices(host)
+ if r:
+ failures.append(r)
self.mgr.cache.metadata_up_to_date[host] = True
if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
if r:
bad_hosts.append(r)
- if self.mgr.cache.host_needs_device_refresh(host):
- self.log.debug('refreshing %s devices' % host)
- r = self._refresh_host_devices(host)
- if r:
- failures.append(r)
-
if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
self.log.debug(f"refreshing OSDSpec previews for {host}")
r = self._refresh_host_osdspec_previews(host)
'target_port': self.mgr.endpoint_port,
'refresh_period': self.mgr.agent_refresh_rate,
'listener_port': self.mgr.agent_starting_port,
- 'host': daemon_spec.host}
+ 'host': daemon_spec.host,
+ 'device_enhanced_scan': str(self.mgr.get_module_option('device_enhanced_scan'))}
assert self.mgr.cherrypy_thread
assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
'listener.key': listener_key,
}
- return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.endpoint_port), self.mgr.cherrypy_thread.ssl_certs.get_root_cert()])
+ return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.endpoint_port), self.mgr.cherrypy_thread.ssl_certs.get_root_cert(), str(self.mgr.get_module_option('device_enhanced_scan'))])