from urllib.error import HTTPError, URLError
from ceph_node_proxy.protocols import SystemForReporter
-from ceph_node_proxy.util import BaseThread, get_logger, http_req
+from ceph_node_proxy.util import BaseThread, get_logger, http_req, _dict_diff
DEFAULT_MAX_RETRIES = 30
RETRY_SLEEP_SEC = 5
HEARTBEAT_INTERVAL_SEC = 300
-
class Reporter(BaseThread):
def __init__(
self,
time.sleep(RETRY_SLEEP_SEC)
return False
+ def _log_data_delta(
+ self,
+ new_data: Dict[str, Any],
+ max_log_len: int = 2048,
+ ) -> None:
+ """Compute diff between previous and new data, then log it (truncated if needed)."""
+ delta = _dict_diff(self.system.previous_data, new_data) or {}
+ delta_json = json.dumps(delta, indent=2, default=str)
+ if len(delta_json) > max_log_len:
+ delta_json = delta_json[:max_log_len] + "\n... (truncated)"
+ if self.system.previous_data:
+ self.log.info(
+ "data has changed since last iteration; delta:\n%s",
+ delta_json,
+ )
+ else:
+ # the first delta is the full data received from the system
+ # which is by definition big so we don't log it as it would be too verbose
+ self.log.info("first data received from the system.")
+
def main(self) -> None:
last_heartbeat = time.monotonic()
while not self.stop:
if self.system.data_ready:
self.log.debug("data ready to be sent to the mgr.")
if self.system.get_system() != self.system.previous_data:
- self.log.info("data has changed since last iteration.")
- self.data["patch"] = self.system.get_system()
+ new_data = self.system.get_system()
+ self._log_data_delta(new_data)
+ self.data["patch"] = new_data
if self._send_with_retries():
- self.system.previous_data = self.system.get_system()
+ self.system.previous_data = new_data
else:
self.log.error(
f"Failed to send data after {self.max_retries} retries; "
f.write(data.encode("utf-8"))
f.flush()
return f
+
+
+def _dict_diff(old: Any, new: Any) -> Any:
+ if old == new:
+ return None
+ if type(old) != type(new) or not isinstance(new, dict):
+ return new
+ if not isinstance(old, dict):
+ return new
+ delta: Dict[str, Any] = {}
+ for k in sorted(set(old) | set(new)):
+ if k not in old:
+ delta[k] = new[k]
+ elif k not in new:
+ delta[k] = None
+ else:
+ sub = _dict_diff(old[k], new[k])
+ if sub is not None:
+ delta[k] = sub
+ return delta if delta else None