]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: ensure delay_state empty when building a new session
authorHaomai Wang <haomai@xsky.com>
Thu, 26 May 2016 02:25:16 +0000 (10:25 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 04:14:04 +0000 (12:14 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 4f8c0e3157a1291c05e15c3be8833b455eaceb82..f002be603a163a013a58d9707834c62c4c49616f 100644 (file)
@@ -1303,6 +1303,8 @@ ssize_t AsyncConnection::_process_connection()
           session_security.reset();
         }
 
+        if (delay_state)
+          assert(delay_state->ready());
         dispatch_queue->queue_connect(this);
         async_msgr->ms_deliver_handle_fast_connect(this);
 
@@ -1478,6 +1480,8 @@ ssize_t AsyncConnection::_process_connection()
         state = STATE_OPEN;
         memset(&connect_msg, 0, sizeof(connect_msg));
 
+        if (delay_state)
+          assert(delay_state->ready());
         // make sure no pending tick timer
         center->delete_time_event(last_tick_id);
         last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
@@ -2427,6 +2431,8 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
   {
     Mutex::Locker l(delay_lock);
     register_time_events.erase(id);
+    if (stop_dispatch)
+      return ;
     if (delay_queue.empty())
       return ;
     utime_t release = delay_queue.front().first;
@@ -2448,6 +2454,7 @@ void AsyncConnection::DelayedDelivery::do_request(int id)
 }
 
 void AsyncConnection::DelayedDelivery::flush() {
+  stop_dispatch = true;
   EventCenter::submit_to(
       center->get_id(), [this] () mutable {
     Mutex::Locker l(delay_lock);
@@ -2463,6 +2470,7 @@ void AsyncConnection::DelayedDelivery::flush() {
     for (auto i : register_time_events)
       center->delete_time_event(i);
     register_time_events.clear();
+    stop_dispatch = false;
   }, true);
 }
 
index 1a458c873e18bcb116872398581fd2cc32163e47..8bef56c38a05bb4295bf22850a1187df245d2814 100644 (file)
@@ -143,12 +143,14 @@ class AsyncConnection : public Connection {
     EventCenter *center;
     DispatchQueue *dispatch_queue;
     uint64_t conn_id;
+    std::atomic_bool stop_dispatch;
 
    public:
     explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
                              DispatchQueue *q, uint64_t cid)
       : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
-        msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid) { }
+        msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+        stop_dispatch(false) { }
     ~DelayedDelivery() {
       assert(register_time_events.empty());
       assert(delay_queue.empty());
@@ -160,6 +162,7 @@ class AsyncConnection : public Connection {
       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
     }
     void discard() {
+      stop_dispatch = true;
       EventCenter::submit_to(center->get_id(), [this] () mutable {
         Mutex::Locker l(delay_lock);
         while (!delay_queue.empty()) {
@@ -171,8 +174,10 @@ class AsyncConnection : public Connection {
         for (auto i : register_time_events)
           center->delete_time_event(i);
         register_time_events.clear();
+        stop_dispatch = false;
       }, true);
     }
+    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
     void flush();
   } *delay_state;