git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: reset the TX session handler in proper EventCenter.
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 2 Oct 2019 11:06:25 +0000 (13:06 +0200)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 31 Oct 2019 23:44:53 +0000 (00:44 +0100)
Fixes: https://tracker.ceph.com/issues/42026
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc

index 446772f3f4c24c7092eb26497b4faccaed090c70..fe2158289a50b2e623de83f52d6bf58ea8af28c8 100644 (file)
@@ -222,12 +222,27 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   return count;
 }
 
+// it's expected the `write_lock` is held while calling this method.
 void ProtocolV2::reset_recv_state() {
-  auth_meta.reset(new AuthConnectionMeta);
-  session_stream_handlers.tx.reset(nullptr);
-  session_stream_handlers.rx.reset(nullptr);
-  pre_auth.txbuf.clear();
-  pre_auth.rxbuf.clear();
+  ldout(cct, 5) << __func__ << dendl;
+
+  // execute in the same thread that uses the rx/tx handlers. We need
+  // to do the warp because holding `write_lock` is not enough as
+  // `write_event()` unlocks it just before calling `write_message()`.
+  // `submit_to()` here is NOT blocking.
+  connection->center->submit_to(connection->center->get_id(), [this] {
+    ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl;
+
+    // Possibly unnecessary. See the comment in `deactivate_existing`.
+    std::lock_guard<std::mutex> l(connection->lock);
+    std::lock_guard<std::mutex> wl(connection->write_lock);
+
+    auth_meta.reset(new AuthConnectionMeta);
+    session_stream_handlers.rx.reset(nullptr);
+    session_stream_handlers.tx.reset(nullptr);
+    pre_auth.rxbuf.clear();
+    pre_auth.txbuf.clear();
+  }, /* nowait = */true);
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
@@ -2681,16 +2696,19 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   }
   exproto->peer_global_seq = peer_global_seq;
 
+  ceph_assert(connection->center->in_thread());
   auto temp_cs = std::move(connection->cs);
   EventCenter *new_center = connection->center;
   Worker *new_worker = connection->worker;
+  // we can steal the session_stream_handlers under the assumption
+  // this happens in the event center's thread as there should be
+  // no user outside its boundaries (similarly to e.g. outgoing_bl).
+  auto temp_stream_handlers = std::move(session_stream_handlers);
+  exproto->auth_meta = auth_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
 
-  std::swap(exproto->session_stream_handlers, session_stream_handlers);
-  exproto->auth_meta = auth_meta;
-
   // avoid _stop shutdown replacing socket
   // queue a reset on the new connection, which we're dumping for the old
   stop();
@@ -2711,14 +2729,25 @@ CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
   ceph_assert(connection->recv_start == connection->recv_end);
 
   auto deactivate_existing = std::bind(
-      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+      [ existing,
+        new_worker,
+        new_center,
+        exproto,
+        temp_stream_handlers=std::move(temp_stream_handlers)
+      ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
           std::lock_guard<std::mutex> l(existing->lock);
           existing->write_lock.lock();
           exproto->requeue_sent();
+          // XXX: do we really need the locking for `outgoing_bl`? There is
+          // a comment just above its definition saying "lockfree, only used
+          // in own thread". I'm following the locking scheme just in case.
+          // From a performance point of view it should be fine – this happens
+          // far away from hot paths.
           existing->outgoing_bl.clear();
           existing->open_write = false;
+          exproto->session_stream_handlers = std::move(temp_stream_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();