from functools import wraps
from glob import glob
from io import StringIO
-from threading import Thread, RLock
+from threading import Thread, RLock, Event
from urllib.error import HTTPError
from urllib.request import urlopen, Request
from pathlib import Path
self.listener_key_path = os.path.join(self.daemon_dir, 'listener.key')
self.listener_port = ''
self.ack = -1
- self.event = threading.Event()
+ self.event = Event()
self.mgr_listener = MgrListener(self)
self.device_enhanced_scan = False
while not self.stop:
ack = self.ack
-
try:
volume = self._ceph_volume(self.device_enhanced_scan)
except Exception as e:
logger.error(f'Failed to get ceph-volume metadata: {e}')
volume = ''
+ # part of the networks info is returned as a set which is not JSON
+ # serializable. The set must be converted to a list
+ networks = list_networks(self.ctx)
+ networks_list = {}
+ for key in networks.keys():
+ for k, v in networks[key].items():
+ networks_list[key] = {k: list(v)}
+
data = json.dumps({'host': self.host,
'ls': list_daemons(self.ctx),
- 'networks': list_networks(self.ctx),
+ 'networks': networks_list,
'facts': HostFacts(self.ctx).dump(),
'volume': volume,
'ack': str(ack),
if 'facts' in data and data['facts']:
self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
if 'volume' in data and data['volume']:
- self.mgr.log.error(data['volume'])
ret = Devices.from_json(json.loads(data['volume']))
self.mgr.cache.update_host_devices(host, ret.devices)
import logging
import uuid
from collections import defaultdict
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator, Set
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
if host in self.mgr.offline_hosts:
return
self.mgr.offline_hosts.add(host)
- self.mgr._reset_con(host)
+ # In case host is actually offline, it's best to reset the connection to avoid
+ # a long timeout trying to use an existing connection to an offline host
+ # REVISIT AFTER https://github.com/ceph/ceph/pull/42919
+ # self.mgr.ssh._reset_con(host)
agents_down.append(host)
# try to schedule redeploy of agent in case it is individually down
try:
self.log.debug(
f'Failed to find entry for agent deployed on host {host}. Agent possibly never deployed: {e}')
return
- else:
+ elif self.mgr.use_agent:
self.mgr.offline_hosts_remove(host)
if self.mgr.cache.host_needs_check(host):
loop.close()
asyncio.set_event_loop(None)
+
def receive_agent_metadata(m: CephadmOrchestrator, host: str, ops: List[str] = None) -> None:
to_update: Dict[str, Callable[[str, Any], None]] = {
'ls': m._process_ls_output,
for host in m.cache.get_hosts():
receive_agent_metadata(m, host)
+
@contextmanager
def with_cephadm_module(module_options=None, store=None):
"""