state(st),
connection_state(NULL),
reader_running(false), reader_needs_join(false),
- reader_dispatching(false),
+ reader_dispatching(false), notify_on_dispatch_done(false),
writer_running(false),
in_q(&(r->dispatch_queue)),
send_keepalive(false),
ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
+ retry_existing_lookup:
msgr->lock.Lock();
pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
existing = msgr->_lookup_pipe(peer_addr);
if (existing) {
existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
+ if (existing->reader_dispatching) {
+ /** we need to wait, or we can deadlock if downstream
+ * fast_dispatchers are (naughtily!) waiting on resources
+ * held by somebody trying to make use of the SimpleMessenger lock.
+ * So drop locks, wait, and retry. It just looks like a slow network
+ * to everybody else.
+ */
+ pipe_lock.Unlock();
+ msgr->lock.Unlock();
+ existing->notify_on_dispatch_done = true;
+ while (existing->reader_dispatching)
+ existing->cond.Wait(existing->pipe_lock);
+ existing->pipe_lock.Unlock();
+ goto retry_existing_lookup;
+ }
if (connect.global_seq < existing->peer_global_seq) {
ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
in_q->fast_dispatch(m);
pipe_lock.Lock();
reader_dispatching = false;
- if (state == STATE_CLOSED) // there might be somebody waiting
+ if (state == STATE_CLOSED ||
+ notify_on_dispatch_done) { // there might be somebody waiting
+ notify_on_dispatch_done = false;
cond.Signal();
+ }
} else {
in_q->enqueue(m, m->get_priority(), conn_id);
}