]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: avoid is_connected require connection's lock 8520/head
authorHaomai Wang <haomai@xsky.com>
Sun, 10 Apr 2016 07:21:55 +0000 (15:21 +0800)
committerHaomai Wang <haomai@xsky.com>
Sun, 10 Apr 2016 09:02:58 +0000 (17:02 +0800)
Fixes: http://tracker.ceph.com/issues/15440
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 611e49890b1f09a8564c553240d92fa9a013e884..7ae81871845359d4b5d8891e791d289d82e9eb41 100644 (file)
@@ -178,7 +178,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
   : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
     out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
-    write_lock("AsyncConnection::write_lock"), can_write(NOWRITE),
+    write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
@@ -1349,7 +1349,7 @@ ssize_t AsyncConnection::_process_connection()
         // message may in queue between last _try_send and connection ready
         // write event may already notify and we need to force scheduler again
         write_lock.Lock();
-        can_write = CANWRITE;
+        can_write = WriteStatus::CANWRITE;
         if (is_queued())
           center->dispatch_event_external(write_handler);
         write_lock.Unlock();
@@ -1511,7 +1511,7 @@ ssize_t AsyncConnection::_process_connection()
         state = STATE_OPEN;
         memset(&connect_msg, 0, sizeof(connect_msg));
         write_lock.Lock();
-        can_write = CANWRITE;
+        can_write = WriteStatus::CANWRITE;
         if (is_queued())
           center->dispatch_event_external(write_handler);
         write_lock.Unlock();
@@ -1816,7 +1816,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     ldout(async_msgr->cct, 1) << __func__ << " replacing on lossy channel, failing existing" << dendl;
     existing->_stop();
   } else {
-    assert(can_write == NOWRITE);
+    assert(can_write == WriteStatus::NOWRITE);
     existing->write_lock.Lock(true);
     // queue a reset on the new connection, which we're dumping for the old
     center->dispatch_event_external(reset_handler);
@@ -1842,7 +1842,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
     existing->requeue_sent();
 
     swap(existing->sd, sd);
-    existing->can_write = NOWRITE;
+    existing->can_write = WriteStatus::NOWRITE;
     existing->open_write = false;
     existing->replacing = true;
     existing->state_offset = 0;
@@ -2011,7 +2011,7 @@ int AsyncConnection::send_message(Message *m)
   if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
     Mutex::Locker l(write_lock);
-    if (can_write != CLOSED) {
+    if (can_write != WriteStatus::CLOSED) {
       local_messages.push_back(m);
       center->dispatch_event_external(local_deliver_handler);
     } else {
@@ -2037,14 +2037,14 @@ int AsyncConnection::send_message(Message *m)
 
   Mutex::Locker l(write_lock);
   // "features" changes will change the payload encoding
-  if (can_fast_prepare && (can_write == NOWRITE || get_features() != f)) {
+  if (can_fast_prepare && (can_write == WriteStatus::NOWRITE || get_features() != f)) {
     // ensure the correctness of message encoding
     bl.clear();
     m->get_payload().clear();
-    ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous "
+    ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer previous "
                               << f << " != " << get_features() << dendl;
   }
-  if (!is_queued() && can_write == CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
+  if (!is_queued() && can_write == WriteStatus::CANWRITE && async_msgr->cct->_conf->ms_async_send_inline) {
     if (!can_fast_prepare)
       prepare_send_message(get_features(), m, bl);
     logger->inc(l_msgr_send_messages_inline);
@@ -2053,7 +2053,7 @@ int AsyncConnection::send_message(Message *m)
       // we want to handle fault within internal thread
       center->dispatch_event_external(write_handler);
     }
-  } else if (can_write == CLOSED) {
+  } else if (can_write == WriteStatus::CLOSED) {
     ldout(async_msgr->cct, 10) << __func__ << " connection closed."
                                << " Drop message " << m << dendl;
     m->put();
@@ -2163,7 +2163,7 @@ void AsyncConnection::fault()
     ::close(sd);
     sd = -1;
   }
-  can_write = NOWRITE;
+  can_write = WriteStatus::NOWRITE;
   open_write = false;
 
   // requeue sent items
@@ -2238,7 +2238,7 @@ void AsyncConnection::was_session_reset()
   // it's safe to directly set 0, double locked
   ack_left.set(0);
   once_ready = false;
-  can_write = NOWRITE;
+  can_write = WriteStatus::NOWRITE;
 }
 
 void AsyncConnection::_stop()
@@ -2257,7 +2257,7 @@ void AsyncConnection::_stop()
 
   state = STATE_CLOSED;
   open_write = false;
-  can_write = CLOSED;
+  can_write = WriteStatus::CLOSED;
   state_offset = 0;
   if (sd >= 0) {
     shutdown_socket();
@@ -2293,7 +2293,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
 
 ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
 {
-  assert(can_write == CANWRITE);
+  assert(can_write == WriteStatus::CANWRITE);
   m->set_seq(out_seq.inc());
 
   if (!policy.lossy) {
@@ -2412,7 +2412,7 @@ void AsyncConnection::send_keepalive()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
   Mutex::Locker l(write_lock);
-  if (can_write != CLOSED) {
+  if (can_write != WriteStatus::CLOSED) {
     keepalive = true;
     center->dispatch_event_external(write_handler);
   }
@@ -2454,7 +2454,7 @@ void AsyncConnection::handle_write()
   ssize_t r = 0;
 
   write_lock.Lock();
-  if (can_write == CANWRITE) {
+  if (can_write == WriteStatus::CANWRITE) {
     if (keepalive) {
       _send_keepalive_or_ack();
       keepalive = false;
index 4a76a7c1b24113cf13ae57a974a6057d74d5b5b1..608cccee84895adbd81f611e6e81cb74a8fadfa7 100644 (file)
@@ -17,6 +17,7 @@
 #ifndef CEPH_MSG_ASYNCCONNECTION_H
 #define CEPH_MSG_ASYNCCONNECTION_H
 
+#include <atomic>
 #include <pthread.h>
 #include <signal.h>
 #include <climits>
@@ -130,8 +131,7 @@ class AsyncConnection : public Connection {
   ostream& _conn_prefix(std::ostream *_dout);
 
   bool is_connected() override {
-    Mutex::Locker l(lock);
-    return state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE;
+    return can_write.load() == WriteStatus::CANWRITE;
   }
 
   // Only call when AsyncConnection first construct
@@ -240,11 +240,12 @@ class AsyncConnection : public Connection {
   Messenger::Policy policy;
 
   Mutex write_lock;
-  enum {
+  enum class WriteStatus {
     NOWRITE,
     CANWRITE,
     CLOSED
-  } can_write;
+  };
+  std::atomic<WriteStatus> can_write;
   bool open_write;
   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
   list<Message*> sent; // the first bufferlist need to inject seq