if (!messenger->destination_stopped) {
Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING);
p->sd = sd;
+ p->pipe_lock.Lock();
p->start_reader();
+ p->pipe_lock.Unlock();
messenger->pipes.insert(p);
}
messenger->lock.Unlock();
// create pipe
Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
+ pipe->pipe_lock.Lock();
pipe->set_peer_type(type);
pipe->set_peer_addr(addr);
pipe->policy = get_policy(type);
pipe->start_writer();
+ pipe->pipe_lock.Unlock();
pipe->register_pipe();
pipes.insert(pipe);
utime_t backoff; // backoff time
- bool reader_running;
+ bool reader_running, reader_joining;
bool writer_running;
map<int, list<Message*> > out_q; // priority queue for outbound msgs
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
state(st),
connection_state(new Connection),
- reader_running(false), writer_running(false),
+ reader_running(false), reader_joining(false), writer_running(false),
in_qlen(0), keepalive(false),
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0),
void start_reader() {
+ assert(pipe_lock.is_locked());
+ assert(!reader_running);
reader_running = true;
reader_thread.create();
}
void start_writer() {
+ assert(pipe_lock.is_locked());
+ assert(!writer_running);
writer_running = true;
writer_thread.create();
}
void join_reader() {
if (!reader_running)
return;
+ assert(!reader_joining);
+ reader_joining = true;
cond.Signal();
reader_thread.kill(SIGUSR2);
pipe_lock.Unlock();
reader_thread.join();
pipe_lock.Lock();
+ assert(reader_joining);
+ reader_joining = false;
}
// public constructors
void register_pipe();
void unregister_pipe();
void join() {
- if (writer_thread.is_started()) writer_thread.join();
- if (reader_thread.is_started()) reader_thread.join();
+ if (writer_thread.is_started())
+ writer_thread.join();
+ if (reader_thread.is_started())
+ reader_thread.join();
}
void stop();