]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: add missing DelayedDelivery and delay injection
authorPiotr Dałek <piotr.dalek@ts.fujitsu.com>
Tue, 5 Apr 2016 07:37:23 +0000 (09:37 +0200)
committerHaomai Wang <haomai@xsky.com>
Tue, 3 May 2016 12:11:14 +0000 (20:11 +0800)
Delay injection was missing from a few spots, also, DelayedDelivery
was added.

Fixes: http://tracker.ceph.com/issues/15372
Signed-off-by: Piotr Dałek <piotr.dalek@ts.fujitsu.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 5e066372d2ca15dad577f7103f7a18f644d3934c..07204783a1319a09888872b993935fb19b231342 100644 (file)
@@ -176,9 +176,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 }
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
-  : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
-    write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
+  : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), 
+    peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1),
+    port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
     recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
@@ -207,6 +207,18 @@ AsyncConnection::~AsyncConnection()
     delete[] recv_buf;
   if (state_buffer)
     delete[] state_buffer;
+  if (delay_thread) 
+    delete delay_thread;
+}
+
+void AsyncConnection::maybe_start_delay_thread()
+{
+  if (!delay_thread &&
+      async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
+    lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl;
+    delay_thread = new DelayedDelivery(this, async_msgr);
+    delay_thread->create("ms_async_delay");
+  }
 }
 
 /* return -1 means `fd` occurs error or closed, it should be closed
@@ -495,6 +507,16 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
   return len - state_offset;
 }
 
+void AsyncConnection::inject_delay() {
+  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(async_msgr->cct, 10) << __func__ << " sleep for " << 
+      async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+}
+
 void AsyncConnection::process()
 {
   ssize_t r = 0;
@@ -911,7 +933,15 @@ void AsyncConnection::process()
           state = STATE_OPEN;
 
           async_msgr->ms_fast_preprocess(message);
-          if (async_msgr->ms_can_fast_dispatch(message)) {
+          if (delay_thread) {
+            utime_t release;
+            if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+              release = message->get_recv_stamp();
+              release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl;
+            }
+            delay_thread->queue(release, message);
+          } else if (async_msgr->ms_can_fast_dispatch(message)) {
             lock.Unlock();
             async_msgr->ms_fast_dispatch(message);
             lock.Lock();
@@ -1347,7 +1377,7 @@ ssize_t AsyncConnection::_process_connection()
         if (is_queued())
           center->dispatch_event_external(write_handler);
         write_lock.Unlock();
-
+        maybe_start_delay_thread();
         break;
       }
 
@@ -1487,7 +1517,7 @@ ssize_t AsyncConnection::_process_connection()
         r = read_until(sizeof(newly_acked_seq), state_buffer);
         if (r < 0) {
           ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
-          goto fail;
+          goto fail_registered;
         } else if (r > 0) {
           break;
         }
@@ -1509,6 +1539,7 @@ ssize_t AsyncConnection::_process_connection()
         if (is_queued())
           center->dispatch_event_external(write_handler);
         write_lock.Unlock();
+        maybe_start_delay_thread();
         break;
       }
 
@@ -1521,6 +1552,10 @@ ssize_t AsyncConnection::_process_connection()
 
   return 0;
 
+fail_registered:
+  ldout(async_msgr->cct, 10) << "accept fault after register" << dendl;
+  inject_delay();
+
 fail:
   return -1;
 }
@@ -1655,13 +1690,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   lock.Unlock();
   AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
 
-  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
-    ldout(msgr->cct, 10) << __func__ << " sleep for "
-                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
-    utime_t t;
-    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
-    t.sleep();
-  }
+  inject_delay();
 
   lock.Lock();
   if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
@@ -1796,12 +1825,12 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
  replace:
   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
-  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
-    ldout(msgr->cct, 10) << __func__ << " sleep for "
-                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
-    utime_t t;
-    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
-    t.sleep();
+  inject_delay();
+  if (existing->delay_thread) {
+    existing->delay_thread->steal_for_pipe(this);
+    delay_thread = existing->delay_thread;
+    existing->delay_thread = NULL;
+    delay_thread->flush();
   }
 
   if (existing->policy.lossy) {
@@ -1911,14 +1940,8 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   // it's safe that here we don't acquire Connection's lock
   r = async_msgr->accept_conn(this);
 
-  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
-    ldout(msgr->cct, 10) << __func__ << " sleep for "
-                         << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
-    utime_t t;
-    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
-    t.sleep();
-  }
-
+  inject_delay();
+  
   lock.Lock();
   replacing = false;
   if (r < 0) {
@@ -1953,15 +1976,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
 
  fail_registered:
   ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
-
-  if (async_msgr->cct->_conf->ms_inject_internal_delays) {
-    ldout(async_msgr->cct, 10) << __func__ << " sleep for "
-                               << async_msgr->cct->_conf->ms_inject_internal_delays
-                               << dendl;
-    utime_t t;
-    t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
-    t.sleep();
-  }
+  inject_delay();
 
  fail:
   ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
@@ -2147,6 +2162,8 @@ void AsyncConnection::fault()
     ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
     center->dispatch_event_external(reset_handler);
     _stop();
+    if (delay_thread)
+      delay_thread->discard();
     return ;
   }
 
@@ -2160,6 +2177,9 @@ void AsyncConnection::fault()
   can_write = WriteStatus::NOWRITE;
   open_write = false;
 
+  // queue delayed items immediately
+  if (delay_thread)
+    delay_thread->flush();
   // requeue sent items
   requeue_sent();
   recv_start = recv_end = 0;
@@ -2219,6 +2239,8 @@ void AsyncConnection::was_session_reset()
   ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
   assert(lock.is_locked());
   Mutex::Locker l(write_lock);
+  if (delay_thread)
+    delay_thread->discard();
   discard_out_queue();
 
   center->dispatch_event_external(remote_reset_handler);
@@ -2238,6 +2260,14 @@ void AsyncConnection::was_session_reset()
 void AsyncConnection::_stop()
 {
   assert(lock.is_locked());
+  if (delay_thread && delay_thread->is_started()) {
+    ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
+    if (delay_thread->is_flushing()) {
+      delay_thread->wait_for_flush();
+    }
+    delay_thread->stop();
+    delay_thread->join();
+  }
   if (state == STATE_CLOSED)
     return ;
 
@@ -2263,6 +2293,7 @@ void AsyncConnection::_stop()
     center->delete_time_event(*it);
   // Make sure in-queue events will been processed
   center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
+
 }
 
 void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
@@ -2402,6 +2433,77 @@ void AsyncConnection::handle_ack(uint64_t seq)
   }
 }
 
+void AsyncConnection::DelayedDelivery::discard()
+{
+  Mutex::Locker l(delay_lock);
+  while (!delay_queue.empty()) {
+    Message *m = delay_queue.front().second;
+    // TODO: what to use here?
+    //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
+    m->put();
+    delay_queue.pop_front();
+  }
+}
+
+void AsyncConnection::DelayedDelivery::flush()
+{
+  Mutex::Locker l(delay_lock);
+  flush_count = delay_queue.size();
+  delay_cond.Signal();
+}
+
+void *AsyncConnection::DelayedDelivery::entry()
+{
+  Mutex::Locker locker(delay_lock);
+
+  while (!stop_delayed_delivery) {
+    if (delay_queue.empty()) {
+      delay_cond.Wait(delay_lock);
+      continue;
+    }
+    utime_t release = delay_queue.front().first;
+    Message *m = delay_queue.front().second;
+    string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type;
+    if (!flush_count &&
+        (release > ceph_clock_now(connection->async_msgr->cct) &&
+         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+      delay_cond.WaitUntil(delay_lock, release);
+      continue;
+    }
+    delay_queue.pop_front();
+    if (flush_count > 0) {
+      --flush_count;
+      active_flush = true;
+    }
+    if (connection->async_msgr->ms_can_fast_dispatch(m)) {
+      if (!stop_fast_dispatching_flag) {
+        delay_dispatching = true;
+        delay_lock.Unlock();
+        connection->async_msgr->ms_fast_dispatch(m);
+        delay_lock.Lock();
+        delay_dispatching = false;
+        if (stop_fast_dispatching_flag) {
+          // we need to let the stopping thread proceed
+          delay_cond.Signal();
+          delay_lock.Unlock();
+          delay_lock.Lock();
+        }
+      }
+    } else {
+      connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m)));
+    }
+    active_flush = false;
+  }
+  return NULL;
+}
+
+void AsyncConnection::DelayedDelivery::stop_fast_dispatching() {
+  Mutex::Locker l(delay_lock);
+  stop_fast_dispatching_flag = true;
+  while (delay_dispatching)
+    delay_cond.Wait(delay_lock);
+}
+
 void AsyncConnection::send_keepalive()
 {
   ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
index 28cf751a4123cb5d6678abf82ec7e542a1ec9fcb..c141c7b597aef23de57f84cec398ea292bc28d05 100644 (file)
@@ -77,6 +77,7 @@ class AsyncConnection : public Connection {
   void handle_ack(uint64_t seq);
   void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
   ssize_t write_message(Message *m, bufferlist& bl, bool more);
+  void inject_delay();
   ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                     bufferlist &authorizer_reply) {
     bufferlist reply_bl;
@@ -88,8 +89,10 @@ class AsyncConnection : public Connection {
       reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
     }
     ssize_t r = try_send(reply_bl);
-    if (r < 0)
+    if (r < 0) {
+      inject_delay();
       return -1;
+    }
 
     state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
     return 0;
@@ -123,10 +126,77 @@ class AsyncConnection : public Connection {
     assert(write_lock.is_locked());
     return !out_q.empty();
   }
+  
+   /**
+   * The DelayedDelivery is for injecting delays into Message delivery off
+   * the socket. It is only enabled if delays are requested, and if they
+   * are then it pulls Messages off the DelayQueue and puts them into the
+   * AsyncMessenger event queue.
+   * This is a nearly direct copy & paste from SimpleMessenger, and as
+   * such, there was a problem during AsyncConnection shutdown, fixed by
+   * checking whether delay_thread isn't already stopped and refraining
+   * from stopping it again.
+   */
+  class DelayedDelivery: public Thread {
+    AsyncConnection *connection;
+    std::deque< pair<utime_t,Message*> > delay_queue;
+    Mutex delay_lock;
+    Cond delay_cond;
+    int flush_count;
+    AsyncMessenger *msgr;
+    bool active_flush;
+    bool stop_delayed_delivery;
+    bool delay_dispatching; // we are in fast dispatch now
+    bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+
+    public:
+    explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr)
+      : connection(p),
+        delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0),
+        msgr(omsgr),
+        active_flush(false),
+        stop_delayed_delivery(false),
+        delay_dispatching(false),
+        stop_fast_dispatching_flag(false) { }
+    ~DelayedDelivery() { discard(); }
+    void *entry();
+    void queue(utime_t release, Message *m) {
+      Mutex::Locker l(delay_lock);
+      delay_queue.push_back(make_pair(release, m));
+      delay_cond.Signal();
+    }
+    void discard();
+    void flush();
+    bool is_flushing() {
+      Mutex::Locker l(delay_lock);
+      return flush_count > 0 || active_flush;
+    }
+    void wait_for_flush() {
+      Mutex::Locker l(delay_lock);
+      while (flush_count > 0 || active_flush)
+        delay_cond.Wait(delay_lock);
+    }
+    void stop() {
+      delay_lock.Lock();
+      stop_delayed_delivery = true;
+      delay_cond.Signal();
+      delay_lock.Unlock();
+    }
+    void steal_for_pipe(AsyncConnection *new_owner) {
+      Mutex::Locker l(delay_lock);
+      connection = new_owner;
+    }
+    /**
+     * We need to stop fast dispatching before we need to stop putting
+     * normal messages into the DispatchQueue.
+     */
+    void stop_fast_dispatching();
+  } *delay_thread;
 
  public:
   AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
   ~AsyncConnection();
+  void maybe_start_delay_thread();
 
   ostream& _conn_prefix(std::ostream *_dout);