assert(recv_start == recv_end);
existing->write_lock.Unlock();
- if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
- // handle error
- ldout(async_msgr->cct, 0) << __func__ << " reply fault for existing connection." << dendl;
- existing->fault();
- }
ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
_stop();
// queue a reset on the new connection, which we're dumping for the old
dispatch_queue->queue_reset(this);
existing->lock.Unlock();
+
+ EventCenter::submit_to(existing->center->get_id(), [existing, connect, reply, authorizer_reply]() mutable {
+ Mutex::Locker l(existing->lock);
+ if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
+ // handle error
+ existing->fault();
+ }
+ }, true);
+
return 0;
}
existing->lock.Unlock();