From 960a6f565e68f4ed217cca13160901b30b2e890f Mon Sep 17 00:00:00 2001 From: Adam King Date: Mon, 10 Jan 2022 15:12:11 -0500 Subject: [PATCH] mgr/cephadm: store agent metadata in its own cache To avoid having the host cache getting too big due to having to store this additional info Fixes: https://tracker.ceph.com/issues/53624 Signed-off-by: Adam King --- src/pybind/mgr/cephadm/agent.py | 64 ++++----- src/pybind/mgr/cephadm/inventory.py | 126 +++++++++++++----- src/pybind/mgr/cephadm/module.py | 6 +- src/pybind/mgr/cephadm/serve.py | 17 ++- .../mgr/cephadm/services/cephadmservice.py | 2 +- 5 files changed, 142 insertions(+), 73 deletions(-) diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index 7f65190a975..f6672be0bbd 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -181,9 +181,9 @@ class HostData: if 'keyring' not in data: raise Exception( f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}') - if host not in self.mgr.cache.agent_keys: + if host not in self.mgr.agent_cache.agent_keys: raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent') - if data['keyring'] != self.mgr.cache.agent_keys[host]: + if data['keyring'] != self.mgr.agent_cache.agent_keys[host]: raise Exception(f'Got wrong keyring from agent on host {host}.') if 'port' not in data: raise Exception( @@ -205,16 +205,16 @@ class HostData: def handle_metadata(self, data: Dict[str, Any]) -> str: try: host = data['host'] - self.mgr.cache.agent_ports[host] = int(data['port']) - if host not in self.mgr.cache.agent_counter: - self.mgr.cache.agent_counter[host] = 1 + self.mgr.agent_cache.agent_ports[host] = int(data['port']) + if host not in self.mgr.agent_cache.agent_counter: + self.mgr.agent_cache.agent_counter[host] = 1 self.mgr.agent_helpers._request_agent_acks({host}) res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata' self.mgr.log.debug(res) return res # update timestamp of most recent agent update - self.mgr.cache.agent_timestamp[host] = datetime_now() + self.mgr.agent_cache.agent_timestamp[host] = datetime_now() error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()]) daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host)) @@ -222,11 +222,11 @@ class HostData: up_to_date = False int_ack = int(data['ack']) - if int_ack == self.mgr.cache.agent_counter[host]: + if int_ack == self.mgr.agent_cache.agent_counter[host]: up_to_date = True else: # we got old counter value with message, inform agent of new timestamp - if not self.mgr.cache.messaging_agent(host): + if not self.mgr.agent_cache.messaging_agent(host): self.mgr.agent_helpers._request_agent_acks({host}) self.mgr.log.debug( f'Received old metadata from agent on host {host}. Requested up-to-date metadata.') @@ -260,6 +260,7 @@ class HostData: self.mgr.log.debug( f'Received up-to-date metadata from agent on host {host}.') + self.mgr.agent_cache.save_agent(host) return 'Successfully processed metadata.' except Exception as e: @@ -280,7 +281,7 @@ class AgentMessageThread(threading.Thread): def run(self) -> None: self.mgr.log.debug(f'Sending message to agent on host {self.host}') - self.mgr.cache.sending_agent_message[self.host] = True + self.mgr.agent_cache.sending_agent_message[self.host] = True try: assert self.mgr.cherrypy_thread root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert() @@ -307,7 +308,7 @@ class AgentMessageThread(threading.Thread): ssl_ctx.load_cert_chain(cert_fname, key_fname) except Exception as e: self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}') - self.mgr.cache.sending_agent_message[self.host] = False + self.mgr.agent_cache.sending_agent_message[self.host] = False return try: bytes_len: str = str(len(self.data.encode('utf-8'))) @@ -318,7 +319,7 @@ class AgentMessageThread(threading.Thread): bytes_len = '0' + bytes_len except Exception as e: self.mgr.log.error(f'Failed to get length of json payload: {e}') - self.mgr.cache.sending_agent_message[self.host] = False + self.mgr.agent_cache.sending_agent_message[self.host] = False return for retry_wait in [3, 5]: try: @@ -330,8 +331,8 @@ class AgentMessageThread(threading.Thread): agent_response = secure_agent_socket.recv(1024).decode() self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}') if self.daemon_spec: - self.mgr.cache.agent_config_successfully_delivered(self.daemon_spec) - self.mgr.cache.sending_agent_message[self.host] = False + self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec) + self.mgr.agent_cache.sending_agent_message[self.host] = False return except ConnectionError as e: # if it's a connection error, possibly try to connect again. @@ -342,10 +343,10 @@ class AgentMessageThread(threading.Thread): except Exception as e: # if it's not a connection error, something has gone wrong. Give up. self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}') - self.mgr.cache.sending_agent_message[self.host] = False + self.mgr.agent_cache.sending_agent_message[self.host] = False return self.mgr.log.error(f'Could not connect to agent on host {self.host}') - self.mgr.cache.sending_agent_message[self.host] = False + self.mgr.agent_cache.sending_agent_message[self.host] = False return @@ -357,22 +358,22 @@ class CephadmAgentHelpers: for host in hosts: if increment: self.mgr.cache.metadata_up_to_date[host] = False - if host not in self.mgr.cache.agent_counter: - self.mgr.cache.agent_counter[host] = 1 + if host not in self.mgr.agent_cache.agent_counter: + self.mgr.agent_cache.agent_counter[host] = 1 elif increment: - self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1 - payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]} + self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1 + payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]} if daemon_spec: payload['config'] = daemon_spec.final_config message_thread = AgentMessageThread( - host, self.mgr.cache.agent_ports[host], payload, self.mgr, daemon_spec) + host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec) message_thread.start() def _request_ack_all_not_up_to_date(self) -> None: self.mgr.agent_helpers._request_agent_acks( set([h for h in self.mgr.cache.get_hosts() if (not self.mgr.cache.host_metadata_up_to_date(h) - and h in self.mgr.cache.agent_ports and not self.mgr.cache.messaging_agent(h))])) + and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))])) def _agent_down(self, host: str) -> bool: # if host is draining or drained (has _no_schedule label) there should not @@ -385,13 +386,13 @@ class CephadmAgentHelpers: # if we don't have a timestamp, it's likely because of a mgr fail over. # just set the timestamp to now. However, if host was offline before, we # should not allow creating a new timestamp to cause it to be marked online - if host not in self.mgr.cache.agent_timestamp: + if host not in self.mgr.agent_cache.agent_timestamp: if host in self.mgr.offline_hosts: return False - self.mgr.cache.agent_timestamp[host] = datetime_now() + self.mgr.agent_cache.agent_timestamp[host] = datetime_now() # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it. down_mult: float = max(self.mgr.agent_down_multiplier, 1.5) - time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host] + time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host] if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate): return True return False @@ -434,7 +435,7 @@ class CephadmAgentHelpers: # we can tell they're in that state if we don't have a keyring for # them in the host cache for agent in self.mgr.cache.get_daemons_by_service('agent'): - if agent.hostname not in self.mgr.cache.agent_keys: + if agent.hostname not in self.mgr.agent_cache.agent_keys: self.mgr._schedule_daemon_action(agent.name(), 'redeploy') if 'agent' not in self.mgr.spec_store: self.mgr.agent_helpers._apply_agent() @@ -443,10 +444,10 @@ class CephadmAgentHelpers: if 'agent' in self.mgr.spec_store: self.mgr.spec_store.rm('agent') need_apply = True - self.mgr.cache.agent_counter = {} - self.mgr.cache.agent_timestamp = {} - self.mgr.cache.agent_keys = {} - self.mgr.cache.agent_ports = {} + self.mgr.agent_cache.agent_counter = {} + self.mgr.agent_cache.agent_timestamp = {} + self.mgr.agent_cache.agent_keys = {} + self.mgr.agent_cache.agent_ports = {} return need_apply def _check_agent(self, host: str) -> bool: @@ -471,8 +472,7 @@ class CephadmAgentHelpers: try: spec = self.mgr.spec_store.active_specs.get('agent', None) deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id) - last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps( - host, agent.name()) + last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host) if not last_config or last_deps != deps: # if root cert is the dep that changed, we must use ssh to reconfig # so it's necessary to check this one specifically @@ -486,7 +486,7 @@ class CephadmAgentHelpers: daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent) # we need to know the agent port to try to reconfig w/ http # otherwise there is no choice but a full ssh reconfig - if host in self.mgr.cache.agent_ports and root_cert_match and not down: + if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down: daemon_spec = self.mgr.cephadm_services[daemon_type_to_service( daemon_spec.daemon_type)].prepare_create(daemon_spec) self.mgr.agent_helpers._request_agent_acks( diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py index 15adc91a55a..dbdedd47778 100644 --- a/src/pybind/mgr/cephadm/inventory.py +++ b/src/pybind/mgr/cephadm/inventory.py @@ -25,6 +25,7 @@ logger = logging.getLogger(__name__) HOST_CACHE_PREFIX = "host." SPEC_STORE_PREFIX = "spec." +AGENT_CACHE_PREFIX = 'agent.' class Inventory: @@ -456,12 +457,7 @@ class HostCache(): self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {} - self.agent_counter = {} # type: Dict[str, int] - self.agent_timestamp = {} # type: Dict[str, datetime.datetime] self.metadata_up_to_date = {} # type: Dict[str, bool] - self.agent_keys = {} # type: Dict[str, str] - self.agent_ports = {} # type: Dict[str, int] - self.sending_agent_message = {} # type: Dict[str, bool] def load(self): # type: () -> None @@ -509,13 +505,7 @@ class HostCache(): self.last_host_check[host] = str_to_datetime(j['last_host_check']) self.registry_login_queue.add(host) self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {}) - - self.agent_counter[host] = int(j.get('agent_counter', 1)) - self.metadata_up_to_date[host] = False - self.agent_keys[host] = str(j.get('agent_keys', '')) - agent_port = int(j.get('agent_ports', 0)) - if agent_port: - self.agent_ports[host] = agent_port + self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False) self.mgr.log.debug( 'HostCache.load: host %s has %d daemons, ' @@ -705,13 +695,8 @@ class HostCache(): j['last_client_files'] = self.last_client_files[host] if host in self.scheduled_daemon_actions: j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host] - - if host in self.agent_counter: - j['agent_counter'] = self.agent_counter[host] - if host in self.agent_keys: - j['agent_keys'] = self.agent_keys[host] - if host in self.agent_ports: - j['agent_ports'] = self.agent_ports[host] + if host in self.metadata_up_to_date: + j['metadata_up_to_date'] = self.metadata_up_to_date[host] self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j)) @@ -1007,11 +992,6 @@ class HostCache(): return True return False - def messaging_agent(self, host: str) -> bool: - if host not in self.sending_agent_message or not self.sending_agent_message[host]: - return False - return True - def host_metadata_up_to_date(self, host: str) -> bool: if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]: return False @@ -1027,15 +1007,6 @@ class HostCache(): return False return True - def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None: - # agent successfully received new config. Update config/deps - assert daemon_spec.service_name == 'agent' - self.update_daemon_config_deps( - daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now()) - self.agent_timestamp[daemon_spec.host] = datetime_now() - self.agent_counter[daemon_spec.host] = 1 - self.save_host(daemon_spec.host) - def add_daemon(self, host, dd): # type: (str, orchestrator.DaemonDescription) -> None assert host in self.daemons @@ -1095,6 +1066,95 @@ class HostCache(): return self.scheduled_daemon_actions.get(host, {}).get(daemon) +class AgentCache(): + """ + AgentCache is used for storing metadata about agent daemons that must be kept + through MGR failovers + """ + + def __init__(self, mgr): + # type: (CephadmOrchestrator) -> None + self.mgr: CephadmOrchestrator = mgr + self.agent_config_deps = {} # type: Dict[str, Dict[str,Any]] + self.agent_counter = {} # type: Dict[str, int] + self.agent_timestamp = {} # type: Dict[str, datetime.datetime] + self.agent_keys = {} # type: Dict[str, str] + self.agent_ports = {} # type: Dict[str, int] + self.sending_agent_message = {} # type: Dict[str, bool] + + def load(self): + # type: () -> None + for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items(): + host = k[len(AGENT_CACHE_PREFIX):] + if host not in self.mgr.inventory: + self.mgr.log.warning('removing stray AgentCache record for agent on %s' % ( + host)) + self.mgr.set_store(k, None) + try: + j = json.loads(v) + self.agent_config_deps[host] = {} + conf_deps = j.get('agent_config_deps', {}) + if conf_deps: + conf_deps['last_config'] = str_to_datetime(conf_deps['last_config']) + self.agent_config_deps[host] = conf_deps + self.agent_counter[host] = int(j.get('agent_counter', 1)) + self.agent_timestamp[host] = str_to_datetime( + j.get('agent_timestamp', datetime_to_str(datetime_now()))) + self.agent_keys[host] = str(j.get('agent_keys', '')) + agent_port = int(j.get('agent_ports', 0)) + if agent_port: + self.agent_ports[host] = agent_port + + except Exception as e: + self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % ( + host, e)) + pass + + def save_agent(self, host: str) -> None: + j: Dict[str, Any] = {} + if host in self.agent_config_deps: + j['agent_config_deps'] = { + 'deps': self.agent_config_deps[host].get('deps', []), + 'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']), + } + if host in self.agent_counter: + j['agent_counter'] = self.agent_counter[host] + if host in self.agent_keys: + j['agent_keys'] = self.agent_keys[host] + if host in self.agent_ports: + j['agent_ports'] = self.agent_ports[host] + if host in self.agent_timestamp: + j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host]) + + self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j)) + + def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None: + self.agent_config_deps[host] = { + 'deps': deps, + 'last_config': stamp, + } + + def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]: + if host in self.agent_config_deps: + return self.agent_config_deps[host].get('deps', []), \ + self.agent_config_deps[host].get('last_config', None) + return None, None + + def messaging_agent(self, host: str) -> bool: + if host not in self.sending_agent_message or not self.sending_agent_message[host]: + return False + return True + + def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None: + # agent successfully received new config. Update config/deps + assert daemon_spec.service_name == 'agent' + self.update_agent_config_deps( + daemon_spec.host, daemon_spec.deps, datetime_now()) + self.agent_timestamp[daemon_spec.host] = datetime_now() + self.agent_counter[daemon_spec.host] = 1 + self.save_agent(daemon_spec.host) + + class EventStore(): def __init__(self, mgr): # type: (CephadmOrchestrator) -> None diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py index b45f4a2c3e5..c0e2f12605e 100644 --- a/src/pybind/mgr/cephadm/module.py +++ b/src/pybind/mgr/cephadm/module.py @@ -54,7 +54,8 @@ from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ NodeExporterService, SNMPGatewayService from .schedule import HostAssignment -from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec +from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \ + ClientKeyringStore, ClientKeyringSpec from .upgrade import CephadmUpgrade from .template import TemplateMgr from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage @@ -475,6 +476,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule, self.cache = HostCache(self) self.cache.load() + self.agent_cache = AgentCache(self) + self.agent_cache.load() + self.to_remove_osds = OSDRemovalQueue(self) self.to_remove_osds.load_from_store() diff --git a/src/pybind/mgr/cephadm/serve.py b/src/pybind/mgr/cephadm/serve.py index d4f590b9adf..f8ab3ad0787 100644 --- a/src/pybind/mgr/cephadm/serve.py +++ b/src/pybind/mgr/cephadm/serve.py @@ -824,7 +824,7 @@ class CephadmServe: finally: if self.mgr.use_agent: # can only send ack to agents if we know for sure port they bound to - hosts_altered = set([h for h in hosts_altered if (h in self.mgr.cache.agent_ports and h in [ + hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [ h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])]) self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True) @@ -1124,8 +1124,8 @@ class CephadmServe: image=image) if daemon_spec.daemon_type == 'agent': - self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now() - self.mgr.cache.agent_counter[daemon_spec.host] = 1 + self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now() + self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1 # refresh daemon state? (ceph daemon reconfig does not need it) if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES: @@ -1139,9 +1139,14 @@ class CephadmServe: self.mgr.requires_post_actions.add(daemon_spec.name()) self.mgr.cache.invalidate_host_daemons(daemon_spec.host) - self.mgr.cache.update_daemon_config_deps( - daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time) - self.mgr.cache.save_host(daemon_spec.host) + if daemon_spec.daemon_type != 'agent': + self.mgr.cache.update_daemon_config_deps( + daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time) + self.mgr.cache.save_host(daemon_spec.host) + else: + self.mgr.agent_cache.update_agent_config_deps( + daemon_spec.host, daemon_spec.deps, start_time) + self.mgr.agent_cache.save_agent(daemon_spec.host) msg = "{} {} on host '{}'".format( 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) if not code: diff --git a/src/pybind/mgr/cephadm/services/cephadmservice.py b/src/pybind/mgr/cephadm/services/cephadmservice.py index a052c2cf8de..38fec6ebf85 100644 --- a/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -1027,7 +1027,7 @@ class CephadmAgent(CephService): keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), []) daemon_spec.keyring = keyring - self.mgr.cache.agent_keys[host] = keyring + self.mgr.agent_cache.agent_keys[host] = keyring daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec) -- 2.47.3