]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: hold pipe_lock during important parts of accept()
authorSage Weil <sage@inktank.com>
Tue, 16 Jul 2013 20:13:46 +0000 (13:13 -0700)
committerSage Weil <sage@inktank.com>
Wed, 17 Jul 2013 21:39:04 +0000 (14:39 -0700)
Previously we did not bother with locking for accept() because we were
not visible to any other threads.  However, we need to close accepting
Pipes from mark_down_all(), which means we need to handle interference.

Fix up the locking so that we hold pipe_lock when looking at Pipe state
and verify that we are still in the ACCEPTING state any time we retake
the lock.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Pipe.cc

index 22114f48e62fc594cf14452922a60c41df12ce7f..3fba9a7e41ba51fd3dae46437fc63967cf014713 100644 (file)
@@ -210,12 +210,11 @@ void *Pipe::DelayedDelivery::entry()
 int Pipe::accept()
 {
   ldout(msgr->cct,10) << "accept" << dendl;
+  assert(pipe_lock.is_locked());
+  assert(state == STATE_ACCEPTING);
 
-  set_socket_options();
+  pipe_lock.Unlock();
 
-  // my creater gave me sd via accept()
-  assert(state == STATE_ACCEPTING);
-  
   // vars
   bufferlist addrs;
   entity_addr_t socket_addr;
@@ -241,6 +240,8 @@ int Pipe::accept()
   // used for reading in the remote acked seq on connect
   uint64_t newly_acked_seq = 0;
 
+  set_socket_options();
+
   // announce myself.
   r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
   if (r < 0) {
@@ -311,7 +312,6 @@ int Pipe::accept()
       goto fail_unlocked;
     }
 
-
     authorizer.clear();
     if (connect.authorizer_len) {
       bp = buffer::create(connect.authorizer_len);
@@ -328,8 +328,12 @@ int Pipe::accept()
             << dendl;
     
     msgr->lock.Lock();   // FIXME
+    pipe_lock.Lock();
     if (msgr->dispatch_queue.stop)
       goto shutting_down;
+    if (state != STATE_ACCEPTING) {
+      goto shutting_down;
+    }
 
     // note peer's type, flags
     set_peer_type(connect.host_type);
@@ -391,15 +395,18 @@ int Pipe::accept()
 
     ldout(msgr->cct,10) << "accept:  setting up session_security." << dendl;
 
+    pipe_lock.Unlock();
     msgr->lock.Lock();
+    pipe_lock.Lock();
     if (msgr->dispatch_queue.stop)
       goto shutting_down;
-
+    if (state != STATE_ACCEPTING)
+      goto shutting_down;
     
     // existing?
     existing = msgr->_lookup_pipe(peer_addr);
     if (existing) {
-      existing->pipe_lock.Lock();
+      existing->pipe_lock.Lock(true);  // skip lockdep check (we are locking a second Pipe here)
 
       if (connect.global_seq < existing->peer_global_seq) {
        ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -523,6 +530,8 @@ int Pipe::accept()
     assert(0);
 
   retry_session:
+    assert(existing->pipe_lock.is_locked());
+    assert(pipe_lock.is_locked());
     reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
     reply.connect_seq = existing->connect_seq + 1;
     existing->pipe_lock.Unlock();
@@ -530,8 +539,10 @@ int Pipe::accept()
     goto reply;    
 
   reply:
+    assert(pipe_lock.is_locked());
     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
     reply.authorizer_len = authorizer_reply.length();
+    pipe_lock.Unlock();
     r = tcp_write((char*)&reply, sizeof(reply));
     if (r < 0)
       goto fail_unlocked;
@@ -543,6 +554,8 @@ int Pipe::accept()
   }
   
  replace:
+  assert(existing->pipe_lock.is_locked());
+  assert(pipe_lock.is_locked());
   if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
     reply_tag = CEPH_MSGR_TAG_SEQ;
     existing_seq = existing->in_seq;
@@ -593,6 +606,7 @@ int Pipe::accept()
 
  open:
   // open
+  assert(pipe_lock.is_locked());
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   assert(state == STATE_ACCEPTING);
@@ -626,6 +640,7 @@ int Pipe::accept()
   assert(removed == 1);
   register_pipe();
   msgr->lock.Unlock();
+  pipe_lock.Unlock();
 
   r = tcp_write((char*)&reply, sizeof(reply));
   if (r < 0) {
@@ -657,7 +672,6 @@ int Pipe::accept()
     start_writer();
   }
   ldout(msgr->cct,20) << "accept done" << dendl;
-  pipe_lock.Unlock();
 
   maybe_start_delay_thread();
 
@@ -690,10 +704,10 @@ int Pipe::accept()
     if (queued || replaced)
       start_writer();
   }
-  pipe_lock.Unlock();
   return -1;
 
  shutting_down:
+  assert(pipe_lock.is_locked());
   msgr->lock.Unlock();
 
   if (msgr->cct->_conf->ms_inject_internal_delays) {
@@ -703,11 +717,9 @@ int Pipe::accept()
     t.sleep();
   }
 
-  pipe_lock.Lock();
   state = STATE_CLOSED;
   state_closed.set(1);
   fault();
-  pipe_lock.Unlock();
   return -1;
 }
 
@@ -1310,11 +1322,13 @@ void Pipe::stop()
  */
 void Pipe::reader()
 {
-  if (state == STATE_ACCEPTING) 
-    accept();
-
   pipe_lock.Lock();
 
+  if (state == STATE_ACCEPTING) {
+    accept();
+    assert(pipe_lock.is_locked());
+  }
+
   // loop.
   while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {