from datetime import datetime, timedelta
from OpenSSL import crypto
+from contextlib import contextmanager
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
-from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
+from typing import Any, Dict, Iterator, List, Set, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.mgr.log.info('Starting cherrypy engine...')
cherrypy.engine.start()
self.mgr.log.info('Cherrypy engine started.')
- self.mgr._kick_serve_loop()
+ agents_down = []
+ for h in self.mgr.cache.get_hosts():
+ if self.mgr.agent_helpers._check_agent(h):
+ agents_down.append(h)
+ self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
# wait for the shutdown event
self.cherrypy_shutdown_event.wait()
self.cherrypy_shutdown_event.clear()
agents_down = []
for h in self.mgr.cache.get_hosts():
- if self.mgr.agent_helpers._agent_down(h):
+ if self.mgr.agent_helpers._check_agent(h):
agents_down.append(h)
self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
class CephadmAgentHelpers:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ self.agent_locks: Dict[str, threading.Lock] = {}
def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
for host in hosts:
self.mgr.cache.agent_ports = {}
return need_apply
+ @contextmanager
+ def agent_lock(self, host: str) -> Iterator[threading.Lock]:
+ if host not in self.mgr.agent_helpers.agent_locks:
+ self.mgr.agent_helpers.agent_locks[host] = threading.Lock()
+ lock = self.mgr.agent_helpers.agent_locks[host]
+ if not lock.acquire(False):
+ raise Exception('Agent lock in use')
+ try:
+ yield lock
+ finally:
+ lock.release()
+
def _check_agent(self, host: str) -> bool:
+ try:
+ assert self.mgr.cherrypy_thread
+ assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ except Exception:
+ self.mgr.log.debug(
+ f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
+ return False
if self.mgr.agent_helpers._agent_down(host):
if host not in self.mgr.offline_hosts:
self.mgr.cache.metadata_up_to_date[host] = False
# try to schedule redeploy of agent in case it is individually down
agent = [a for a in self.mgr.cache.get_daemons_by_type(
'agent') if a.hostname == host][0]
- self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+ with self.mgr.agent_helpers.agent_lock(host):
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+ self.mgr._daemon_action(daemon_spec, action='redeploy')
except Exception as e:
self.mgr.log.debug(
- f'Failed to find entry for agent deployed on host {host}. Agent possibly never deployed: {e}')
+ f'Failed to redeploy agent on host {host}. Agent possibly never deployed: {e}')
return True
else:
try:
agent = [a for a in self.mgr.cache.get_daemons_by_type(
'agent') if a.hostname == host][0]
assert agent.daemon_id is not None
+ assert agent.hostname is not None
+ except Exception as e:
+ self.mgr.log.debug(
+ f'Could not retrieve agent on host {host} from daemon cache: {e}')
+ return False
+ try:
spec = self.mgr.spec_store.active_specs.get('agent', None)
deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
host, agent.name())
if not last_config or last_deps != deps:
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- self.mgr._daemon_action(daemon_spec, action='reconfig')
+ with self.mgr.agent_helpers.agent_lock(host):
+ self.mgr._daemon_action(daemon_spec, action='reconfig')
+ return False
except Exception as e:
self.mgr.log.debug(
f'Agent on host {host} not ready to have config and deps checked: {e}')
+ action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
+ if action:
+ try:
+ with self.mgr.agent_helpers.agent_lock(host):
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+ self.mgr._daemon_action(daemon_spec, action=action)
+ self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
+ except Exception as e:
+ self.mgr.log.debug(
+ f'Agent on host {host} not ready to {action}: {e}')
return False