]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: more conservative locking, thread join asserts
authorSage Weil <sage@newdream.net>
Fri, 12 Feb 2010 21:38:38 +0000 (13:38 -0800)
committerSage Weil <sage@newdream.net>
Fri, 12 Feb 2010 21:38:38 +0000 (13:38 -0800)
We caught a bunch of crashes like this:

10.02.11 17:01:01.600660 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=36 pgs=2409 cs=1 l=0).do_sendmsg error Broken pipe
10.02.11 17:01:01.600700 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=36 pgs=2409 cs=1 l=0).writer error sending 0x7fc27da1c570, 32: Broken pipe
10.02.11 17:01:01.600796 7f87070c3950 -- 10.3.14.134:6800/8203 >> 10.3.14.130:6800/18914 pipe(0x7fc2be2cebe0 sd=-1 pgs=2409 cs=1 l=0).fault initiating reconnect
...
./common/Thread.h: In function 'int Thread::join(void**)':
./common/Thread.h:66: FAILED assert(0)
 1: (Thread::join(void**)+0x73) [0x64fcd3]
 2: (SimpleMessenger::Pipe::join_reader()+0x68) [0x6555a2]
 3: (SimpleMessenger::Pipe::connect()+0xf5) [0x645be9]
 4: (SimpleMessenger::Pipe::writer()+0x157) [0x64793d]
 5: (SimpleMessenger::Pipe::Writer::entry()+0x19) [0x63e107]
 6: (Thread::_entry_func(void*)+0x20) [0x64e816]
 7: /lib/libpthread.so.0 [0x7fc2c3bbdfc7]
 8: (clone()+0x6d) [0x7fc2c2e005ad]

that look a bit like multiple procs were racing into
join_reader().  Add an assert to catch that if it happens again,
and also wrap thread starts in pipe_lock to ensure we keep the
_running flags in sync with reality.  Add in a few other
sanity checks too.

src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 9920a6ff69047f6e8bb08ef562180644acd6819b..5cafcb157d5d6f6925b6d19770ebf66c49ad7cbe 100644 (file)
@@ -213,7 +213,9 @@ void *SimpleMessenger::Accepter::entry()
       if (!messenger->destination_stopped) {
        Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING);
        p->sd = sd;
+       p->pipe_lock.Lock();
        p->start_reader();
+       p->pipe_lock.Unlock();
        messenger->pipes.insert(p);
       }
       messenger->lock.Unlock();
@@ -2165,10 +2167,12 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
   
   // create pipe
   Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
+  pipe->pipe_lock.Lock();
   pipe->set_peer_type(type);
   pipe->set_peer_addr(addr);
   pipe->policy = get_policy(type);
   pipe->start_writer();
+  pipe->pipe_lock.Unlock();
   pipe->register_pipe();
   pipes.insert(pipe);
 
index fb046de81ef7db8b47bfb0b8ce3d44b9b4b04767..ebdf34859270f2eb43eafa5b724941135a881f39 100644 (file)
@@ -119,7 +119,7 @@ private:
 
     utime_t backoff;         // backoff time
 
-    bool reader_running;
+    bool reader_running, reader_joining;
     bool writer_running;
 
     map<int, list<Message*> > out_q;  // priority queue for outbound msgs
@@ -175,7 +175,7 @@ private:
       pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
       state(st), 
       connection_state(new Connection),
-      reader_running(false), writer_running(false),
+      reader_running(false), reader_joining(false), writer_running(false),
       in_qlen(0), keepalive(false),
       connect_seq(0), peer_global_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
@@ -195,21 +195,29 @@ private:
 
 
     void start_reader() {
+      assert(pipe_lock.is_locked());
+      assert(!reader_running);
       reader_running = true;
       reader_thread.create();
     }
     void start_writer() {
+      assert(pipe_lock.is_locked());
+      assert(!writer_running);
       writer_running = true;
       writer_thread.create();
     }
     void join_reader() {
       if (!reader_running)
        return;
+      assert(!reader_joining);
+      reader_joining = true;
       cond.Signal();
       reader_thread.kill(SIGUSR2);
       pipe_lock.Unlock();
       reader_thread.join();
       pipe_lock.Lock();
+      assert(reader_joining);
+      reader_joining = false;
     }
 
     // public constructors
@@ -277,8 +285,10 @@ private:
     void register_pipe();
     void unregister_pipe();
     void join() {
-      if (writer_thread.is_started()) writer_thread.join();
-      if (reader_thread.is_started()) reader_thread.join();
+      if (writer_thread.is_started())
+       writer_thread.join();
+      if (reader_thread.is_started())
+       reader_thread.join();
     }
     void stop();