git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: hold pipe_lock during important parts of accept()
author: Sage Weil <sage@inktank.com>
Tue, 16 Jul 2013 20:13:46 +0000 (13:13 -0700)
committer: Sage Weil <sage@inktank.com>
Wed, 24 Jul 2013 23:20:37 +0000 (16:20 -0700)
Previously we did not bother with locking for accept() because we were
not visible to any other threads.  However, we need to close accepting
Pipes from mark_down_all(), which means we need to handle interference.

Fix up the locking so that we hold pipe_lock when looking at Pipe state
and verify that we are still in the ACCEPTING state any time we retake
the lock.

Signed-off-by: Sage Weil <sage@inktank.com>
(cherry picked from commit ecab4bb9513385bd765cca23e4e2fadb7ac4bac2)

src/msg/Pipe.cc

index 092abdd292c56686853fdfe527b8a351553c3f61..c47ac6b5dfd1191b1040931dd004e1d3af88f902 100644 (file)
@@ -211,12 +211,11 @@ void *Pipe::DelayedDelivery::entry()
 int Pipe::accept()
 {
   ldout(msgr->cct,10) << "accept" << dendl;
+  assert(pipe_lock.is_locked());
+  assert(state == STATE_ACCEPTING);
 
-  set_socket_options();
+  pipe_lock.Unlock();
 
-  // my creater gave me sd via accept()
-  assert(state == STATE_ACCEPTING);
-  
   // vars
   bufferlist addrs;
   entity_addr_t socket_addr;
@@ -243,6 +242,8 @@ int Pipe::accept()
   // used for reading in the remote acked seq on connect
   uint64_t newly_acked_seq = 0;
 
+  set_socket_options();
+
   // announce myself.
   r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
   if (r < 0) {
@@ -313,7 +314,6 @@ int Pipe::accept()
       goto fail_unlocked;
     }
 
-
     authorizer.clear();
     if (connect.authorizer_len) {
       bp = buffer::create(connect.authorizer_len);
@@ -330,8 +330,12 @@ int Pipe::accept()
             << dendl;
     
     msgr->lock.Lock();   // FIXME
+    pipe_lock.Lock();
     if (msgr->dispatch_queue.stop)
       goto shutting_down;
+    if (state != STATE_ACCEPTING) {
+      goto shutting_down;
+    }
 
     // note peer's type, flags
     set_peer_type(connect.host_type);
@@ -393,15 +397,18 @@ int Pipe::accept()
 
     ldout(msgr->cct,10) << "accept:  setting up session_security." << dendl;
 
+    pipe_lock.Unlock();
     msgr->lock.Lock();
+    pipe_lock.Lock();
     if (msgr->dispatch_queue.stop)
       goto shutting_down;
-
+    if (state != STATE_ACCEPTING)
+      goto shutting_down;
     
     // existing?
     existing = msgr->_lookup_pipe(peer_addr);
     if (existing) {
-      existing->pipe_lock.Lock();
+      existing->pipe_lock.Lock(true);  // skip lockdep check (we are locking a second Pipe here)
 
       if (connect.global_seq < existing->peer_global_seq) {
        ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -525,6 +532,8 @@ int Pipe::accept()
     assert(0);
 
   retry_session:
+    assert(existing->pipe_lock.is_locked());
+    assert(pipe_lock.is_locked());
     reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
     reply.connect_seq = existing->connect_seq + 1;
     existing->pipe_lock.Unlock();
@@ -532,8 +541,10 @@ int Pipe::accept()
     goto reply;    
 
   reply:
+    assert(pipe_lock.is_locked());
     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
     reply.authorizer_len = authorizer_reply.length();
+    pipe_lock.Unlock();
     r = tcp_write((char*)&reply, sizeof(reply));
     if (r < 0)
       goto fail_unlocked;
@@ -545,6 +556,8 @@ int Pipe::accept()
   }
   
  replace:
+  assert(existing->pipe_lock.is_locked());
+  assert(pipe_lock.is_locked());
   if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
     reply_tag = CEPH_MSGR_TAG_SEQ;
     existing_seq = existing->in_seq;
@@ -588,6 +601,7 @@ int Pipe::accept()
 
  open:
   // open
+  assert(pipe_lock.is_locked());
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   assert(state == STATE_ACCEPTING);
@@ -621,6 +635,7 @@ int Pipe::accept()
   assert(removed == 1);
   register_pipe();
   msgr->lock.Unlock();
+  pipe_lock.Unlock();
 
   r = tcp_write((char*)&reply, sizeof(reply));
   if (r < 0) {
@@ -652,7 +667,6 @@ int Pipe::accept()
     start_writer();
   }
   ldout(msgr->cct,20) << "accept done" << dendl;
-  pipe_lock.Unlock();
 
   maybe_start_delay_thread();
 
@@ -685,10 +699,10 @@ int Pipe::accept()
     if (queued || replaced)
       start_writer();
   }
-  pipe_lock.Unlock();
   return -1;
 
  shutting_down:
+  assert(pipe_lock.is_locked());
   msgr->lock.Unlock();
 
   if (msgr->cct->_conf->ms_inject_internal_delays) {
@@ -698,11 +712,9 @@ int Pipe::accept()
     t.sleep();
   }
 
-  pipe_lock.Lock();
   state = STATE_CLOSED;
   state_closed.set(1);
   fault();
-  pipe_lock.Unlock();
   return -1;
 }
 
@@ -1304,11 +1316,13 @@ void Pipe::stop()
  */
 void Pipe::reader()
 {
-  if (state == STATE_ACCEPTING) 
-    accept();
-
   pipe_lock.Lock();
 
+  if (state == STATE_ACCEPTING) {
+    accept();
+    assert(pipe_lock.is_locked());
+  }
+
   // loop.
   while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {