]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: msgr2: clean cookie if connection failed in ACCEPT_SESSION
authorRicardo Dias <rdias@suse.com>
Thu, 24 Jan 2019 09:48:44 +0000 (09:48 +0000)
committerRicardo Dias <rdias@suse.com>
Thu, 24 Jan 2019 09:48:44 +0000 (09:48 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index ec8e7e534b31b12ca3901319b3a9fec32dda64cf..7084ca8ea8404f5a2ede423bee715fa2181f1333 100644 (file)
@@ -423,6 +423,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       connect_seq(0),
       peer_global_seq(0),
       message_seq(0),
+      reconnecting(false),
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
@@ -643,6 +644,15 @@ CtPtr ProtocolV2::_fault() {
   connection->fault();
   reset_recv_state();
 
+  if (state == ACCEPTING_SESSION && !reconnecting) {
+    // if the connection is generating a new session and fails during this
+    // process, we need to clean the cookie. Otherwise, the connection might
+    // later try to reconnect to a non-existing session, and the other side
+    // will force a SESSION_RESET, which will cause a drop of the out_queue.
+    cookie = 0;
+  }
+  reconnecting = false;
+
   if (connection->policy.standby && out_queue.empty() && !keepalive &&
       state != WAIT) {
     ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
@@ -1488,6 +1498,7 @@ CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) {
 CtPtr ProtocolV2::ready() {
   ldout(cct, 25) << __func__ << dendl;
 
+  reconnecting = false;
   replacing = false;
 
   // make sure no pending tick timer
@@ -2710,11 +2721,13 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) {
 
   ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
 
+  reconnecting = true;
+
   // everything looks good
   exproto->connect_seq = reconnect.connect_seq();
   exproto->message_seq = reconnect.msg_seq();
 
-  return reuse_connection(existing, exproto, true);
+  return reuse_connection(existing, exproto);
 }
 
 CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
@@ -2772,13 +2785,13 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     if (connection->policy.resetcheck) {
       exproto->reset_session();
     }
-    return reuse_connection(existing, exproto, false);
+    return reuse_connection(existing, exproto);
   }
 
   if (exproto->state == READY || exproto->state == STANDBY) {
     ldout(cct, 1) << __func__ << " existing=" << existing
                   << " is READY/STANDBY, lets reuse it" << dendl;
-    return reuse_connection(existing, exproto, false);
+    return reuse_connection(existing, exproto);
   }
 
   // Looks like a connection race: server and client are both connecting to
@@ -2790,7 +2803,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     ldout(cct, 1) << __func__
                   << " connection race detected, replacing existing="
                   << existing << " socket by this connection's socket" << dendl;
-    return reuse_connection(existing, exproto, false);
+    return reuse_connection(existing, exproto);
   } else {
     // the existing connection wins
     ldout(cct, 1)
@@ -2813,9 +2826,9 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
 }
 
 CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
-                                   ProtocolV2 *exproto, bool reconnect) {
+                                   ProtocolV2 *exproto) {
   ldout(cct, 20) << __func__ << " existing=" << existing
-                 << " reconnect=" << reconnect << dendl;
+                 << " reconnect=" << reconnecting << dendl;
 
   connection->inject_delay();
 
@@ -2829,7 +2842,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
     ceph_assert(!connection->delay_state);
   }
   exproto->reset_recv_state();
-  if (!reconnect) {
+  if (!reconnecting) {
     exproto->peer_name = peer_name;
     exproto->peer_global_seq = peer_global_seq;
     exproto->connection_features = connection_features;
@@ -2848,6 +2861,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   connection->dispatch_queue->queue_reset(connection);
 
   exproto->can_write = false;
+  exproto->reconnecting = reconnecting;
   exproto->replacing = true;
   exproto->session_security = session_security;
   exproto->auth_method = auth_method;
@@ -2864,8 +2878,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   ceph_assert(connection->recv_start == connection->recv_end);
 
   auto deactivate_existing = std::bind(
-      [existing, new_worker, new_center, exproto,
-       reconnect](ConnectedSocket &cs) mutable {
+      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
           std::lock_guard<std::mutex> l(existing->lock);
@@ -2899,7 +2912,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
         // events in existing->center's queue. Then if we mark down
         // `existing`, it will execute in another thread and clean up
         // connection. Previous event will result in segment fault
-        auto transfer_existing = [existing, exproto, reconnect]() mutable {
+        auto transfer_existing = [existing, exproto]() mutable {
           std::lock_guard<std::mutex> l(existing->lock);
           if (exproto->state == CLOSED) return;
           ceph_assert(exproto->state == NONE);
@@ -2908,7 +2921,7 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
           existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
           existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
                                               existing->read_handler);
-          if (!reconnect) {
+          if (!exproto->reconnecting) {
             exproto->run_continuation(exproto->send_server_ident());
           } else {
             exproto->run_continuation(exproto->send_reconnect_ok());
index f289b95fcb7abac60fa73cbeb9d4cd7bb6c5923f..d6721ffcf3dc49b516530067e8116488fa678a0a 100644 (file)
@@ -87,6 +87,7 @@ private:
   uint64_t connect_seq;
   uint64_t peer_global_seq;
   uint64_t message_seq;
+  bool reconnecting;
   bool replacing;
   bool can_write;
   std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
@@ -252,7 +253,7 @@ private:
   Ct<ProtocolV2> *handle_reconnect(char *payload, uint32_t length);
   Ct<ProtocolV2> *handle_existing_connection(AsyncConnectionRef existing);
   Ct<ProtocolV2> *reuse_connection(AsyncConnectionRef existing,
-                                   ProtocolV2 *exproto, bool reconnect);
+                                   ProtocolV2 *exproto);
   Ct<ProtocolV2> *send_server_ident();
   Ct<ProtocolV2> *send_reconnect_ok();
   Ct<ProtocolV2> *server_ready();