git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Avoid "lock" acquire in message normal send flow
author Haomai Wang <haomaiwang@gmail.com>
Thu, 28 May 2015 16:59:50 +0000 (00:59 +0800)
committer Haomai Wang <haomaiwang@gmail.com>
Sat, 30 May 2015 14:29:56 +0000 (22:29 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 6d2b8e89b4c4ef3cf4df061967165f9173893921..26d0481d4543f2adaa1c44d53247f68024d0a1ee 100644 (file)
@@ -174,9 +174,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
   : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0),
-    sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0),
-    open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL),
+    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
+    write_lock("AsyncConnection::write_lock"), can_write(0),
+    open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
@@ -837,18 +837,19 @@ void AsyncConnection::process()
           // side queueing because messages can't be renumbered, but the (kernel) client will
           // occasionally pull a message out of the sent queue to send elsewhere.  in that case
           // it doesn't matter if we "got" it or not.
-          if (message->get_seq() <= in_seq) {
+          uint64_t cur_seq = in_seq.read();
+          if (message->get_seq() <= cur_seq) {
             ldout(async_msgr->cct,0) << __func__ << " got old message "
-                    << message->get_seq() << " <= " << in_seq << " " << message << " " << *message
+                    << message->get_seq() << " <= " << cur_seq << " " << message << " " << *message
                     << ", discarding" << dendl;
             message->put();
             if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
               assert(0 == "old msgs despite reconnect_seq feature");
             break;
           }
-          if (message->get_seq() > in_seq + 1) {
+          if (message->get_seq() > cur_seq + 1) {
             ldout(async_msgr->cct, 0) << __func__ << " missed message?  skipped from seq "
-                                      << in_seq << " to " << message->get_seq() << dendl;
+                                      << cur_seq << " to " << message->get_seq() << dendl;
             if (async_msgr->cct->_conf->ms_die_on_skipped_message)
               assert(0 == "skipped incoming seq");
           }
@@ -856,13 +857,13 @@ void AsyncConnection::process()
           message->set_connection(this);
 
           // note last received message.
-          in_seq = message->get_seq();
+          in_seq.set(message->get_seq());
           ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
                                << " " << message << " " << *message << dendl;
 
           // if send_message always successfully send, it may have no
           // opportunity to send seq ack. 10 is a experience value.
-          if (in_seq > in_seq_acked + 10) {
+          if (ack_left.inc() > 10) {
             center->dispatch_event_external(write_handler);
           }
 
@@ -1242,7 +1243,8 @@ int AsyncConnection::_process_connection()
         //}
 
         bufferlist bl;
-        bl.append((char*)&in_seq, sizeof(in_seq));
+        uint64_t s = in_seq.read();
+        bl.append((char*)&s, sizeof(s));
         r = try_send(bl);
         if (r == 0) {
           state = STATE_CONNECTING_READY;
@@ -1810,7 +1812,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
-                             << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
+                             << connect_seq << " in_seq=" << in_seq.read() << ", sending READY" << dendl;
 
   int next_state;
 
@@ -1823,7 +1825,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     next_state = STATE_ACCEPTING_READY;
     discard_requeued_up_to(0);
     is_reset_from_peer = false;
-    in_seq = 0;
+    in_seq.set(0);
   }
 
   // send READY reply
@@ -1847,8 +1849,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (reply.authorizer_len)
     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
 
+  uint64_t s = in_seq.read();
   if (reply.tag == CEPH_MSGR_TAG_SEQ)
-    reply_bl.append((char*)&in_seq, sizeof(in_seq));
+    reply_bl.append((char*)&s, sizeof(s));
 
   lock.Unlock();
   // Because "replacing" will prevent other connections preempt this addr,
@@ -2148,6 +2151,7 @@ void AsyncConnection::fault()
 void AsyncConnection::was_session_reset()
 {
   ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
+  assert(lock.is_locked());
   Mutex::Locker l(write_lock);
   discard_out_queue();
 
@@ -2157,9 +2161,10 @@ void AsyncConnection::was_session_reset()
     ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl;
   }
 
-  in_seq = 0;
+  in_seq.set(0);
   connect_seq = 0;
-  in_seq_acked = 0;
+  // it's safe to directly set 0, double locked
+  ack_left.set(0);
   once_ready = false;
   can_write = 0;
 }
@@ -2324,8 +2329,8 @@ void AsyncConnection::handle_ack(uint64_t seq)
 void AsyncConnection::send_keepalive()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
-  Mutex::Locker l(lock);
-  if (state != STATE_CLOSED) {
+  Mutex::Locker l(write_lock);
+  if (can_write != 2) {
     keepalive = true;
     center->dispatch_event_external(write_handler);
   }
@@ -2367,12 +2372,11 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
 void AsyncConnection::handle_write()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
-  Mutex::Locker l(lock);
   bufferlist bl;
   int r = 0;
 
   write_lock.Lock();
-  if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+  if (can_write == 1) {
     if (keepalive) {
       _send_keepalive_or_ack();
       keepalive = false;
@@ -2399,13 +2403,14 @@ void AsyncConnection::handle_write()
       }
     }
 
-    if (in_seq > in_seq_acked) {
+    uint64_t left = ack_left.read();
+    if (left) {
       ceph_le64 s;
-      s = in_seq;
+      s = in_seq.read();
       bl.append(CEPH_MSGR_TAG_ACK);
       bl.append((char*)&s, sizeof(s));
-      ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl;
-      in_seq_acked = s;
+      ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl;
+      ack_left.sub(ack_left);
       r = _try_send(bl);
     } else if (is_queued()) {
       r = _try_send(bl);
@@ -2415,23 +2420,42 @@ void AsyncConnection::handle_write()
       ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl;
       goto fail;
     }
-  } else if (state == STATE_STANDBY && !policy.server && is_queued()) {
-    ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
-                               << " policy.server is false" << dendl;
-    _connect();
-  } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
-    r = _try_send(bl);
-    if (r < 0) {
-      ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
-      goto fail;
+    write_lock.Unlock();
+  } else {
+    write_lock.Unlock();
+
+    if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+      if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
+        ldout(msgr->cct, 10) << __func__ << " sleep for "
+                             << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+        utime_t t;
+        t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+        t.sleep();
+      }
     }
+
+    lock.Lock();
+    write_lock.Lock();
+    if (state == STATE_STANDBY && !policy.server && is_queued()) {
+      ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
+                                 << " policy.server is false" << dendl;
+      _connect();
+    } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) {
+      r = _try_send(bl);
+      if (r < 0) {
+        ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
+        write_lock.Unlock();
+        goto fail;
+      }
+    }
+    write_lock.Unlock();
+    lock.Unlock();
   }
-  write_lock.Unlock();
 
   return ;
  fail:
-  write_lock.Unlock();
   fault();
+  lock.Unlock();
 }
 
 void AsyncConnection::wakeup_from(uint64_t id)
index 11850856c129f52598c3d26a9dc2c5ace5e735c8..1f5c0edc0c2bb3701ffa61dc9883ef0850716d25 100644 (file)
@@ -234,7 +234,6 @@ class AsyncConnection : public Connection {
   int global_seq;
   __u32 connect_seq, peer_global_seq;
   atomic_t out_seq;
-  uint64_t in_seq, in_seq_acked;
   int state;
   int state_after_send;
   int sd;
@@ -242,12 +241,14 @@ class AsyncConnection : public Connection {
   Messenger::Policy policy;
 
   Mutex write_lock;
+  uint64_t in_seq, in_seq_acked;
   int can_write;  // 0. can't send 1. can send_message 2. connection is closed
   bool open_write;
   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
   list<pair<bufferlist, Message*> > sent; // the first bufferlist need to inject seq
   list<Message*> local_messages;    // local deliver
   bufferlist outcoming_bl;
+  bool keepalive;
 
   Mutex lock;
   utime_t backoff;         // backoff time
@@ -258,7 +259,6 @@ class AsyncConnection : public Connection {
   EventCallbackRef connect_handler;
   EventCallbackRef local_deliver_handler;
   EventCallbackRef wakeup_handler;
-  bool keepalive;
   struct iovec msgvec[IOV_MAX];
   char *recv_buf;
   uint32_t recv_max_prefetch;
@@ -277,6 +277,9 @@ class AsyncConnection : public Connection {
   bufferlist::iterator data_blp;
   bufferlist front, middle, data;
   ceph_msg_connect connect_msg;
+  // used to accumulate the difference between `in_seq` and `in_seq_acked`, why
+  // we have this field because of lock separation
+  int ack_left;
   // Connecting state
   bool got_bad_auth;
   AuthAuthorizer *authorizer;