]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: reset the TX session handler in proper EventCenter.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 2 Oct 2019 11:06:25 +0000 (13:06 +0200)
committerNathan Cutler <ncutler@suse.com>
Mon, 27 Jan 2020 18:02:13 +0000 (19:02 +0100)
Fixes: https://tracker.ceph.com/issues/42026
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
(cherry picked from commit 8a86719b062cd3b4b6c64a1c027564c092600117)

src/msg/async/ProtocolV2.cc

index 678516f9f963878b8c82c1add6c88e55387ad571..94f9b134640cc9aaa059bc909b1d6b219e57d543 100644 (file)
@@ -221,12 +221,27 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   return count;
 }
 
+// it's expected the `write_lock` is held while calling this method.
 void ProtocolV2::reset_recv_state() {
-  auth_meta.reset(new AuthConnectionMeta);
-  session_stream_handlers.tx.reset(nullptr);
-  session_stream_handlers.rx.reset(nullptr);
-  pre_auth.txbuf.clear();
-  pre_auth.rxbuf.clear();
+  ldout(cct, 5) << __func__ << dendl;
+
+  // execute in the same thread that uses the rx/tx handlers. We need
+  // to do the warp because holding `write_lock` is not enough as
+  // `write_event()` unlocks it just before calling `write_message()`.
+  // `submit_to()` here is NOT blocking.
+  connection->center->submit_to(connection->center->get_id(), [this] {
+    ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl;
+
+    // Possibly unnecessary. See the comment in `deactivate_existing`.
+    std::lock_guard<std::mutex> l(connection->lock);
+    std::lock_guard<std::mutex> wl(connection->write_lock);
+
+    auth_meta.reset(new AuthConnectionMeta);
+    session_stream_handlers.rx.reset(nullptr);
+    session_stream_handlers.tx.reset(nullptr);
+    pre_auth.rxbuf.clear();
+    pre_auth.txbuf.clear();
+  }, /* nowait = */true);
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
@@ -2658,16 +2673,19 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   }
   exproto->peer_global_seq = peer_global_seq;
 
+  ceph_assert(connection->center->in_thread());
   auto temp_cs = std::move(connection->cs);
   EventCenter *new_center = connection->center;
   Worker *new_worker = connection->worker;
+  // we can steal the session_stream_handlers under the assumption
+  // this happens in the event center's thread as there should be
+  // no user outside its boundaries (simlarly to e.g. outgoing_bl).
+  auto temp_stream_handlers = std::move(session_stream_handlers);
+  exproto->auth_meta = auth_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
 
-  std::swap(exproto->session_stream_handlers, session_stream_handlers);
-  exproto->auth_meta = auth_meta;
-
   // avoid _stop shutdown replacing socket
   // queue a reset on the new connection, which we're dumping for the old
   stop();
@@ -2688,14 +2706,25 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   ceph_assert(connection->recv_start == connection->recv_end);
 
   auto deactivate_existing = std::bind(
-      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+      [ existing,
+        new_worker,
+        new_center,
+        exproto,
+        temp_stream_handlers=std::move(temp_stream_handlers)
+      ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
           std::lock_guard<std::mutex> l(existing->lock);
           existing->write_lock.lock();
           exproto->requeue_sent();
+          // XXX: do we really need the locking for `outgoing_bl`? There is
+          // a comment just above its definition saying "lockfree, only used
+          // in own thread". I'm following lockfull schema just in the case.
+          // From performance point of view it should be fine – this happens
+          // far away from hot paths.
           existing->outgoing_bl.clear();
           existing->open_write = false;
+          exproto->session_stream_handlers = std::move(temp_stream_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();