]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/mgr/client:Introduce Client::send()
authorMatan Breizman <mbreizma@redhat.com>
Sun, 2 Mar 2025 14:33:07 +0000 (14:33 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 5 Mar 2025 12:26:25 +0000 (12:26 +0000)
Client::reconnect nullifies the connection used by the mgr client
before setting a new one.

During this window, tasks that are being run in the background
(see: dispatch_in_background) might use the nullptr connection.

To avoid this, we had multiple `if (!conn)` checks; some methods
even checked this condition twice to reduce the possibility of using
an unset connection.

Instead of introducing an additional check in Client::_send_report,
introduce Client::send, which is responsible for:
a) Verifying the connection is set
b) Trying to get shared access to conn_lock

Client::reconnect will lock conn_lock exclusively until the
connection is set. If send is called while reconnecting,
the message will be dropped - same as before.

Fixes: https://tracker.ceph.com/issues/70179
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/mgr/client.cc
src/crimson/mgr/client.h

index 6e800cf12cbdb68a35fc4d324986780c10184ced..9a9bbbbea99de5c7877c269c6fb75633e71b016d 100644 (file)
@@ -4,6 +4,7 @@
 #include "client.h"
 
 #include <seastar/core/sleep.hh>
+#include <seastar/util/defer.hh>
 
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
@@ -51,6 +52,25 @@ seastar::future<> Client::stop()
   co_await gates.close_all();
 }
 
+seastar::future<> Client::send(MessageURef msg)  // Sends msg on conn; drops it (with a warning) if conn_lock is unavailable or conn is unset.
+{
+  LOG_PREFIX(Client::send);
+  DEBUGDPP("{}", *this, *msg);
+  if (!conn_lock.try_lock_shared()) {  // non-blocking attempt: reconnect() takes conn_lock exclusively
+    WARNDPP("ongoing reconnect, report skipped: {}", *this, *msg);  // "{}" added so the dropped message is actually logged
+    co_return;
+  }
+  auto unlocker = seastar::defer([this] {  // releases the shared lock when this scope is left
+    conn_lock.unlock_shared();
+  });
+  if (!conn) {
+    WARNDPP("no conn available, report skipped: {}", *this, *msg);  // "{}" added so the dropped message is actually logged
+    co_return;
+  }
+  DEBUGDPP("sending {}", *this, *msg);
+  co_await conn->send(std::move(msg));
+}
+
 std::optional<seastar::future<>>
 Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
@@ -89,7 +109,7 @@ void Client::ms_handle_connect(
       m->daemon_name = local_conf()->name.get_id();
       local_conf().get_config_bl(0, &m->config_bl, &last_config_bl_version);
       local_conf().get_defaults_bl(&m->config_defaults_bl);
-      return conn->send(std::move(m));
+      return send(std::move(m));
     } else {
       DEBUGDPP("connection changed", *this);
       return seastar::now();
@@ -117,6 +137,10 @@ seastar::future<> Client::reconnect()
 {
   LOG_PREFIX(Client::reconnect);
   DEBUGDPP("", *this);
+  co_await conn_lock.lock();
+  auto unlocker = seastar::defer([this] {
+    conn_lock.unlock();
+  });
   if (conn) {
     DEBUGDPP("marking down", *this);
     conn->mark_down();
@@ -185,17 +209,9 @@ void Client::report()
   _send_report();
   gates.dispatch_in_background(__func__, *this, [this, FNAME] {
     DEBUGDPP("dispatching in background", *this);
-    if (!conn) {
-      WARNDPP("no conn available; report skipped", *this);
-      return seastar::now();
-    }
     return with_stats.get_stats(
-    ).then([this, FNAME](auto &&pg_stats) {
-      if (!conn) {
-        WARNDPP("no conn available; before sending stats, report skipped", *this);
-        return seastar::now();
-      }
-      return conn->send(std::move(pg_stats));
+    ).then([this](auto &&pg_stats) {
+      return send(std::move(pg_stats));
     });
   });
 }
@@ -211,10 +227,6 @@ void Client::_send_report()
   DEBUGDPP("", *this);
   gates.dispatch_in_background(__func__, *this, [this, FNAME] {
     DEBUGDPP("dispatching in background", *this);
-    if (!conn) {
-      WARNDPP("cannot send report; no conn available", *this);
-      return seastar::now();
-    }
     auto report = make_message<MMgrReport>();
     // Adding empty information since we don't support perfcounters yet
     report->undeclare_types.emplace_back();
@@ -235,10 +247,10 @@ void Client::_send_report()
       return get_perf_report_cb(
       ).then([report=std::move(report), this](auto payload) mutable {
        report->metric_report_message = MetricReportMessage(std::move(payload));
-       return conn->send(std::move(report));
+       return send(std::move(report));
       });
     }
-    return conn->send(std::move(report));
+    return send(std::move(report));
   });
 }
 
index 001c59e19d9620c2695ac40364dfba526d22890a..aad1f820e7b295bc2f9662175da091c58162e005 100644 (file)
@@ -4,6 +4,7 @@
 #pragma once
 
 #include <seastar/core/timer.hh>
+#include <seastar/core/shared_mutex.hh>
 
 #include "crimson/common/gated.h"
 #include "crimson/net/Dispatcher.h"
@@ -41,6 +42,7 @@ public:
         get_perf_report_cb_t cb_get);
   seastar::future<> start();
   seastar::future<> stop();
+  seastar::future<> send(MessageURef msg);
   void report();
   void update_daemon_health(std::vector<DaemonHealthMetric>&& metrics);
 
@@ -62,6 +64,7 @@ private:
   crimson::net::Messenger& msgr;
   WithStats& with_stats;
   crimson::net::ConnectionRef conn;
+  seastar::shared_mutex conn_lock;
   seastar::timer<seastar::lowres_clock> report_timer;
   crimson::common::gate_per_shard gates;
   uint64_t last_config_bl_version = 0;