git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephadm: allow mgr listener to handle variable length json strings
author: Adam King <adking@redhat.com>
Wed, 25 Aug 2021 13:25:11 +0000 (09:25 -0400)
committer: Adam King <adking@redhat.com>
Fri, 24 Sep 2021 11:23:50 +0000 (07:23 -0400)
Signed-off-by: Adam King <adking@redhat.com>
src/cephadm/cephadm
src/pybind/mgr/cephadm/agent.py

index 4e938295d6d4eb51d336a8e32558373c33c5816a..5265f9c790a132dd43bc2d9632c3b5f1e3ac371f 100755 (executable)
@@ -3479,31 +3479,39 @@ class MgrListener(Thread):
                 conn, _ = listenSocket.accept()
             except socket.timeout:
                 continue
+            try:
+                length: int = int(conn.recv(10).decode())
+            except Exception as e:
+                err_str = f'Failed to extract length of payload from message: {e}'
+                conn.send(err_str.encode())
             while True:
-                data = conn.recv(self.agent.socket_buffer_size).decode()
-                if not data:
+                payload = conn.recv(length).decode()
+                if not payload:
                     break
                 try:
-                    self.agent.ack = int(data)
+                    data: Dict[Any, Any] = json.loads(payload)
+                    self.handle_json_payload(data)
                 except Exception as e:
-                    err_str = f'Failed to extract timestamp integer from message: {e}'
+                    err_str = f'Failed to extract json payload from message: {e}'
                     conn.send(err_str.encode())
-                    self.agent.ack = -1
+                    logger.error(err_str)
                 else:
                     conn.send(b'ACK')
-                self.agent.wakeup()
-                logger.debug(f'Got mgr message {data}')
+                    self.agent.wakeup()
+                    logger.debug(f'Got mgr message {data}')
 
     def shutdown(self) -> None:
         self.stop = True
 
+    def handle_json_payload(self, data: Dict[Any, Any]) -> None:
+        self.agent.ack = int(data['counter'])
+
 
 class CephadmAgent():
 
     daemon_type = 'agent'
     default_port = 8498
     loop_interval = 30
-    socket_buffer_size = 256
     stop = False
 
     required_files = ['cephadm', 'agent.json', 'keyring']
index 3ed1c360f1b3bb7182eb6e97262e761ad940abcb..e1dc5e96ab5bbaa161795839a9a7b5f1f3211578 100644 (file)
@@ -180,11 +180,11 @@ class HostData:
 
 
 class AgentMessageThread(threading.Thread):
-    def __init__(self, host: str, port: int, counter: int, mgr: "CephadmOrchestrator") -> None:
+    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None:
         self.mgr = mgr
         self.host = host
         self.port = port
-        self.counter = counter
+        self.data: str = json.dumps(data)
         super(AgentMessageThread, self).__init__(target=self.run)
 
     def run(self) -> None:
@@ -192,9 +192,14 @@ class AgentMessageThread(threading.Thread):
             try:
                 agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                 agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port))
-                agent_socket.send(str(self.counter).encode())
+                bytes_len: str = str(len(self.data.encode('utf-8')))
+                if len(bytes_len.encode('utf-8')) > 10:
+                    raise Exception(f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
+                while len(bytes_len.encode('utf-8')) < 10:
+                    bytes_len = '0' + bytes_len
+                agent_socket.sendall((bytes_len + self.data).encode('utf-8'))
                 agent_response = agent_socket.recv(1024).decode()
-                self.mgr.log.debug(f'Received {agent_response} from agent on host {self.host}')
+                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                 return
             except ConnectionError as e:
                 # if it's a connection error, possibly try to connect again.
@@ -221,7 +226,7 @@ class CephadmAgentHelpers:
             else:
                 self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
             message_thread = AgentMessageThread(
-                host, self.mgr.cache.agent_ports[host], self.mgr.cache.agent_counter[host], self.mgr)
+                host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr)
             message_thread.start()
 
     def _agent_down(self, host: str) -> bool: