]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Fix accept connection replacing process
authorHaomai Wang <haomaiwang@gmail.com>
Sat, 6 Dec 2014 11:47:01 +0000 (19:47 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Sat, 6 Dec 2014 16:33:58 +0000 (00:33 +0800)
Original there exists bugs that we don't modify existing file event,
it will let remaining alive connection's event won't be called by
EventCenter.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h

index 68f34d613e5208a20b169b607edb0cc679f419c5..815c767775f4c58add8eeecd53b8728797106929 100644 (file)
@@ -169,7 +169,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente
     state(STATE_NONE), state_after_send(0), sd(-1),
     lock("AsyncConnection::lock"), open_write(false), keepalive(false),
     stop_lock("AsyncConnection::stop_lock"),
-    got_bad_auth(false), authorizer(NULL),
+    got_bad_auth(false), authorizer(NULL), replacing(false),
     state_buffer(4096), state_offset(0), net(cct), center(c)
 {
   read_handler.reset(new C_handle_read(this));
@@ -1544,7 +1544,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
                          << " > " << existing->connect_seq << dendl;
     goto replace;
   } // existing
-  else if (policy.resetcheck && connect.connect_seq > 0) {
+  else if (!replacing && policy.resetcheck && connect.connect_seq > 0) {
     // we reset, and they are opening a new session
     ldout(async_msgr->cct, 0) << __func__ << "accept we reset (peer sent cseq "
                         << connect.connect_seq << "), sending RESETSESSION" << dendl;
@@ -1597,10 +1597,17 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0)
       goto fail;
 
-    uint64_t s = existing->sd;
-    existing->sd = sd;
-    sd = s;
+    // Now existing connection will be alive and the current connection will
+    // exchange socket with existing connection because we want to maintain
+    // original "connection_state"
+    center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+    center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
+
+    swap(existing->sd, sd);
     existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+    existing->open_write = false;
+    existing->discard_out_queue();
+    existing->replacing = true;
     _stop();
     existing->lock.Unlock();
     return 0;
@@ -1608,6 +1615,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   existing->lock.Unlock();
 
  open:
+  replacing = false;
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
@@ -1777,6 +1785,7 @@ void AsyncConnection::discard_out_queue()
       (*r)->put();
     }
   out_q.clear();
+  outcoming_bl.clear();
 }
 
 int AsyncConnection::randomize_out_seq()
@@ -1855,7 +1864,6 @@ void AsyncConnection::was_session_reset()
 {
   ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl;
   discard_out_queue();
-  outcoming_bl.clear();
 
   center->dispatch_event_external(remote_reset_handler);
 
@@ -1871,15 +1879,16 @@ void AsyncConnection::was_session_reset()
 void AsyncConnection::_stop()
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
-  center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+  if (sd > 0)
+    center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
   shutdown_socket();
   discard_out_queue();
-  outcoming_bl.clear();
   open_write = false;
   state = STATE_CLOSED;
-  ::close(sd);
+  if (sd > 0)
+    ::close(sd);
   sd = -1;
-  async_msgr->unregister_conn(peer_addr);
+  async_msgr->unregister_conn(this);
   // Here we need to dispatch "signal" event, because we want to ensure signal
   // it after all events called by this "_stop" has be done.
   center->dispatch_event_external(signal_handler);
index 5ccd7904c7fa2030de8165d26000959fe4a0dec1..3ee416daeb8837f8aeca83f4590db5000471f7c0 100644 (file)
@@ -109,8 +109,8 @@ class AsyncConnection : public Connection {
   ostream& _conn_prefix(std::ostream *_dout);
 
   bool is_connected() {
-    // FIXME?
-    return state != STATE_CLOSED;
+    Mutex::Locker l(lock);
+    return state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE;
   }
 
   // Only call when AsyncConnection first construct
@@ -146,7 +146,7 @@ class AsyncConnection : public Connection {
   void mark_down() {
     Mutex::Locker l(stop_lock);
     if (center->get_owner() == pthread_self()) {
-      _stop();
+      stop();
     } else {
       center->dispatch_event_external(stop_handler);
       stop_cond.Wait(stop_lock);
@@ -281,6 +281,12 @@ class AsyncConnection : public Connection {
   // Accepting state
   entity_addr_t socket_addr;
   CryptoKey session_key;
+  bool replacing;    // when replacing process happened, we will reply connect
+                     // side with RETRY tag and accept side will clear replaced
+                     // connection. So when connect side reissue connect_msg,
+                     // there won't exists conflicting connection so we use
+                     // "replacing" to skip RESETSESSION to avoid detect wrong
+                     // presentation
 
   // used only for local state, it will be overwrite when state transition
   bufferptr state_buffer;
index 032538aa6985f4bacfa61e439859da5dc158e0c3..e7660e79b6fef2f8bc84b974503eb6740a4b05e5 100644 (file)
@@ -352,7 +352,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     processor(this, _nonce),
     lock("AsyncMessenger::lock"),
     nonce(_nonce), did_bind(false),
-    global_seq(0),
+    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
     cluster_protocol(0), stopped(true)
 {
   ceph_spin_init(&global_seq_lock);
@@ -471,7 +471,7 @@ void AsyncMessenger::wait()
 
     while (!conns.empty()) {
       AsyncConnectionRef p = conns.begin()->second;
-      _stop_conn(p);
+      p->mark_down();
     }
   }
   lock.Unlock();
@@ -658,7 +658,7 @@ void AsyncMessenger::mark_down(const entity_addr_t& addr)
   AsyncConnectionRef p = _lookup_conn(addr);
   if (p) {
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
-    _stop_conn(p);
+    p->mark_down();
     p->get();
     ms_deliver_handle_reset(p.get());
   } else {
index aff5ed3d2b8d0a02a8b37fd99fb69be50c55d368..4d01e266546447b502a6c15b248afba6cf0941c7 100644 (file)
@@ -293,6 +293,20 @@ private:
   // FIXME clear up
   set<AsyncConnectionRef> accepting_conns;
 
+  /**
+   * list of connection are closed which need to be clean up
+   *
+   * Because AsyncMessenger and AsyncConnection follow a lock rule that
+   * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
+   * but can't reversed. This rule is aimed to avoid dead lock.
+   * So if AsyncConnection want to unregister itself from AsyncMessenger,
+   * we pick up this idea that just queue itself to this set and do lazy
+   * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
+   * AsyncConnection in this set.
+   */
+  Mutex deleted_lock;
+  set<AsyncConnectionRef> deleted_conns;
+
   /// internal cluster protocol version, if any, for talking to entities of the same type.
   int cluster_protocol;
 
@@ -305,16 +319,15 @@ private:
     if (p == conns.end())
       return NULL;
 
-    assert(p->second->is_connected());
-    return p->second;
-  }
-
-  void _stop_conn(AsyncConnectionRef c) {
-    assert(lock.is_locked());
-    if (c) {
-      c->mark_down();
-      conns.erase(c->peer_addr);
+    // lazy delete, see "deleted_conns"
+    Mutex::Locker l(deleted_lock);
+    if (deleted_conns.count(p->second)) {
+      deleted_conns.erase(p->second);
+      conns.erase(p);
+      return NULL;
     }
+
+    return p->second;
   }
 
   void _init_local_connection() {
@@ -324,7 +337,6 @@ private:
     ms_deliver_handle_fast_connect(local_connection.get());
   }
 
-
 public:
 
   /// con used for sending messages to ourselves
@@ -399,10 +411,12 @@ public:
 
   /**
    * Unregister connection from `conns`
+   *
+   * See "deleted_conns"
    */
-  void unregister_conn(const entity_addr_t &addr) {
-    Mutex::Locker l(lock);
-    conns.erase(addr);
+  void unregister_conn(AsyncConnectionRef conn) {
+    Mutex::Locker l(deleted_lock);
+    deleted_conns.insert(conn);
   }
   /**
    * @} // AsyncMessenger Internals