conn, _ = listenSocket.accept()
except socket.timeout:
continue
+ try:
+ length: int = int(conn.recv(10).decode())
+ except Exception as e:
+ err_str = f'Failed to extract length of payload from message: {e}'
+ conn.send(err_str.encode())
while True:
- data = conn.recv(self.agent.socket_buffer_size).decode()
- if not data:
+ payload = conn.recv(length).decode()
+ if not payload:
break
try:
- self.agent.ack = int(data)
+ data: Dict[Any, Any] = json.loads(payload)
+ self.handle_json_payload(data)
except Exception as e:
- err_str = f'Failed to extract timestamp integer from message: {e}'
+ err_str = f'Failed to extract json payload from message: {e}'
conn.send(err_str.encode())
- self.agent.ack = -1
+ logger.error(err_str)
else:
conn.send(b'ACK')
- self.agent.wakeup()
- logger.debug(f'Got mgr message {data}')
+ self.agent.wakeup()
+ logger.debug(f'Got mgr message {data}')
def shutdown(self) -> None:
self.stop = True
+ def handle_json_payload(self, data: Dict[Any, Any]) -> None:
+ self.agent.ack = int(data['counter'])
+
class CephadmAgent():
daemon_type = 'agent'
default_port = 8498
loop_interval = 30
- socket_buffer_size = 256
stop = False
required_files = ['cephadm', 'agent.json', 'keyring']
class AgentMessageThread(threading.Thread):
- def __init__(self, host: str, port: int, counter: int, mgr: "CephadmOrchestrator") -> None:
+ def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator") -> None:
self.mgr = mgr
self.host = host
self.port = port
- self.counter = counter
+ self.data: str = json.dumps(data)
super(AgentMessageThread, self).__init__(target=self.run)
def run(self) -> None:
try:
agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
agent_socket.connect((self.mgr.inventory.get_addr(self.host), self.port))
- agent_socket.send(str(self.counter).encode())
+ bytes_len: str = str(len(self.data.encode('utf-8')))
+ if len(bytes_len.encode('utf-8')) > 10:
+ raise Exception(f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
+ while len(bytes_len.encode('utf-8')) < 10:
+ bytes_len = '0' + bytes_len
+ agent_socket.sendall((bytes_len + self.data).encode('utf-8'))
agent_response = agent_socket.recv(1024).decode()
- self.mgr.log.debug(f'Received {agent_response} from agent on host {self.host}')
+ self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
return
except ConnectionError as e:
# if it's a connection error, possibly try to connect again.
else:
self.mgr.cache.agent_counter[host] = self.mgr.cache.agent_counter[host] + 1
message_thread = AgentMessageThread(
- host, self.mgr.cache.agent_ports[host], self.mgr.cache.agent_counter[host], self.mgr)
+ host, self.mgr.cache.agent_ports[host], {'counter': self.mgr.cache.agent_counter[host]}, self.mgr)
message_thread.start()
def _agent_down(self, host: str) -> bool: