]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: perform recv reset immediately if called inside EC. 33820/head
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 5 Mar 2020 00:12:40 +0000 (01:12 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 9 Mar 2020 11:42:39 +0000 (12:42 +0100)
Fixes: https://tracker.ceph.com/issues/43808
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
(cherry picked from commit c4e97bacf7f8606bcab67496d83ac92ba1237eca)

Conflicts:
        src/msg/async/ProtocolV1.cc (removing the dependency
          on c58c5754dfd26eef359cad8b29e825120af59c64).

src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV1.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 42eb2d6eaed28591d9ac1d19a7651b34e4447444..e7c6295c14f2f29ade8796c45a8e14e3e0a4a74d 100644 (file)
@@ -1240,6 +1240,19 @@ void ProtocolV1::discard_out_queue() {
   write_in_progress = false;
 }
 
+void ProtocolV1::reset_security()
+{
+  ldout(cct, 5) << __func__ << dendl;
+
+  // clean up state internal variables and states
+  if (state == CONNECTING_SEND_CONNECT_MSG) {
+    if (authorizer) {
+      delete authorizer;
+    }
+    authorizer = nullptr;
+  }
+}
+
 void ProtocolV1::reset_recv_state() {
   ldout(cct, 5) << __func__ << dendl;
 
@@ -1247,21 +1260,18 @@ void ProtocolV1::reset_recv_state() {
   // We need to do the warp because holding `write_lock` is not
   // enough as `write_event()` releases it just before calling
   // `write_message()`. `submit_to()` here is NOT blocking.
-  connection->center->submit_to(connection->center->get_id(), [this] {
-    ldout(cct, 5) << "reset_recv_state reseting security handlers" << dendl;
-
-    // Possibly unnecessary. See the comment in `deactivate_existing`.
-    std::lock_guard<std::mutex> l(connection->lock);
-    std::lock_guard<std::mutex> wl(connection->write_lock);
-
-    // clean up state internal variables and states
-    if (state == CONNECTING_SEND_CONNECT_MSG) {
-      if (authorizer) {
-        delete authorizer;
-      }
-      authorizer = nullptr;
-    }
-  }, /* nowait = */true);
+  if (!connection->center->in_thread()) {
+    connection->center->submit_to(connection->center->get_id(), [this] {
+      ldout(cct, 5) << "reset_recv_state (warped) reseting security handlers"
+                    << dendl;
+      // Possibly unnecessary. See the comment in `deactivate_existing`.
+      std::lock_guard<std::mutex> l(connection->lock);
+      std::lock_guard<std::mutex> wl(connection->write_lock);
+      reset_security();
+    }, /* nowait = */true);
+  } else {
+    reset_security();
+  }
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
index b332b5ff7bf47d64374c900efa1bf5389df7d554..070ce73f739bce0604e633cd1ca7f14c03810ba8 100644 (file)
@@ -205,6 +205,7 @@ protected:
   void discard_out_queue();
 
   void reset_recv_state();
+  void reset_security();
 
   ostream &_conn_prefix(std::ostream *_dout);
 
index 94f9b134640cc9aaa059bc909b1d6b219e57d543..2d0532a1a3f5733f601df56aaea90c02751742e9 100644 (file)
@@ -221,27 +221,36 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   return count;
 }
 
+void ProtocolV2::reset_security() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  auth_meta.reset(new AuthConnectionMeta);
+  session_stream_handlers.rx.reset(nullptr);
+  session_stream_handlers.tx.reset(nullptr);
+  pre_auth.rxbuf.clear();
+  pre_auth.txbuf.clear();
+}
+
 // it's expected the `write_lock` is held while calling this method.
 void ProtocolV2::reset_recv_state() {
   ldout(cct, 5) << __func__ << dendl;
 
-  // execute in the same thread that uses the rx/tx handlers. We need
-  // to do the warp because holding `write_lock` is not enough as
-  // `write_event()` unlocks it just before calling `write_message()`.
-  // `submit_to()` here is NOT blocking.
-  connection->center->submit_to(connection->center->get_id(), [this] {
-    ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl;
-
-    // Possibly unnecessary. See the comment in `deactivate_existing`.
-    std::lock_guard<std::mutex> l(connection->lock);
-    std::lock_guard<std::mutex> wl(connection->write_lock);
-
-    auth_meta.reset(new AuthConnectionMeta);
-    session_stream_handlers.rx.reset(nullptr);
-    session_stream_handlers.tx.reset(nullptr);
-    pre_auth.rxbuf.clear();
-    pre_auth.txbuf.clear();
-  }, /* nowait = */true);
+  if (!connection->center->in_thread()) {
+    // execute in the same thread that uses the rx/tx handlers. We need
+    // to do the warp because holding `write_lock` is not enough as
+    // `write_event()` unlocks it just before calling `write_message()`.
+    // `submit_to()` here is NOT blocking.
+    connection->center->submit_to(connection->center->get_id(), [this] {
+      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+                    << dendl;
+      // Possibly unnecessary. See the comment in `deactivate_existing`.
+      std::lock_guard<std::mutex> l(connection->lock);
+      std::lock_guard<std::mutex> wl(connection->write_lock);
+      reset_security();
+    }, /* nowait = */true);
+  } else {
+    reset_security();
+  }
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
index 69bff1b90dd4986ea46d55094e619d5ced2d0b71..5c9836815be815baa2041a1288fc60b44643f45a 100644 (file)
@@ -132,6 +132,7 @@ private:
   void requeue_sent();
   uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
   void reset_recv_state();
+  void reset_security();
   void reset_throttle();
   Ct<ProtocolV2> *_fault();
   void discard_out_queue();