state(st),
session_security(NULL),
connection_state(NULL),
- reader_running(false), reader_joining(false), writer_running(false),
+ reader_running(false), reader_needs_join(false), reader_joining(false), writer_running(false),
in_q(&(r->dispatch_queue)),
keepalive(false),
close_on_empty(false),
{
assert(pipe_lock.is_locked());
assert(!reader_running);
+ if (reader_needs_join) {
+ reader_thread.join();
+ reader_needs_join = false;
+ }
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
}
pipe_lock.Lock();
assert(reader_joining);
reader_joining = false;
+ reader_needs_join = false;
}
// reap?
reader_running = false;
+ reader_needs_join = true;
unlock_maybe_reap();
ldout(msgr->cct,10) << "reader done" << dendl;
}
utime_t backoff; // backoff time
- bool reader_running, reader_joining;
+ bool reader_running, reader_needs_join, reader_joining;
bool writer_running;
map<int, list<Message*> > out_q; // priority queue for outbound msgs