]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/cephadm: store agent metadata in its own cache
authorAdam King <adking@redhat.com>
Mon, 10 Jan 2022 20:12:11 +0000 (15:12 -0500)
committerAdam King <adking@redhat.com>
Mon, 24 Jan 2022 23:59:15 +0000 (18:59 -0500)
To avoid having the host cache getting too big
due to having to store this additional info

Fixes: https://tracker.ceph.com/issues/53624
Signed-off-by: Adam King <adking@redhat.com>
src/pybind/mgr/cephadm/agent.py
src/pybind/mgr/cephadm/inventory.py
src/pybind/mgr/cephadm/module.py
src/pybind/mgr/cephadm/serve.py
src/pybind/mgr/cephadm/services/cephadmservice.py

index 7f65190a9754519d8dc508999ca913f04093d03c..f6672be0bbda5881542ba0bca0358557b9956122 100644 (file)
@@ -181,9 +181,9 @@ class HostData:
         if 'keyring' not in data:
             raise Exception(
                 f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
-        if host not in self.mgr.cache.agent_keys:
+        if host not in self.mgr.agent_cache.agent_keys:
             raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
-        if data['keyring'] != self.mgr.cache.agent_keys[host]:
+        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
             raise Exception(f'Got wrong keyring from agent on host {host}.')
         if 'port' not in data:
             raise Exception(
@@ -205,16 +205,16 @@ class HostData:
     def handle_metadata(self, data: Dict[str, Any]) -> str:
         try:
             host = data['host']
-            self.mgr.cache.agent_ports[host] = int(data['port'])
-            if host not in self.mgr.cache.agent_counter:
-                self.mgr.cache.agent_counter[host] = 1
+            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
+            if host not in self.mgr.agent_cache.agent_counter:
+                self.mgr.agent_cache.agent_counter[host] = 1
                 self.mgr.agent_helpers._request_agent_acks({host})
                 res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                 self.mgr.log.debug(res)
                 return res
 
             # update timestamp of most recent agent update
-            self.mgr.cache.agent_timestamp[host] = datetime_now()
+            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
 
             error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
             daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))
@@ -222,11 +222,11 @@ class HostData:
             up_to_date = False
 
             int_ack = int(data['ack'])
-            if int_ack == self.mgr.cache.agent_counter[host]:
+            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                 up_to_date = True
             else:
                 # we got old counter value with message, inform agent of new timestamp
-                if not self.mgr.cache.messaging_agent(host):
+                if not self.mgr.agent_cache.messaging_agent(host):
                     self.mgr.agent_helpers._request_agent_acks({host})
                 self.mgr.log.debug(
                     f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')
@@ -260,6 +260,7 @@ class HostData:
                 self.mgr.log.debug(
                     f'Received up-to-date metadata from agent on host {host}.')
 
+            self.mgr.agent_cache.save_agent(host)
             return 'Successfully processed metadata.'
 
         except Exception as e:
@@ -280,7 +281,7 @@ class AgentMessageThread(threading.Thread):
 
     def run(self) -> None:
         self.mgr.log.debug(f'Sending message to agent on host {self.host}')
-        self.mgr.cache.sending_agent_message[self.host] = True
+        self.mgr.agent_cache.sending_agent_message[self.host] = True
         try:
             assert self.mgr.cherrypy_thread
             root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
@@ -307,7 +308,7 @@ class AgentMessageThread(threading.Thread):
             ssl_ctx.load_cert_chain(cert_fname, key_fname)
         except Exception as e:
             self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
-            self.mgr.cache.sending_agent_message[self.host] = False
+            self.mgr.agent_cache.sending_agent_message[self.host] = False
             return
         try:
             bytes_len: str = str(len(self.data.encode('utf-8')))
@@ -318,7 +319,7 @@ class AgentMessageThread(threading.Thread):
                 bytes_len = '0' + bytes_len
         except Exception as e:
             self.mgr.log.error(f'Failed to get length of json payload: {e}')
-            self.mgr.cache.sending_agent_message[self.host] = False
+            self.mgr.agent_cache.sending_agent_message[self.host] = False
             return
         for retry_wait in [3, 5]:
             try:
@@ -330,8 +331,8 @@ class AgentMessageThread(threading.Thread):
                 agent_response = secure_agent_socket.recv(1024).decode()
                 self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                 if self.daemon_spec:
-                    self.mgr.cache.agent_config_successfully_delivered(self.daemon_spec)
-                self.mgr.cache.sending_agent_message[self.host] = False
+                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
+                self.mgr.agent_cache.sending_agent_message[self.host] = False
                 return
             except ConnectionError as e:
                 # if it's a connection error, possibly try to connect again.
@@ -342,10 +343,10 @@ class AgentMessageThread(threading.Thread):
             except Exception as e:
                 # if it's not a connection error, something has gone wrong. Give up.
                 self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
-                self.mgr.cache.sending_agent_message[self.host] = False
+                self.mgr.agent_cache.sending_agent_message[self.host] = False
                 return
         self.mgr.log.error(f'Could not connect to agent on host {self.host}')
-        self.mgr.cache.sending_agent_message[self.host] = False
+        self.mgr.agent_cache.sending_agent_message[self.host] = False
         return
 
 
@@ -357,22 +358,22 @@ class CephadmAgentHelpers:
         for host in hosts:
             if increment:
                 self.mgr.cache.metadata_up_to_date[host] = False
-            if host not in self.mgr.cache.agent_counter:
-                self.mgr.cache.agent_counter[host] = 1
+            if host not in self.mgr.agent_cache.agent_counter:
+                self.mgr.agent_cache.agent_counter[host] = 1
             elif increment:
-                self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
-            payload: Dict[str, Any] = {'counter': self.mgr.cache.agent_counter[host]}
+                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
+            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
             if daemon_spec:
                 payload['config'] = daemon_spec.final_config
             message_thread = AgentMessageThread(
-                host, self.mgr.cache.agent_ports[host], payload, self.mgr, daemon_spec)
+                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
             message_thread.start()
 
     def _request_ack_all_not_up_to_date(self) -> None:
         self.mgr.agent_helpers._request_agent_acks(
             set([h for h in self.mgr.cache.get_hosts() if
                  (not self.mgr.cache.host_metadata_up_to_date(h)
-                 and h in self.mgr.cache.agent_ports and not self.mgr.cache.messaging_agent(h))]))
+                 and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))
 
     def _agent_down(self, host: str) -> bool:
         # if host is draining or drained (has _no_schedule label) there should not
@@ -385,13 +386,13 @@ class CephadmAgentHelpers:
         # if we don't have a timestamp, it's likely because of a mgr fail over.
         # just set the timestamp to now. However, if host was offline before, we
         # should not allow creating a new timestamp to cause it to be marked online
-        if host not in self.mgr.cache.agent_timestamp:
+        if host not in self.mgr.agent_cache.agent_timestamp:
             if host in self.mgr.offline_hosts:
                 return False
-            self.mgr.cache.agent_timestamp[host] = datetime_now()
+            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
         # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
         down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
-        time_diff = datetime_now() - self.mgr.cache.agent_timestamp[host]
+        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
         if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
             return True
         return False
@@ -434,7 +435,7 @@ class CephadmAgentHelpers:
             # we can tell they're in that state if we don't have a keyring for
             # them in the host cache
             for agent in self.mgr.cache.get_daemons_by_service('agent'):
-                if agent.hostname not in self.mgr.cache.agent_keys:
+                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                     self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
             if 'agent' not in self.mgr.spec_store:
                 self.mgr.agent_helpers._apply_agent()
@@ -443,10 +444,10 @@ class CephadmAgentHelpers:
             if 'agent' in self.mgr.spec_store:
                 self.mgr.spec_store.rm('agent')
                 need_apply = True
-            self.mgr.cache.agent_counter = {}
-            self.mgr.cache.agent_timestamp = {}
-            self.mgr.cache.agent_keys = {}
-            self.mgr.cache.agent_ports = {}
+            self.mgr.agent_cache.agent_counter = {}
+            self.mgr.agent_cache.agent_timestamp = {}
+            self.mgr.agent_cache.agent_keys = {}
+            self.mgr.agent_cache.agent_ports = {}
         return need_apply
 
     def _check_agent(self, host: str) -> bool:
@@ -471,8 +472,7 @@ class CephadmAgentHelpers:
         try:
             spec = self.mgr.spec_store.active_specs.get('agent', None)
             deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
-            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
-                host, agent.name())
+            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
             if not last_config or last_deps != deps:
                 # if root cert is the dep that changed, we must use ssh to reconfig
                 # so it's necessary to check this one specifically
@@ -486,7 +486,7 @@ class CephadmAgentHelpers:
                 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                 # we need to know the agent port to try to reconfig w/ http
                 # otherwise there is no choice but a full ssh reconfig
-                if host in self.mgr.cache.agent_ports and root_cert_match and not down:
+                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                     daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                         daemon_spec.daemon_type)].prepare_create(daemon_spec)
                     self.mgr.agent_helpers._request_agent_acks(
index 15adc91a55ab4fcad4c40d1f4081f7e07aea83b8..dbdedd477780c5a607c06ae2ab2a0c73fe68d368 100644 (file)
@@ -25,6 +25,7 @@ logger = logging.getLogger(__name__)
 
 HOST_CACHE_PREFIX = "host."
 SPEC_STORE_PREFIX = "spec."
+AGENT_CACHE_PREFIX = 'agent.'
 
 
 class Inventory:
@@ -456,12 +457,7 @@ class HostCache():
 
         self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
 
-        self.agent_counter = {}  # type: Dict[str, int]
-        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
         self.metadata_up_to_date = {}  # type: Dict[str, bool]
-        self.agent_keys = {}  # type: Dict[str, str]
-        self.agent_ports = {}  # type: Dict[str, int]
-        self.sending_agent_message = {}  # type: Dict[str, bool]
 
     def load(self):
         # type: () -> None
@@ -509,13 +505,7 @@ class HostCache():
                     self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                 self.registry_login_queue.add(host)
                 self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
-
-                self.agent_counter[host] = int(j.get('agent_counter', 1))
-                self.metadata_up_to_date[host] = False
-                self.agent_keys[host] = str(j.get('agent_keys', ''))
-                agent_port = int(j.get('agent_ports', 0))
-                if agent_port:
-                    self.agent_ports[host] = agent_port
+                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
 
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
@@ -705,13 +695,8 @@ class HostCache():
             j['last_client_files'] = self.last_client_files[host]
         if host in self.scheduled_daemon_actions:
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
-
-        if host in self.agent_counter:
-            j['agent_counter'] = self.agent_counter[host]
-        if host in self.agent_keys:
-            j['agent_keys'] = self.agent_keys[host]
-        if host in self.agent_ports:
-            j['agent_ports'] = self.agent_ports[host]
+        if host in self.metadata_up_to_date:
+            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
@@ -1007,11 +992,6 @@ class HostCache():
             return True
         return False
 
-    def messaging_agent(self, host: str) -> bool:
-        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
-            return False
-        return True
-
     def host_metadata_up_to_date(self, host: str) -> bool:
         if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
             return False
@@ -1027,15 +1007,6 @@ class HostCache():
             return False
         return True
 
-    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
-        # agent successfully received new config. Update config/deps
-        assert daemon_spec.service_name == 'agent'
-        self.update_daemon_config_deps(
-            daemon_spec.host, daemon_spec.name(), daemon_spec.deps, datetime_now())
-        self.agent_timestamp[daemon_spec.host] = datetime_now()
-        self.agent_counter[daemon_spec.host] = 1
-        self.save_host(daemon_spec.host)
-
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
@@ -1095,6 +1066,95 @@ class HostCache():
         return self.scheduled_daemon_actions.get(host, {}).get(daemon)
 
 
+class AgentCache():
+    """
+    AgentCache is used for storing metadata about agent daemons that must be kept
+    through MGR failovers
+    """
+
+    def __init__(self, mgr):
+        # type: (CephadmOrchestrator) -> None
+        self.mgr: CephadmOrchestrator = mgr
+        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
+        self.agent_counter = {}  # type: Dict[str, int]
+        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
+        self.agent_keys = {}  # type: Dict[str, str]
+        self.agent_ports = {}  # type: Dict[str, int]
+        self.sending_agent_message = {}  # type: Dict[str, bool]
+
+    def load(self):
+        # type: () -> None
+        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
+            host = k[len(AGENT_CACHE_PREFIX):]
+            if host not in self.mgr.inventory:
+                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
+                    host))
+                self.mgr.set_store(k, None)
+            try:
+                j = json.loads(v)
+                self.agent_config_deps[host] = {}
+                conf_deps = j.get('agent_config_deps', {})
+                if conf_deps:
+                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
+                self.agent_config_deps[host] = conf_deps
+                self.agent_counter[host] = int(j.get('agent_counter', 1))
+                self.agent_timestamp[host] = str_to_datetime(
+                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
+                self.agent_keys[host] = str(j.get('agent_keys', ''))
+                agent_port = int(j.get('agent_ports', 0))
+                if agent_port:
+                    self.agent_ports[host] = agent_port
+
+            except Exception as e:
+                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
+                    host, e))
+                pass
+
+    def save_agent(self, host: str) -> None:
+        j: Dict[str, Any] = {}
+        if host in self.agent_config_deps:
+            j['agent_config_deps'] = {
+                'deps': self.agent_config_deps[host].get('deps', []),
+                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
+            }
+        if host in self.agent_counter:
+            j['agent_counter'] = self.agent_counter[host]
+        if host in self.agent_keys:
+            j['agent_keys'] = self.agent_keys[host]
+        if host in self.agent_ports:
+            j['agent_ports'] = self.agent_ports[host]
+        if host in self.agent_timestamp:
+            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])
+
+        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
+
+    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
+        self.agent_config_deps[host] = {
+            'deps': deps,
+            'last_config': stamp,
+        }
+
+    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+        if host in self.agent_config_deps:
+            return self.agent_config_deps[host].get('deps', []), \
+                self.agent_config_deps[host].get('last_config', None)
+        return None, None
+
+    def messaging_agent(self, host: str) -> bool:
+        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
+            return False
+        return True
+
+    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+        # agent successfully received new config. Update config/deps
+        assert daemon_spec.service_name == 'agent'
+        self.update_agent_config_deps(
+            daemon_spec.host, daemon_spec.deps, datetime_now())
+        self.agent_timestamp[daemon_spec.host] = datetime_now()
+        self.agent_counter[daemon_spec.host] = 1
+        self.save_agent(daemon_spec.host)
+
+
 class EventStore():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
index b45f4a2c3e52f3df4dc956cd8835de8e10e5c1ff..c0e2f12605edd27f33a1619885b210d4834244da 100644 (file)
@@ -54,7 +54,8 @@ from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService, SNMPGatewayService
 from .schedule import HostAssignment
-from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
+from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
+    ClientKeyringStore, ClientKeyringSpec
 from .upgrade import CephadmUpgrade
 from .template import TemplateMgr
 from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
@@ -475,6 +476,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         self.cache = HostCache(self)
         self.cache.load()
 
+        self.agent_cache = AgentCache(self)
+        self.agent_cache.load()
+
         self.to_remove_osds = OSDRemovalQueue(self)
         self.to_remove_osds.load_from_store()
 
index d4f590b9adf27dbccf21b4e714dec8b18337bdfe..f8ab3ad0787d08f34419037d46f159a9a10452cc 100644 (file)
@@ -824,7 +824,7 @@ class CephadmServe:
         finally:
             if self.mgr.use_agent:
                 # can only send ack to agents if we know for sure port they bound to
-                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.cache.agent_ports and h in [
+                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
                                     h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
                 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
 
@@ -1124,8 +1124,8 @@ class CephadmServe:
                     image=image)
 
                 if daemon_spec.daemon_type == 'agent':
-                    self.mgr.cache.agent_timestamp[daemon_spec.host] = datetime_now()
-                    self.mgr.cache.agent_counter[daemon_spec.host] = 1
+                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
+                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
 
                 # refresh daemon state?  (ceph daemon reconfig does not need it)
                 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
@@ -1139,9 +1139,14 @@ class CephadmServe:
                             self.mgr.requires_post_actions.add(daemon_spec.name())
                     self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
 
-                self.mgr.cache.update_daemon_config_deps(
-                    daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
-                self.mgr.cache.save_host(daemon_spec.host)
+                if daemon_spec.daemon_type != 'agent':
+                    self.mgr.cache.update_daemon_config_deps(
+                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
+                    self.mgr.cache.save_host(daemon_spec.host)
+                else:
+                    self.mgr.agent_cache.update_agent_config_deps(
+                        daemon_spec.host, daemon_spec.deps, start_time)
+                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                 msg = "{} {} on host '{}'".format(
                     'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                 if not code:
index a052c2cf8ded72cd18ee5013a28b3dad9a90d49c..38fec6ebf8514d03679159b315de7d7c26775086 100644 (file)
@@ -1027,7 +1027,7 @@ class CephadmAgent(CephService):
 
         keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
         daemon_spec.keyring = keyring
-        self.mgr.cache.agent_keys[host] = keyring
+        self.mgr.agent_cache.agent_keys[host] = keyring
 
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)