]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: msgr2: fault handling
authorRicardo Dias <rdias@suse.com>
Wed, 7 Nov 2018 17:16:40 +0000 (17:16 +0000)
committerRicardo Dias <rdias@suse.com>
Wed, 23 Jan 2019 13:59:24 +0000 (13:59 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 8cdd9658e49a06ed9184dc0d020c83c192f314cb..9323e71f2c8398138580795445141143822b8485 100644 (file)
@@ -56,6 +56,7 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       cookie(0),
       message_seq(0),
       can_write(false),
+      connect_seq(0),
       bannerExchangeCallback(nullptr),
       next_frame_len(0),
       keepalive(false) {
@@ -112,12 +113,66 @@ void ProtocolV2::stop() {
   state = CLOSED;
 }
 
-void ProtocolV2::fault() {
+void ProtocolV2::fault() { _fault(); }
+
+void ProtocolV2::requeue_sent() {
+  if (sent.empty()) {
+    return;
+  }
+
+  list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  out_seq -= sent.size();
+  while (!sent.empty()) {
+    Message *m = sent.back();
+    sent.pop_back();
+    ldout(cct, 10) << __func__ << " " << *m << " for resend "
+                   << " (" << m->get_seq() << ")" << dendl;
+    rq.push_front(make_pair(bufferlist(), m));
+  }
+}
+
+void ProtocolV2::reset_recv_state() {
+  // clean read and write callbacks
+  connection->pendingReadLen.reset();
+  connection->writeCallback.reset();
+
+  uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+                          current_header.data_len;
+
+  if (state > THROTTLE_MESSAGE && state <= READ_MESSAGE_FRONT &&
+      connection->policy.throttler_messages) {
+    ldout(cct, 10) << __func__ << " releasing " << 1
+                   << " message to policy throttler "
+                   << connection->policy.throttler_messages->get_current()
+                   << "/" << connection->policy.throttler_messages->get_max()
+                   << dendl;
+    connection->policy.throttler_messages->put();
+  }
+  if (state > THROTTLE_BYTES && state <= READ_MESSAGE_FRONT) {
+    if (connection->policy.throttler_bytes) {
+      ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
+                     << " bytes to policy throttler "
+                     << connection->policy.throttler_bytes->get_current() << "/"
+                     << connection->policy.throttler_bytes->get_max() << dendl;
+      connection->policy.throttler_bytes->put(cur_msg_size);
+    }
+  }
+  if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_MESSAGE_FRONT) {
+    ldout(cct, 10)
+        << __func__ << " releasing " << cur_msg_size
+        << " bytes to dispatch_queue throttler "
+        << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+        << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
+    connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
+  }
+}
+
+CtPtr ProtocolV2::_fault() {
   ldout(cct, 10) << __func__ << dendl;
 
   if (state == CLOSED || state == NONE) {
     ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
-    return;
+    return nullptr;
   }
 
   if (connection->policy.lossy && state != START_CONNECT &&
@@ -125,8 +180,71 @@ void ProtocolV2::fault() {
     ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
     stop();
     connection->dispatch_queue->queue_reset(connection);
-    return;
+    return nullptr;
   }
+
+  connection->write_lock.lock();
+
+  can_write = false;
+  // requeue sent items
+  requeue_sent();
+
+  if (out_queue.empty() && state >= START_ACCEPT &&
+      state <= ACCEPTED_CLIENT_IDENT) {
+    ldout(cct, 10) << __func__ << " with nothing to send and in the half "
+                   << " accept state just closed" << dendl;
+    stop();
+    connection->dispatch_queue->queue_reset(connection);
+    connection->write_lock.unlock();
+    return nullptr;
+  }
+
+  connection->fault();
+  reset_recv_state();
+
+  if (connection->policy.standby && out_queue.empty() && state != WAIT) {
+    ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
+                   << dendl;
+    state = STANDBY;
+    connection->write_lock.unlock();
+    return nullptr;
+  }
+
+  connection->write_lock.unlock();
+
+  if (state != START_CONNECT && state != CONNECTING && state != WAIT) {
+    // policy maybe empty when state is in accept
+    if (connection->policy.server) {
+      ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
+      state = STANDBY;
+    } else {
+      ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
+      connect_seq++;
+      state = START_CONNECT;
+      connection->state = AsyncConnection::STATE_CONNECTING;
+    }
+    backoff = utime_t();
+    connection->center->dispatch_event_external(connection->read_handler);
+  } else {
+    if (state == WAIT) {
+      backoff.set_from_double(cct->_conf->ms_max_backoff);
+    } else if (backoff == utime_t()) {
+      backoff.set_from_double(cct->_conf->ms_initial_backoff);
+    } else {
+      backoff += backoff;
+      if (backoff > cct->_conf->ms_max_backoff)
+        backoff.set_from_double(cct->_conf->ms_max_backoff);
+    }
+
+    state = START_CONNECT;
+    connection->state = AsyncConnection::STATE_CONNECTING;
+    ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
+    // woke up again;
+    connection->register_time_events.insert(
+        connection->center->create_time_event(backoff.to_nsec() / 1000,
+                                              connection->wakeup_handler));
+  }
+  return nullptr;
 }
 
 void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
@@ -400,12 +518,11 @@ void ProtocolV2::write_event() {
     connection->write_lock.unlock();
     connection->lock.lock();
     connection->write_lock.lock();
-    // if (state == STANDBY && !connection->policy.server && is_queued()) {
-    //   ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
-    //   connection->_connect();
-    // } else
-    if (connection->cs && state != NONE && state != CLOSED &&
-        state != START_CONNECT) {
+    if (state == STANDBY && !connection->policy.server && is_queued()) {
+      ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+      connection->_connect();
+    } else if (connection->cs && state != NONE && state != CLOSED &&
+               state != START_CONNECT) {
       r = connection->_try_send();
       if (r < 0) {
         ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
@@ -530,6 +647,15 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
 
   if (connection->get_peer_type() == -1) {
     connection->set_peer_type(peer_type);
+
+    ceph_assert(state == ACCEPTING);
+    connection->policy = messenger->get_policy(peer_type);
+    ldout(cct, 10) << __func__ << " accept of host_type " << peer_type
+                   << ", policy.lossy=" << connection->policy.lossy
+                   << " policy.server=" << connection->policy.server
+                   << " policy.standby=" << connection->policy.standby
+                   << " policy.resetcheck=" << connection->policy.resetcheck
+                   << dendl;
   } else {
     if (connection->get_peer_type() != peer_type) {
       ldout(cct, 1) << __func__ << " connection peer type does not match what"
@@ -1518,6 +1644,8 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
     return WRITE(bl, handle_ident_missing_features_write);
   }
 
+  state = ACCEPTED_CLIENT_IDENT;
+
   // if everything is OK reply with server identification
   connection->peer_global_id = client_ident.gid;
   cookie = client_ident.cookie;
@@ -1555,7 +1683,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) {
     connection->inject_delay();
     return _fault();
   }
-  if (state != ACCEPTING) {
+  if (state != ACCEPTED_CLIENT_IDENT) {
     ldout(cct, 1) << __func__
                   << " state changed while accept_conn, it must be mark_down"
                   << dendl;
index 7de850756d0bf4034cfe439c57ba00741bba15de..616239d4fd919b9f603251aad2104804da0b5c2b 100644 (file)
@@ -14,12 +14,15 @@ private:
     CONNECTING,
     START_ACCEPT,
     ACCEPTING,
+    ACCEPTED_CLIENT_IDENT,
     READY,
     THROTTLE_MESSAGE,
     THROTTLE_BYTES,
     THROTTLE_DISPATCH_QUEUE,
     READ_MESSAGE_FRONT,
     READ_MESSAGE_COMPLETE,
+    STANDBY,
+    WAIT,
     CLOSED
   };
 
@@ -29,16 +32,20 @@ private:
                                       "CONNECTING",
                                       "START_ACCEPT",
                                       "ACCEPTING",
+                                      "ACCEPTED_CLIENT_IDENT",
                                       "READY",
                                       "THROTTLE_MESSAGE",
                                       "THROTTLE_BYTES",
                                       "THROTTLE_DISPATCH_QUEUE",
                                       "READ_MESSAGE_FRONT",
                                       "READ_MESSAGE_COMPLETE",
+                                      "STANDBY",
+                                      "WAIT",
                                       "CLOSED"};
     return statenames[state];
   }
 
+public:
   enum class Tag : uint32_t {
     AUTH_REQUEST,
     AUTH_BAD_METHOD,
@@ -315,6 +322,7 @@ private:
   bool can_write;
   std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
   std::list<Message *> sent;
+  __u32 connect_seq;
   std::atomic<uint64_t> out_seq{0};
   std::atomic<uint64_t> in_seq{0};
   std::atomic<uint64_t> ack_left{0};
@@ -342,11 +350,9 @@ private:
   Ct<ProtocolV2> *write(CONTINUATION_PARAM(next, ProtocolV2, int),
                         bufferlist &bl);
 
-  inline Ct<ProtocolV2> *_fault() {
-    fault();
-    return nullptr;
-  }
-
+  void requeue_sent();
+  void reset_recv_state();
+  Ct<ProtocolV2> *_fault();
   void discard_out_queue();
   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
   Message *_get_next_outgoing(bufferlist *bl);