From 10ae652f0325a19e435685704f90d0b3327abcc6 Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Fri, 12 Feb 2010 13:38:38 -0800
Subject: [PATCH] msgr: more conservative locking, thread join asserts

We caught a bunch of crashes like this:

10.02.11 17:01:01.600660 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=36 pgs=2409 cs=1 l=0).do_sendmsg error Broken pipe
10.02.11 17:01:01.600700 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=36 pgs=2409 cs=1 l=0).writer error sending 0x7fc27da1c570, 32: Broken pipe
10.02.11 17:01:01.600796 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=-1 pgs=2409 cs=1 l=0).fault initiating reconnect
...
./common/Thread.h: In function 'int Thread::join(void**)':
./common/Thread.h:66: FAILED assert(0)
 1: (Thread::join(void**)+0x73) [0x64fcd3]
 2: (SimpleMessenger::Pipe::join_reader()+0x68) [0x6555a2]
 3: (SimpleMessenger::Pipe::connect()+0xf5) [0x645be9]
 4: (SimpleMessenger::Pipe::writer()+0x157) [0x64793d]
 5: (SimpleMessenger::Pipe::Writer::entry()+0x19) [0x63e107]
 6: (Thread::_entry_func(void*)+0x20) [0x64e816]
 7: /lib/libpthread.so.0 [0x7fc2c3bbdfc7]
 8: (clone()+0x6d) [0x7fc2c2e005ad]

that look a bit like multiple threads were racing into join_reader(). Add an
assert to catch that if it happens again, and also wrap thread starts in
pipe_lock to ensure we keep the _running flags in sync with reality.

Add in a few other sanity checks too.
--- src/msg/SimpleMessenger.cc | 4 ++++ src/msg/SimpleMessenger.h | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 9920a6ff69047..5cafcb157d5d6 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -213,7 +213,9 @@ void *SimpleMessenger::Accepter::entry() if (!messenger->destination_stopped) { Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING); p->sd = sd; + p->pipe_lock.Lock(); p->start_reader(); + p->pipe_lock.Unlock(); messenger->pipes.insert(p); } messenger->lock.Unlock(); @@ -2165,10 +2167,12 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, // create pipe Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); + pipe->pipe_lock.Lock(); pipe->set_peer_type(type); pipe->set_peer_addr(addr); pipe->policy = get_policy(type); pipe->start_writer(); + pipe->pipe_lock.Unlock(); pipe->register_pipe(); pipes.insert(pipe); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index fb046de81ef7d..ebdf34859270f 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -119,7 +119,7 @@ private: utime_t backoff; // backoff time - bool reader_running; + bool reader_running, reader_joining; bool writer_running; map > out_q; // priority queue for outbound msgs @@ -175,7 +175,7 @@ private: pipe_lock("SimpleMessenger::Pipe::pipe_lock"), state(st), connection_state(new Connection), - reader_running(false), writer_running(false), + reader_running(false), reader_joining(false), writer_running(false), in_qlen(0), keepalive(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), @@ -195,21 +195,29 @@ private: void start_reader() { + assert(pipe_lock.is_locked()); + assert(!reader_running); reader_running = true; reader_thread.create(); } void start_writer() { + assert(pipe_lock.is_locked()); + assert(!writer_running); writer_running = true; writer_thread.create(); } void 
join_reader() { if (!reader_running) return; + assert(!reader_joining); + reader_joining = true; cond.Signal(); reader_thread.kill(SIGUSR2); pipe_lock.Unlock(); reader_thread.join(); pipe_lock.Lock(); + assert(reader_joining); + reader_joining = false; } // public constructors @@ -277,8 +285,10 @@ private: void register_pipe(); void unregister_pipe(); void join() { - if (writer_thread.is_started()) writer_thread.join(); - if (reader_thread.is_started()) reader_thread.join(); + if (writer_thread.is_started()) + writer_thread.join(); + if (reader_thread.is_started()) + reader_thread.join(); } void stop(); -- 2.39.5