]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: remove useless pararmeter in DelayedDelivery.
authorJianpeng Ma <jianpeng.ma@intel.com>
Fri, 27 Oct 2017 11:11:36 +0000 (19:11 +0800)
committerJianpeng Ma <jianpeng.ma@intel.com>
Fri, 27 Oct 2017 11:11:36 +0000 (19:11 +0800)
Now DelayedDelivery is a timer rather than a thread. So there is no case
be called ahead.

Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index cf2f7517b45ee50dae2a09667fe8d2ff7abb51c6..a2d4fc15bc7d5fdd9451ece1d9fec16f654a5685 100644 (file)
@@ -745,15 +745,13 @@ void AsyncConnection::process()
           auto fast_dispatch_time = ceph::mono_clock::now();
           logger->tinc(l_msgr_running_recv_time, fast_dispatch_time - recv_start_time);
           if (delay_state) {
-            utime_t release = message->get_recv_stamp();
             double delay_period = 0;
             if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
               delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
-              release += delay_period;
-              ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
-                                        << message << " " << *message << dendl;
+              ldout(async_msgr->cct, 1) << "queue_received will delay after " << (ceph_clock_now() + delay_period)
+                                       << " on " << message << " " << *message << dendl;
             }
-            delay_state->queue(delay_period, release, message);
+            delay_state->queue(delay_period, message);
           } else if (async_msgr->ms_can_fast_dispatch(message)) {
             lock.unlock();
             dispatch_queue->fast_dispatch(message);
@@ -2288,15 +2286,7 @@ void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
       return ;
     if (delay_queue.empty())
       return ;
-    utime_t release = delay_queue.front().first;
-    m = delay_queue.front().second;
-    string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
-    utime_t now = ceph_clock_now();
-    if ((release > now &&
-        (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
-      utime_t t = release - now;
-      t.sleep();
-    }
+    m = delay_queue.front();
     delay_queue.pop_front();
   }
   if (msgr->ms_can_fast_dispatch(m)) {
@@ -2312,7 +2302,7 @@ void AsyncConnection::DelayedDelivery::flush() {
       center->get_id(), [this] () mutable {
     std::lock_guard<std::mutex> l(delay_lock);
     while (!delay_queue.empty()) {
-      Message *m = delay_queue.front().second;
+      Message *m = delay_queue.front();
       if (msgr->ms_can_fast_dispatch(m)) {
         dispatch_queue->fast_dispatch(m);
       } else {
index 89606b005f4eea37e9c0a204ddd92ebe41c0e04b..4ad5ddbed11fc65bb01aaab87ca15c2554238523 100644 (file)
@@ -139,7 +139,7 @@ class AsyncConnection : public Connection {
    */
   class DelayedDelivery : public EventCallback {
     std::set<uint64_t> register_time_events; // need to delete it if stop
-    std::deque<std::pair<utime_t, Message*> > delay_queue;
+    std::deque<Message*> delay_queue;
     std::mutex delay_lock;
     AsyncMessenger *msgr;
     EventCenter *center;
@@ -158,9 +158,9 @@ class AsyncConnection : public Connection {
     }
     void set_center(EventCenter *c) { center = c; }
     void do_request(uint64_t id) override;
-    void queue(double delay_period, utime_t release, Message *m) {
+    void queue(double delay_period, Message *m) {
       std::lock_guard<std::mutex> l(delay_lock);
-      delay_queue.push_back(std::make_pair(release, m));
+      delay_queue.push_back(m);
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
     void discard() {
@@ -168,7 +168,7 @@ class AsyncConnection : public Connection {
       center->submit_to(center->get_id(), [this] () mutable {
         std::lock_guard<std::mutex> l(delay_lock);
         while (!delay_queue.empty()) {
-          Message *m = delay_queue.front().second;
+          Message *m = delay_queue.front();
           dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
           m->put();
           delay_queue.pop_front();