Add a stop_and_wait() function that, in addition to closing the Pipe and killing
its socket, waits for any fast_dispatch call which is in-progress. Use this in
several parts of the Pipe and SimpleMessenger code where appropriate.
This fixes several races with fast_dispatch and other avenues; here are two:
1) It could be that we grab the lock while the existing pipe is fast_dispatching
and then proceed to dispatch messages ourself, beating it. Instead, wait for
the other pipe. Add a "reader_dispatching" member which tells bus this is
happening, and when re-locking, signal the cond if we're shutting down.
2) It could be that a normally-dispatched Message in the OSD triggers a
mark_down() on the Connection and then clears out the Session
(Connection::priv) pointer, causing a racing fast_dispatch()'ed function to
assert out in the OSD because it requires a valid Session.
Signed-off-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
state(st),
connection_state(NULL),
reader_running(false), reader_needs_join(false),
+ reader_dispatching(false),
writer_running(false),
in_q(&(r->dispatch_queue)),
send_keepalive(false),
++p)
out_q[p->first].splice(out_q[p->first].begin(), p->second);
}
+ existing->stop_and_wait();
existing->pipe_lock.Unlock();
open:
shutdown_socket();
}
+void Pipe::stop_and_wait()
+{
+ if (state != STATE_CLOSED)
+ stop();
+
+ while (reader_running &&
+ reader_dispatching)
+ cond.Wait(pipe_lock);
+}
/* read msgs from socket.
* also, server.
delay_thread->queue(release, m);
} else {
if (in_q->can_fast_dispatch(m)) {
+ reader_dispatching = true;
pipe_lock.Unlock();
in_q->fast_dispatch(m);
pipe_lock.Lock();
+ reader_dispatching = false;
+ if (state == STATE_CLOSED) // there might be somebody waiting
+ cond.Signal();
} else {
in_q->enqueue(m, m->get_priority(), conn_id);
}
utime_t backoff; // backoff time
bool reader_running, reader_needs_join;
+ bool reader_dispatching; /// reader thread is dispatching without pipe_lock
bool writer_running;
map<int, list<Message*> > out_q; // priority queue for outbound msgs
void register_pipe();
void unregister_pipe();
void join();
+ /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
void stop();
+ /// stop() a Pipe if not already done, and wait for it to finish any
+ /// fast_dispatch in progress.
+ void stop_and_wait();
void _send(Message *m) {
assert(pipe_lock.is_locked());
Pipe *p = rank_pipe.begin()->second;
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop();
+ p->stop_and_wait();
p->pipe_lock.Unlock();
}
Pipe *p = *q;
ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
p->pipe_lock.Lock();
- p->stop();
+ p->stop_and_wait();
ConnectionRef con = p->connection_state;
if (con && con->clear_pipe(p))
dispatch_queue.queue_reset(con.get());
rank_pipe.erase(it);
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop();
+ p->stop_and_wait();
ConnectionRef con = p->connection_state;
if (con && con->clear_pipe(p))
dispatch_queue.queue_reset(con.get());
ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop();
+ p->stop_and_wait();
if (p->connection_state) {
// generate a reset event for the caller in this case, even
// though they asked for it, since this is the addr-based (and
assert(p->msgr == this);
p->unregister_pipe();
p->pipe_lock.Lock();
- p->stop();
+ p->stop_and_wait();
if (p->connection_state) {
// do not generate a reset event for the caller in this case,
// since they asked for it.
p->unregister_pipe();
if (p->out_q.empty()) {
ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl;
- p->stop();
+ p->stop_and_wait();
} else {
ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl;
p->close_on_empty = true;