]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: make delay message happen within original thread
authorHaomai Wang <haomai@xsky.com>
Fri, 15 Apr 2016 03:43:42 +0000 (11:43 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 3 May 2016 12:11:14 +0000 (20:11 +0800)
Fixes: http://tracker.ceph.com/issues/15503
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 07204783a1319a09888872b993935fb19b231342..0d7ca05d5d29be4031beac201550ba25b36fc023 100644 (file)
@@ -176,7 +176,7 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
 }
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
-  : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), 
+  : Connection(cct, m), delay_state(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0), 
     peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1),
     port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
@@ -207,17 +207,15 @@ AsyncConnection::~AsyncConnection()
     delete[] recv_buf;
   if (state_buffer)
     delete[] state_buffer;
-  if (delay_thread) 
-    delete delay_thread;
+  assert(!delay_state);
 }
 
 void AsyncConnection::maybe_start_delay_thread()
 {
-  if (!delay_thread &&
+  if (!delay_state &&
       async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
-    lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl;
-    delay_thread = new DelayedDelivery(this, async_msgr);
-    delay_thread->create("ms_async_delay");
+    ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
+    delay_state = new DelayedDelivery(async_msgr, center);
   }
 }
 
@@ -932,15 +930,20 @@ void AsyncConnection::process()
 
           state = STATE_OPEN;
 
+          logger->inc(l_msgr_recv_messages);
+          logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+
           async_msgr->ms_fast_preprocess(message);
-          if (delay_thread) {
-            utime_t release;
+          if (delay_state) {
+            utime_t release = message->get_recv_stamp();
+            double delay_period = 0;
             if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
-              release = message->get_recv_stamp();
-              release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
-              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl;
+              delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+              release += delay_period;
+              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
+                                        << message << " " << *message << dendl;
             }
-            delay_thread->queue(release, message);
+            delay_state->queue(delay_period, release, message);
           } else if (async_msgr->ms_can_fast_dispatch(message)) {
             lock.Unlock();
             async_msgr->ms_fast_dispatch(message);
@@ -948,9 +951,6 @@ void AsyncConnection::process()
           } else {
             center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
           }
-          logger->inc(l_msgr_recv_messages);
-          logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
-
           break;
         }
 
@@ -1826,13 +1826,6 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
   ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
 
   inject_delay();
-  if (existing->delay_thread) {
-    existing->delay_thread->steal_for_pipe(this);
-    delay_thread = existing->delay_thread;
-    existing->delay_thread = NULL;
-    delay_thread->flush();
-  }
-
   if (existing->policy.lossy) {
     // disconnect from the Connection
     existing->center->dispatch_event_external(existing->reset_handler);
@@ -1862,6 +1855,10 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
 
     // Clean up output buffer
     existing->outcoming_bl.clear();
+    if (existing->delay_state) {
+      existing->delay_state->flush();
+      assert(!delay_state);
+    }
     existing->requeue_sent();
 
     swap(existing->sd, sd);
@@ -2162,8 +2159,6 @@ void AsyncConnection::fault()
     ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
     center->dispatch_event_external(reset_handler);
     _stop();
-    if (delay_thread)
-      delay_thread->discard();
     return ;
   }
 
@@ -2178,8 +2173,8 @@ void AsyncConnection::fault()
   open_write = false;
 
   // queue delayed items immediately
-  if (delay_thread)
-    delay_thread->flush();
+  if (delay_state)
+    delay_state->flush();
   // requeue sent items
   requeue_sent();
   recv_start = recv_end = 0;
@@ -2239,8 +2234,8 @@ void AsyncConnection::was_session_reset()
   ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
   assert(lock.is_locked());
   Mutex::Locker l(write_lock);
-  if (delay_thread)
-    delay_thread->discard();
+  if (delay_state)
+    delay_state->discard();
   discard_out_queue();
 
   center->dispatch_event_external(remote_reset_handler);
@@ -2260,17 +2255,12 @@ void AsyncConnection::was_session_reset()
 void AsyncConnection::_stop()
 {
   assert(lock.is_locked());
-  if (delay_thread && delay_thread->is_started()) {
-    ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
-    if (delay_thread->is_flushing()) {
-      delay_thread->wait_for_flush();
-    }
-    delay_thread->stop();
-    delay_thread->join();
-  }
   if (state == STATE_CLOSED)
     return ;
 
+  if (delay_state)
+    delay_state->flush();
+
   ldout(async_msgr->cct, 1) << __func__ << dendl;
   Mutex::Locker l(write_lock);
   if (sd >= 0)
@@ -2433,75 +2423,57 @@ void AsyncConnection::handle_ack(uint64_t seq)
   }
 }
 
-void AsyncConnection::DelayedDelivery::discard()
+void AsyncConnection::DelayedDelivery::do_request(int id)
 {
-  Mutex::Locker l(delay_lock);
-  while (!delay_queue.empty()) {
-    Message *m = delay_queue.front().second;
-    // TODO: what to use here?
-    //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
-    m->put();
+  Message *m = nullptr;
+  {
+    Mutex::Locker l(delay_lock);
+    register_time_events.erase(id);
+    if (delay_queue.empty())
+      return ;
+    utime_t release = delay_queue.front().first;
+    m = delay_queue.front().second;
+    string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
+    utime_t now = ceph_clock_now(msgr->cct);
+    if ((release > now &&
+        (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+      utime_t t = release - now;
+      t.sleep();
+    }
     delay_queue.pop_front();
   }
+  if (msgr->ms_can_fast_dispatch(m)) {
+    msgr->ms_fast_dispatch(m);
+  } else {
+    msgr->ms_deliver_dispatch(m);
+  }
 }
 
-void AsyncConnection::DelayedDelivery::flush()
-{
-  Mutex::Locker l(delay_lock);
-  flush_count = delay_queue.size();
-  delay_cond.Signal();
-}
-
-void *AsyncConnection::DelayedDelivery::entry()
-{
-  Mutex::Locker locker(delay_lock);
-
-  while (!stop_delayed_delivery) {
-    if (delay_queue.empty()) {
-      delay_cond.Wait(delay_lock);
-      continue;
-    }
-    utime_t release = delay_queue.front().first;
-    Message *m = delay_queue.front().second;
-    string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type;
-    if (!flush_count &&
-        (release > ceph_clock_now(connection->async_msgr->cct) &&
-         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
-      delay_cond.WaitUntil(delay_lock, release);
-      continue;
-    }
-    delay_queue.pop_front();
-    if (flush_count > 0) {
-      --flush_count;
-      active_flush = true;
-    }
-    if (connection->async_msgr->ms_can_fast_dispatch(m)) {
-      if (!stop_fast_dispatching_flag) {
-        delay_dispatching = true;
-        delay_lock.Unlock();
-        connection->async_msgr->ms_fast_dispatch(m);
-        delay_lock.Lock();
-        delay_dispatching = false;
-        if (stop_fast_dispatching_flag) {
-          // we need to let the stopping thread proceed
-          delay_cond.Signal();
-          delay_lock.Unlock();
-          delay_lock.Lock();
-        }
+class C_flush_messages : public EventCallback {
+  std::deque<std::pair<utime_t, Message*> > delay_queue;
+  AsyncMessenger *msgr;
+ public:
+  C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {}
+  void do_request(int id) {
+    while (!delay_queue.empty()) {
+      Message *m = delay_queue.front().second;
+      if (msgr->ms_can_fast_dispatch(m)) {
+        msgr->ms_fast_dispatch(m);
+      } else {
+        msgr->ms_deliver_dispatch(m);
       }
-    } else {
-      connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m)));
+      delay_queue.pop_front();
     }
-    active_flush = false;
+    delete this;
   }
-  return NULL;
-}
+};
 
-void AsyncConnection::DelayedDelivery::stop_fast_dispatching() {
+void AsyncConnection::DelayedDelivery::flush() {
   Mutex::Locker l(delay_lock);
-  stop_fast_dispatching_flag = true;
-  while (delay_dispatching)
-    delay_cond.Wait(delay_lock);
+  center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr));
+  for (auto i : register_time_events)
+    center->delete_time_event(i);
+  register_time_events.clear();
 }
 
 void AsyncConnection::send_keepalive()
index c141c7b597aef23de57f84cec398ea292bc28d05..23577db36b4b9aaec51545261f0f919b741695a5 100644 (file)
@@ -126,72 +126,47 @@ class AsyncConnection : public Connection {
     assert(write_lock.is_locked());
     return !out_q.empty();
   }
-  
+
    /**
    * The DelayedDelivery is for injecting delays into Message delivery off
    * the socket. It is only enabled if delays are requested, and if they
    * are then it pulls Messages off the DelayQueue and puts them into the
    * AsyncMessenger event queue.
-   * This is a nearly direct copy & paste from SimpleMessenger, and as
-   * such, there was a problem during AsyncConnection shutdown, fixed by
-   * checking whether delay_thread isn't already stopped and refraining
-   * from stopping it again.
    */
-  class DelayedDelivery: public Thread {
-    AsyncConnection *connection;
-    std::deque< pair<utime_t,Message*> > delay_queue;
+  class DelayedDelivery : public EventCallback {
+    std::set<uint64_t> register_time_events; // need to delete it if stop
+    std::deque<std::pair<utime_t, Message*> > delay_queue;
     Mutex delay_lock;
-    Cond delay_cond;
-    int flush_count;
     AsyncMessenger *msgr;
-    bool active_flush;
-    bool stop_delayed_delivery;
-    bool delay_dispatching; // we are in fast dispatch now
-    bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+    EventCenter *center;
 
-    public:
-    explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr)
-      : connection(p),
-        delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0),
-        msgr(omsgr),
-        active_flush(false),
-        stop_delayed_delivery(false),
-        delay_dispatching(false),
-        stop_fast_dispatching_flag(false) { }
-    ~DelayedDelivery() { discard(); }
-    void *entry();
-    void queue(utime_t release, Message *m) {
-      Mutex::Locker l(delay_lock);
-      delay_queue.push_back(make_pair(release, m));
-      delay_cond.Signal();
+   public:
+    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c)
+      : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
+        msgr(omsgr), center(c) { }
+    ~DelayedDelivery() {
+      assert(register_time_events.empty());
+      assert(delay_queue.empty());
     }
-    void discard();
-    void flush();
-    bool is_flushing() {
+    void do_request(int id) override;
+    void queue(double delay_period, utime_t release, Message *m) {
       Mutex::Locker l(delay_lock);
-      return flush_count > 0 || active_flush;
+      delay_queue.push_back(std::make_pair(release, m));
+      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
-    void wait_for_flush() {
+    void discard() {
       Mutex::Locker l(delay_lock);
-      while (flush_count > 0 || active_flush)
-        delay_cond.Wait(delay_lock);
-    }
-    void stop() {
-      delay_lock.Lock();
-      stop_delayed_delivery = true;
-      delay_cond.Signal();
-      delay_lock.Unlock();
-    }
-    void steal_for_pipe(AsyncConnection *new_owner) {
-      Mutex::Locker l(delay_lock);
-      connection = new_owner;
+      while (!delay_queue.empty()) {
+        Message *m = delay_queue.front().second;
+        m->put();
+        delay_queue.pop_front();
+      }
+      for (auto i : register_time_events)
+        center->delete_time_event(i);
+      register_time_events.clear();
     }
-    /**
-     * We need to stop fast dispatching before we need to stop putting
-     * normal messages into the DispatchQueue.
-     */
-    void stop_fast_dispatching();
-  } *delay_thread;
+    void flush();
+  } *delay_state;
 
  public:
   AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
@@ -401,6 +376,10 @@ class AsyncConnection : public Connection {
     delete connect_handler;
     delete local_deliver_handler;
     delete wakeup_handler;
+    if (delay_state) {
+      delete delay_state;
+      delay_state = NULL;
+    }
   }
   PerfCounters *get_perf_counter() {
     return logger;