from datetime import datetime, timedelta
from OpenSSL import crypto
-from contextlib import contextmanager
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
-from typing import Any, Dict, Iterator, List, Set, Tuple, TYPE_CHECKING
+from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.mgr.log.debug('Starting cherrypy engine...')
self.start_engine()
self.mgr.log.debug('Cherrypy engine started.')
- agents_down = []
- for h in self.mgr.cache.get_hosts():
- if self.mgr.agent_helpers._check_agent(h):
- agents_down.append(h)
- self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
# wait for the shutdown event
self.cherrypy_shutdown_event.wait()
self.cherrypy_shutdown_event.clear()
error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))
- agents_down = []
- for h in self.mgr.cache.get_hosts():
- if self.mgr.agent_helpers._check_agent(h):
- agents_down.append(h)
- self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
-
up_to_date = False
int_ack = int(data['ack'])
class CephadmAgentHelpers:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
- self.agent_locks: Dict[str, threading.Lock] = {}
def _request_agent_acks(self, hosts: Set[str], increment: bool = False) -> None:
for host in hosts:
self.mgr.cache.agent_ports = {}
return need_apply
- @contextmanager
- def agent_lock(self, host: str) -> Iterator[threading.Lock]:
- if host not in self.mgr.agent_helpers.agent_locks:
- self.mgr.agent_helpers.agent_locks[host] = threading.Lock()
- lock = self.mgr.agent_helpers.agent_locks[host]
- if not lock.acquire(False):
- raise AgentLockException()
- try:
- yield lock
- finally:
- lock.release()
-
def _check_agent(self, host: str) -> bool:
try:
assert self.mgr.cherrypy_thread
f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
return False
if self.mgr.agent_helpers._agent_down(host):
- if host not in self.mgr.offline_hosts:
- self.mgr.cache.metadata_up_to_date[host] = False
- # In case host is actually offline, it's best to reset the connection to avoid
- # a long timeout trying to use an existing connection to an offline host
- self.mgr.ssh._reset_con(host)
-
- try:
- # try to schedule redeploy of agent in case it is individually down
- agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
- with self.mgr.agent_helpers.agent_lock(host):
- daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- self.mgr._daemon_action(daemon_spec, action='redeploy')
- except AgentLockException:
- self.mgr.log.debug(
- f'Could not redeploy agent on host {host}. Someone else holds agent\'s lock')
- except Exception as e:
- self.mgr.log.debug(
- f'Failed to redeploy agent on host {host}. Agent possibly never deployed: {e}')
return True
else:
try:
host, agent.name())
if not last_config or last_deps != deps:
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- with self.mgr.agent_helpers.agent_lock(host):
- self.mgr._daemon_action(daemon_spec, action='reconfig')
+ self.mgr._daemon_action(daemon_spec, action='reconfig')
return False
- except AgentLockException:
- self.mgr.log.debug(
- f'Could not reconfig agent on host {host}. Someone else holds agent\'s lock')
except Exception as e:
self.mgr.log.debug(
f'Agent on host {host} not ready to have config and deps checked: {e}')
action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
if action:
try:
- with self.mgr.agent_helpers.agent_lock(host):
- daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
- self.mgr._daemon_action(daemon_spec, action=action)
- self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
- except AgentLockException:
- self.mgr.log.debug(
- f'Could not {action} agent on host {host}. Someone else holds agent\'s lock')
+ daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
+ self.mgr._daemon_action(daemon_spec, action=action)
+ self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
except Exception as e:
self.mgr.log.debug(
f'Agent on host {host} not ready to {action}: {e}')