return count;
}
+// it's expected the `write_lock` is held while calling this method.
void ProtocolV2::reset_recv_state() {
- auth_meta.reset(new AuthConnectionMeta);
- session_stream_handlers.tx.reset(nullptr);
- session_stream_handlers.rx.reset(nullptr);
- pre_auth.txbuf.clear();
- pre_auth.rxbuf.clear();
+ ldout(cct, 5) << __func__ << dendl;
+
+ // execute in the same thread that uses the rx/tx handlers. We need
+ // to do the warp because holding `write_lock` is not enough as
+ // `write_event()` unlocks it just before calling `write_message()`.
+ // `submit_to()` here is NOT blocking.
+ connection->center->submit_to(connection->center->get_id(), [this] {
+ ldout(cct, 5) << "reset_recv_state reseting crypto handlers" << dendl;
+
+ // Possibly unnecessary. See the comment in `deactivate_existing`.
+ std::lock_guard<std::mutex> l(connection->lock);
+ std::lock_guard<std::mutex> wl(connection->write_lock);
+
+ auth_meta.reset(new AuthConnectionMeta);
+ session_stream_handlers.rx.reset(nullptr);
+ session_stream_handlers.tx.reset(nullptr);
+ pre_auth.rxbuf.clear();
+ pre_auth.txbuf.clear();
+ }, /* nowait = */true);
// clean read and write callbacks
connection->pendingReadLen.reset();
}
exproto->peer_global_seq = peer_global_seq;
+ ceph_assert(connection->center->in_thread());
auto temp_cs = std::move(connection->cs);
EventCenter *new_center = connection->center;
Worker *new_worker = connection->worker;
+ // we can steal the session_stream_handlers under the assumption
+ // this happens in the event center's thread as there should be
+ // no user outside its boundaries (simlarly to e.g. outgoing_bl).
+ auto temp_stream_handlers = std::move(session_stream_handlers);
+ exproto->auth_meta = auth_meta;
ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
<< dendl;
- std::swap(exproto->session_stream_handlers, session_stream_handlers);
- exproto->auth_meta = auth_meta;
-
// avoid _stop shutdown replacing socket
// queue a reset on the new connection, which we're dumping for the old
stop();
ceph_assert(connection->recv_start == connection->recv_end);
auto deactivate_existing = std::bind(
- [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+ [ existing,
+ new_worker,
+ new_center,
+ exproto,
+ temp_stream_handlers=std::move(temp_stream_handlers)
+ ](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
existing->write_lock.lock();
exproto->requeue_sent();
+ // XXX: do we really need the locking for `outgoing_bl`? There is
+ // a comment just above its definition saying "lockfree, only used
+ // in own thread". I'm following lockfull schema just in the case.
+ // From performance point of view it should be fine – this happens
+ // far away from hot paths.
existing->outgoing_bl.clear();
existing->open_write = false;
+ exproto->session_stream_handlers = std::move(temp_stream_handlers);
existing->write_lock.unlock();
if (exproto->state == NONE) {
existing->shutdown_socket();