if 'keyring' not in data:
raise Exception(
f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
- if host not in self.mgr.cache.agent_keys:
+ if host not in self.mgr.agent_cache.agent_keys:
raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
- if data['keyring'] != self.mgr.cache.agent_keys[host]:
+ if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
raise Exception(f'Got wrong keyring from agent on host {host}.')
if 'port' not in data:
raise Exception(
def handle_metadata(self, data: Dict[str, Any]) -> str:
try:
host = data['host']
- self.mgr.cache.agent_ports[host] = int(data['port'])
- if host not in self.mgr.cache.agent_counter:
- self.mgr.cache.agent_counter[host] = 1
+ self.mgr.agent_cache.agent_ports[host] = int(data['port'])
+ if host not in self.mgr.agent_cache.agent_counter:
+ self.mgr.agent_cache.agent_counter[host] = 1
self.mgr.agent_helpers._request_agent_acks({host})
res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
self.mgr.log.debug(res)
return res
# update timestamp of most recent agent update
- self.mgr.cache.agent_timestamp[host] = datetime_now()
+ self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))
up_to_date = False
int_ack = int(data['ack'])
- if int_ack == self.mgr.cache.agent_counter[host]:
+ if int_ack == self.mgr.agent_cache.agent_counter[host]:
up_to_date = True
else:
# we got old counter value with message, inform agent of new timestamp
- if not self.mgr.cache.messaging_agent(host):
+ if not self.mgr.agent_cache.messaging_agent(host):
self.mgr.agent_helpers._request_agent_acks({host})
self.mgr.log.debug(
f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')
self.mgr.log.debug(
f'Received up-to-date metadata from agent on host {host}.')
+ self.mgr.agent_cache.save_agent(host)
return 'Successfully processed metadata.'
except Exception as e:
def run(self) -> None:
self.mgr.log.debug(f'Sending message to agent on host {self.host}')
- self.mgr.cache.sending_agent_message[self.host] = True
+ self.mgr.agent_cache.sending_agent_message[self.host] = True
try:
assert self.mgr.cherrypy_thread
root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
ssl_ctx.load_cert_chain(cert_fname, key_fname)
except Exception as e:
self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
- self.mgr.cache.sending_agent_message[self.host] = False
+ self.mgr.agent_cache.sending_agent_message[self.host] = False
return
try:
bytes_len: str = str(len(self.data.encode('utf-8')))
bytes_len = '0' + bytes_len
except Exception as e:
self.mgr.log.error(f'Failed to get length of json payload: {e}')
- self.mgr.cache.sending_agent_message[self.host] = False
+ self.mgr.agent_cache.sending_agent_message[self.host] = False
return
for retry_wait in [3, 5]:
try:
agent_response = secure_agent_socket.recv(1024).decode()
self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
if self.daemon_spec:
- self.mgr.cache.agent_config_successfully_delivered(self.daemon_spec)
- self.mgr.cache.sending_agent_message[self.host] = False
+ self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
+ self.mgr.agent_cache.sending_agent_message[self.host] = False
return
except ConnectionError as e:
# if it's a connection error, possibly try to connect again.
except Exception as e:
# if it's not a connection error, something has gone wrong. Give up.
self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
- self.mgr.cache.sending_agent_message[self.host] = False
+ self.mgr.agent_cache.sending_agent_message[self.host] = False
return
self.mgr.log.error(f'Could not connect to agent on host {self.host}')
- self.mgr.cache.sending_agent_message[self.host] = False
+ self.mgr.agent_cache.sending_agent_message[self.host] = False
return
for host in hosts:
if increment:
self.mgr.cache.metadata_up_to_date[host] = False
- if host not in self.mgr.cache.agent_counter:
- self.mgr.cache.agent_counter[host] = 1
+ if host not in self.mgr.agent_cache.agent_counter:
+ self.mgr.agent_cache.agent_counter[host] = 1
elif increment:
- self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
- payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
+ self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
+ payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
if daemon_spec:
payload['config'] = daemon_spec.final_config
message_thread = AgentMessageThread(
- host, self.mgr.cache.agent_ports[host], payload, self.mgr, daemon_spec)
+ host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
message_thread.start()
def _request_ack_all_not_up_to_date(self) -> None:
    """Request acks from agents on hosts whose metadata is not up to date.

    A host is selected only when all of the following hold:
      * the host cache reports its metadata as not up to date,
      * an agent port is recorded for it in the agent cache (without a
        port there is nothing to send the request to),
      * no message to that host's agent is currently flagged as being sent.
    """
    stale_hosts = {
        h for h in self.mgr.cache.get_hosts()
        if (not self.mgr.cache.host_metadata_up_to_date(h)
            and h in self.mgr.agent_cache.agent_ports
            and not self.mgr.agent_cache.messaging_agent(h))
    }
    self.mgr.agent_helpers._request_agent_acks(stale_hosts)
def _agent_down(self, host: str) -> bool:
# if host is draining or drained (has _no_schedule label) there should not
# if we don't have a timestamp, it's likely because of a mgr fail over.
# just set the timestamp to now. However, if host was offline before, we
# should not allow creating a new timestamp to cause it to be marked online
- if host not in self.mgr.cache.agent_timestamp:
+ if host not in self.mgr.agent_cache.agent_timestamp:
if host in self.mgr.offline_hosts:
return False
- self.mgr.cache.agent_timestamp[host] = datetime_now()
+ self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
# agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
- time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
+ time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
return True
return False
# we can tell they're in that state if we don't have a keyring for
# them in the host cache
for agent in self.mgr.cache.get_daemons_by_service('agent'):
- if agent.hostname not in self.mgr.cache.agent_keys:
+ if agent.hostname not in self.mgr.agent_cache.agent_keys:
self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
if 'agent' not in self.mgr.spec_store:
self.mgr.agent_helpers._apply_agent()
if 'agent' in self.mgr.spec_store:
self.mgr.spec_store.rm('agent')
need_apply = True
- self.mgr.cache.agent_counter = {}
- self.mgr.cache.agent_timestamp = {}
- self.mgr.cache.agent_keys = {}
- self.mgr.cache.agent_ports = {}
+ self.mgr.agent_cache.agent_counter = {}
+ self.mgr.agent_cache.agent_timestamp = {}
+ self.mgr.agent_cache.agent_keys = {}
+ self.mgr.agent_cache.agent_ports = {}
return need_apply
def _check_agent(self, host: str) -> bool:
try:
spec = self.mgr.spec_store.active_specs.get('agent', None)
deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
- last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
- host, agent.name())
+ last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
if not last_config or last_deps != deps:
# if root cert is the dep that changed, we must use ssh to reconfig
# so it's necessary to check this one specifically
daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
# we need to know the agent port to try to reconfig w/ http
# otherwise there is no choice but a full ssh reconfig
- if host in self.mgr.cache.agent_ports and root_cert_match and not down:
+ if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
daemon_spec.daemon_type)].prepare_create(daemon_spec)
self.mgr.agent_helpers._request_agent_acks(
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
+AGENT_CACHE_PREFIX = 'agent.'
class Inventory:
self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
- self.agent_counter = {} # type: Dict[str, int]
- self.agent_timestamp = {} # type: Dict[str, datetime.datetime]
self.metadata_up_to_date = {} # type: Dict[str, bool]
- self.agent_keys = {} # type: Dict[str, str]
- self.agent_ports = {} # type: Dict[str, int]
- self.sending_agent_message = {} # type: Dict[str, bool]
def load(self):
# type: () -> None
self.last_host_check[host] = str_to_datetime(j['last_host_check'])
self.registry_login_queue.add(host)
self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
-
- self.agent_counter[host] = int(j.get('agent_counter', 1))
- self.metadata_up_to_date[host] = False
- self.agent_keys[host] = str(j.get('agent_keys', ''))
- agent_port = int(j.get('agent_ports', 0))
- if agent_port:
- self.agent_ports[host] = agent_port
+ self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
j['last_client_files'] = self.last_client_files[host]
if host in self.scheduled_daemon_actions:
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
-
- if host in self.agent_counter:
- j['agent_counter'] = self.agent_counter[host]
- if host in self.agent_keys:
- j['agent_keys'] = self.agent_keys[host]
- if host in self.agent_ports:
- j['agent_ports'] = self.agent_ports[host]
+ if host in self.metadata_up_to_date:
+ j['metadata_up_to_date'] = self.metadata_up_to_date[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
return True
return False
- def messaging_agent(self, host: str) -> bool:
- if host not in self.sending_agent_message or not self.sending_agent_message[host]:
- return False
- return True
-
def host_metadata_up_to_date(self, host: str) -> bool:
if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
return False
return False
return True
- def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
- # agent successfully received new config. Update config/deps
- assert daemon_spec.service_name == 'agent'
- self.update_daemon_config_deps(
- daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
- self.agent_timestamp[daemon_spec.host] = datetime_now()
- self.agent_counter[daemon_spec.host] = 1
- self.save_host(daemon_spec.host)
-
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
return self.scheduled_daemon_actions.get(host, {}).get(daemon)
class AgentCache():
    """
    AgentCache is used for storing metadata about agent daemons that must be kept
    through MGR failovers

    Each host's state is persisted as a single JSON document in the mgr
    key/value store under the key ``AGENT_CACHE_PREFIX + host``.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        # host -> {'deps': [...], 'last_config': datetime} (may be {} after load())
        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
        # host -> current counter value exchanged with the agent
        self.agent_counter = {}  # type: Dict[str, int]
        # host -> time of the most recent agent update
        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
        # host -> keyring used to validate the agent
        self.agent_keys = {}  # type: Dict[str, str]
        # host -> port recorded for the agent
        self.agent_ports = {}  # type: Dict[str, int]
        # host -> True while a message to the agent is being sent.
        # In-memory only: save_agent() does not persist it.
        self.sending_agent_message = {}  # type: Dict[str, bool]

    def load(self):
        # type: () -> None
        """Populate the in-memory maps from every stored agent cache record.

        Records for hosts that are no longer in the inventory are removed
        from the store and skipped. A record that fails to parse is logged
        and skipped; loading continues with the remaining records.
        """
        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
            host = k[len(AGENT_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
                    host))
                self.mgr.set_store(k, None)
                # The record was just deleted from the store; loading it anyway
                # would resurrect it in memory and on the next save_agent().
                continue
            try:
                j = json.loads(v)
                self.agent_config_deps[host] = {}
                conf_deps = j.get('agent_config_deps', {})
                if conf_deps:
                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
                self.agent_config_deps[host] = conf_deps
                self.agent_counter[host] = int(j.get('agent_counter', 1))
                self.agent_timestamp[host] = str_to_datetime(
                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
                self.agent_keys[host] = str(j.get('agent_keys', ''))
                # a port of 0 (or a missing entry) means "unknown": leave it unset
                agent_port = int(j.get('agent_ports', 0))
                if agent_port:
                    self.agent_ports[host] = agent_port

            except Exception as e:
                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
                    host, e))

    def save_agent(self, host: str) -> None:
        """Persist everything known about the agent on ``host`` to the mgr store.

        Only fields that have an entry for ``host`` are written.
        """
        j: Dict[str, Any] = {}
        conf_deps = self.agent_config_deps.get(host)
        # load() seeds an empty dict for hosts whose record had no config deps;
        # indexing 'last_config' unconditionally would raise KeyError for them.
        if conf_deps and conf_deps.get('last_config') is not None:
            j['agent_config_deps'] = {
                'deps': conf_deps.get('deps', []),
                'last_config': datetime_to_str(conf_deps['last_config']),
            }
        if host in self.agent_counter:
            j['agent_counter'] = self.agent_counter[host]
        if host in self.agent_keys:
            j['agent_keys'] = self.agent_keys[host]
        if host in self.agent_ports:
            j['agent_ports'] = self.agent_ports[host]
        if host in self.agent_timestamp:
            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])

        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))

    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the deps and timestamp of the last config for the agent on ``host``.

        This only updates memory; callers persist it with save_agent().
        """
        self.agent_config_deps[host] = {
            'deps': deps,
            'last_config': stamp,
        }

    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return ``(deps, last_config)`` for ``host``.

        Returns ``(None, None)`` when nothing is recorded for the host, and
        ``([], None)`` when the host has an empty entry.
        """
        if host in self.agent_config_deps:
            return self.agent_config_deps[host].get('deps', []), \
                self.agent_config_deps[host].get('last_config', None)
        return None, None

    def messaging_agent(self, host: str) -> bool:
        """Return True if a message to the agent on ``host`` is flagged as in flight."""
        return bool(self.sending_agent_message.get(host, False))

    def agent_config_successfully_delivered(self, daemon_spec: 'CephadmDaemonDeploySpec') -> None:
        """Update and persist agent state after a new config was delivered.

        Stores the spec's deps with the current time, refreshes the agent
        timestamp, resets the counter to 1 and saves the host's record.
        """
        # agent successfully received new config. Update config/deps
        assert daemon_spec.service_name == 'agent'
        self.update_agent_config_deps(
            daemon_spec.host, daemon_spec.deps, datetime_now())
        self.agent_timestamp[daemon_spec.host] = datetime_now()
        self.agent_counter[daemon_spec.host] = 1
        self.save_agent(daemon_spec.host)
+
class EventStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService, SNMPGatewayService
from .schedule import HostAssignment
-from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
+from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
+ ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
self.cache = HostCache(self)
self.cache.load()
+ self.agent_cache = AgentCache(self)
+ self.agent_cache.load()
+
self.to_remove_osds = OSDRemovalQueue(self)
self.to_remove_osds.load_from_store()
finally:
if self.mgr.use_agent:
# can only send acks to agents if we know for sure which port they bound to
- hosts_altered = set([h for h in hosts_altered if (h in self.mgr.cache.agent_ports and h in [
+ hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
image=image)
if daemon_spec.daemon_type == 'agent':
- self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
- self.mgr.cache.agent_counter[daemon_spec.host] = 1
+ self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
+ self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
# refresh daemon state? (ceph daemon reconfig does not need it)
if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
self.mgr.requires_post_actions.add(daemon_spec.name())
self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
- self.mgr.cache.update_daemon_config_deps(
- daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
- self.mgr.cache.save_host(daemon_spec.host)
+ if daemon_spec.daemon_type != 'agent':
+ self.mgr.cache.update_daemon_config_deps(
+ daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
+ self.mgr.cache.save_host(daemon_spec.host)
+ else:
+ self.mgr.agent_cache.update_agent_config_deps(
+ daemon_spec.host, daemon_spec.deps, start_time)
+ self.mgr.agent_cache.save_agent(daemon_spec.host)
msg = "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
if not code:
keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
daemon_spec.keyring = keyring
- self.mgr.cache.agent_keys[host] = keyring
+ self.mgr.agent_cache.agent_keys[host] = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)