]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: Fix replacing cause original state lossy
authorHaomai Wang <haomaiwang@gmail.com>
Wed, 14 Jan 2015 03:14:16 +0000 (11:14 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Thu, 15 Jan 2015 19:07:13 +0000 (03:07 +0800)
Because AsyncConnection won't enter "open" tag from "replace" tag,
the codes which set reply_tag won't be used when enter "open" tag.
It will cause server side discard out_q and lose state.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.h

index bae0511ff52bc3768005a64b9273c6d7de7fd9f0..157ec77f7034c6d4a84e82f2975ad298b9ed28d0 100644 (file)
@@ -1489,9 +1489,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   int r = 0;
   ceph_msg_connect_reply reply;
   bufferlist reply_bl;
-  uint64_t existing_seq = -1;
   bool is_reset_from_peer = false;
-  char reply_tag = 0;
 
   memset(&reply, 0, sizeof(reply));
   reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
@@ -1663,11 +1661,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   assert(0);
 
  replace:
-  // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
-  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
-    reply_tag = CEPH_MSGR_TAG_SEQ;
-    existing_seq = existing->in_seq;
-  }
   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
   if (async_msgr->cct->_conf->ms_inject_internal_delays) {
@@ -1687,20 +1680,15 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     reply.connect_seq = existing->connect_seq + 1;
     r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
     existing->lock.Unlock();
-    if (r < 0) {
+    if (r < 0)
       goto fail;
-    }
-    return r;
+    return 0;
   }
 
-  // Here we use "_stop" instead of "mark_down" because "mark_down" is a async
-  // operation, but now we need ensure all variables in `existing` is cleaned up
-  // and we will reuse it next.
-  existing->_stop(true);
   if (existing->policy.lossy) {
     // disconnect from the Connection
     center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
-    existing->discard_out_queue();
+    existing->_stop();
   } else {
     // queue a reset on the new connection, which we're dumping for the old
     center->dispatch_event_external(reset_handler);
@@ -1710,28 +1698,32 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
     if (is_reset_from_peer)
       existing->in_seq = 0;
 
-    // Clean up output buffer
-    existing->outcoming_bl.clear();
-    existing->requeue_sent();
-    reply.connect_seq = connect.connect_seq + 1;
-    if (_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
-      existing->lock.Unlock();
-      goto fail;
-    }
-
     // Now existing connection will be alive and the current connection will
     // exchange socket with existing connection because we want to maintain
     // original "connection_state"
+    existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
     existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
 
-    existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
-    center->create_file_event(existing->sd, EVENT_READABLE, read_handler);
+    reply.connect_seq = connect.connect_seq + 1;
+
+    // Clean up output buffer
+    existing->outcoming_bl.clear();
+    existing->requeue_sent();
 
     swap(existing->sd, sd);
-    existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
     existing->open_write = false;
     existing->replacing = true;
+    existing->state_offset = 0;
+    existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+    // there should exist any buffer
+    assert(recv_start == recv_end);
+
+    if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply) < 0) {
+      // handle error
+      existing->center->dispatch_event_external(existing->write_handler);
+    }
+
     _stop();
     existing->lock.Unlock();
     return 0;
@@ -1739,14 +1731,24 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   existing->lock.Unlock();
 
  open:
-  replacing = false;
   connect_seq = connect.connect_seq + 1;
   peer_global_seq = connect.global_seq;
   ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
-                       << connect_seq << ", sending READY" << dendl;
+                             << connect_seq << ", sending READY" << dendl;
+
+  int next_state;
+
+  // if it is a hard reset from peer(in_seq == 0), we don't need a round-trip to negotiate in/out sequence
+  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && in_seq) {
+    reply.tag = CEPH_MSGR_TAG_SEQ;
+    next_state = STATE_ACCEPTING_WAIT_SEQ;
+  } else {
+    reply.tag = CEPH_MSGR_TAG_READY;
+    next_state = STATE_ACCEPTING_READY;
+    discard_requeued_up_to(0);
+  }
 
   // send READY reply
-  reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
   reply.features = policy.features_supported;
   reply.global_seq = async_msgr->get_global_seq();
   reply.connect_seq = connect_seq;
@@ -1767,18 +1769,12 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   if (reply.authorizer_len)
     reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
 
-  int next_state;
+  if (reply.tag == CEPH_MSGR_TAG_SEQ)
+    reply_bl.append((char*)&in_seq, sizeof(in_seq));
 
-  if (reply_tag == CEPH_MSGR_TAG_SEQ) {
-    reply_bl.append((char*)&existing_seq, sizeof(existing_seq));
-    next_state = STATE_ACCEPTING_WAIT_SEQ;
-  } else {
-    next_state = STATE_ACCEPTING_READY;
-    discard_requeued_up_to(0);
-  }
-
-  // if replacing, this con is alreadly accepted.
   lock.Unlock();
+  // Because "replacing" will prevent other connections preempt this addr,
+  // it's safe that here we don't acquire Connection's lock
   r = async_msgr->accept_conn(this);
 
   if (async_msgr->cct->_conf->ms_inject_internal_delays) {
@@ -1790,6 +1786,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
 
   lock.Lock();
+  replacing = false;
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " existing race replacing process for addr=" << peer_addr
                               << " just fail later one(this)" << dendl;
@@ -1806,6 +1803,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   center->dispatch_event_external(accept_handler);
   async_msgr->ms_deliver_handle_fast_accept(this);
 
+
   r = _try_send(reply_bl);
   if (r < 0)
     goto fail_registered;
@@ -1978,7 +1976,6 @@ void AsyncConnection::fault()
   if (policy.lossy && state != STATE_CONNECTING) {
     ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
     center->dispatch_event_external(reset_handler);
-    discard_out_queue();
     _stop();
     return ;
   }
@@ -1993,6 +1990,7 @@ void AsyncConnection::fault()
   requeue_sent();
   recv_start = recv_end = 0;
   state_offset = 0;
+  replacing = false;
   outcoming_bl.clear();
   if (policy.standby && !is_queued()) {
     ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;
@@ -2044,17 +2042,15 @@ void AsyncConnection::was_session_reset()
 }
 
 // *note: `async` is true only happen when replacing connection process
-void AsyncConnection::_stop(bool replacing)
+void AsyncConnection::_stop()
 {
   assert(lock.is_locked());
   ldout(async_msgr->cct, 10) << __func__ << dendl;
   if (sd > 0)
     center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
 
-  if (!replacing) {
-    discard_out_queue();
-    async_msgr->unregister_conn(this);
-  }
+  discard_out_queue();
+  async_msgr->unregister_conn(this);
 
   if (async_msgr->cct->_conf->ms_inject_internal_delays) {
     ldout(msgr->cct, 10) << __func__ << " sleep for "
@@ -2065,10 +2061,10 @@ void AsyncConnection::_stop(bool replacing)
     t.sleep();
   }
 
+  state = STATE_CLOSED;
   shutdown_socket();
   open_write = false;
   state_offset = 0;
-  state = STATE_CLOSED;
   if (sd > 0)
     ::close(sd);
   sd = -1;
index 690f5863725d3dd836dee56338fc5bfc723a6972..515b1d700d2a042ca3b6fae7dfce9e1c154d80c3 100644 (file)
@@ -53,7 +53,7 @@ class AsyncConnection : public Connection {
   int read_until(uint64_t needed, char *p);
   int _process_connection();
   void _connect();
-  void _stop(bool discard=true);
+  void _stop();
   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
   int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
   void was_session_reset();
index 0a6e9089be21069324e5ac1e54baeea645df1a4d..44b4da1d5188dab76d4ce902e46386ca5fc91bf2 100644 (file)
@@ -389,6 +389,7 @@ public:
       AsyncConnectionRef existing = conns[conn->peer_addr];
 
       // lazy delete, see "deleted_conns"
+      // If conn already in, we will return 0
       Mutex::Locker l(deleted_lock);
       if (deleted_conns.count(existing)) {
         deleted_conns.erase(existing);