From: Adam King Date: Wed, 25 Aug 2021 13:25:11 +0000 (-0400) Subject: cephadm: allow mgr listener to handle variable length json strings X-Git-Tag: v17.1.0~816^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=be72463d032cb047e8d866dca12412c6de61328b;p=ceph.git cephadm: allow mgr listener to handle variable length json strings Signed-off-by: Adam King --- diff --git a/src/cephadm/cephadm b/src/cephadm/cephadm index 4e938295d6d..5265f9c790a 100755 --- a/src/cephadm/cephadm +++ b/src/cephadm/cephadm @@ -3479,31 +3479,39 @@ class MgrListener(Thread): conn, _ = listenSocket.accept() except socket.timeout: continue + try: + length: int = int(conn.recv(10).decode()) + except Exception as e: + err_str = f'Failed to extract length of payload from message: {e}' + conn.send(err_str.encode()) while True: - data = conn.recv(self.agent.socket_buffer_size).decode() - if not data: + payload = conn.recv(length).decode() + if not payload: break try: - self.agent.ack = int(data) + data: Dict[Any, Any] = json.loads(payload) + self.handle_json_payload(data) except Exception as e: - err_str = f'Failed to extract timestamp integer from message: {e}' + err_str = f'Failed to extract json payload from message: {e}' conn.send(err_str.encode()) - self.agent.ack = -1 + logger.error(err_str) else: conn.send(b'ACK') - self.agent.wakeup() - logger.debug(f'Got mgr message {data}') + self.agent.wakeup() + logger.debug(f'Got mgr message {data}') def shutdown(self) -> None: self.stop = True + def handle_json_payload(self, data: Dict[Any, Any]) -> None: + self.agent.ack = int(data['counter']) + class CephadmAgent(): daemon_type = 'agent' default_port = 8498 loop_interval = 30 - socket_buffer_size = 256 stop = False required_files = ['cephadm', 'agent.json', 'keyring'] diff --git a/src/pybind/mgr/cephadm/agent.py b/src/pybind/mgr/cephadm/agent.py index 3ed1c360f1b..e1dc5e96ab5 100644 --- a/src/pybind/mgr/cephadm/agent.py +++ b/src/pybind/mgr/cephadm/agent.py @@ -180,11 +180,11 @@ class HostData: class AgentMessageThread(threading.Thread): - def __init__(self, host: str, port: int, counter: int, mgr: "CephadmOrchestrator") -> None: + def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None: self.mgr = mgr self.host = host self.port = port - self.counter = counter + self.data: str = json.dumps(data) super(AgentMessageThread, self).__init__(target=self.run) def run(self) -> None: @@ -192,9 +192,14 @@ class AgentMessageThread(threading.Thread): try: agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port)) - agent_socket.send(str(self.counter).encode()) + bytes_len: str = str(len(self.data.encode('utf-8'))) + if len(bytes_len.encode('utf-8')) > 10: + raise Exception(f'Message is too big to send to agent. Message size is {bytes_len} bytes!') + while len(bytes_len.encode('utf-8')) < 10: + bytes_len = '0' + bytes_len + agent_socket.sendall((bytes_len + self.data).encode('utf-8')) agent_response = agent_socket.recv(1024).decode() - self.mgr.log.debug(f'Received {agent_response} from agent on host {self.host}') + self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}') return except ConnectionError as e: # if it's a connection error, possibly try to connect again. @@ -221,7 +226,7 @@ class CephadmAgentHelpers: else: self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1 message_thread = AgentMessageThread( - host, self.mgr.cache.agent_ports[host], self.mgr.cache.agent_counter[host], self.mgr) + host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr) message_thread.start() def _agent_down(self, host: str) -> bool: