]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncConnection: add tick timer
authorHaomai Wang <haomai@xsky.com>
Tue, 24 May 2016 17:22:47 +0000 (01:22 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 29 Jun 2016 04:14:04 +0000 (12:14 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h

index 918d36b70d9097896cabfce5434dc02bf619754b..366bdc6f4df52929d8a6087d9643ec96a693615f 100644 (file)
@@ -93,7 +93,7 @@ class C_tick_wakeup : public EventCallback {
  public:
   explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
   void do_request(int fd_or_id) {
-    conn->tick();
+    conn->tick(fd_or_id);
   }
 };
 
@@ -126,7 +126,9 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
     recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
-    recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()),
+    recv_start(0), recv_end(0),
+    last_active(ceph::coarse_mono_clock::now()),
+    inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
     got_bad_auth(false), authorizer(NULL), replacing(false),
     is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
 {
@@ -1304,6 +1306,10 @@ ssize_t AsyncConnection::_process_connection()
         dispatch_queue->queue_connect(this);
         async_msgr->ms_deliver_handle_fast_connect(this);
 
+        // make sure no pending tick timer
+        center->delete_time_event(last_tick_id);
+        last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
         // message may in queue between last _try_send and connection ready
         // write event may already notify and we need to force scheduler again
         write_lock.Lock();
@@ -1471,6 +1477,11 @@ ssize_t AsyncConnection::_process_connection()
         ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
         state = STATE_OPEN;
         memset(&connect_msg, 0, sizeof(connect_msg));
+
+        // make sure no pending tick timer
+        center->delete_time_event(last_tick_id);
+        last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
         write_lock.Lock();
         can_write = WriteStatus::CANWRITE;
         if (is_queued())
@@ -2601,15 +2612,20 @@ void AsyncConnection::wakeup_from(uint64_t id)
   process();
 }
 
-void AsyncConnection::tick()
+void AsyncConnection::tick(uint64_t id)
 {
-  Mutex::Locker l(lock);
   auto now = ceph::coarse_mono_clock::now();
-  auto idle_period = std::chrono::duration_cast<std::chrono::seconds>(now - last_active).count();
-  if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) {
+  ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
+                             << " last_active" << last_active << dendl;
+  assert(last_tick_id == id);
+  Mutex::Locker l(lock);
+  auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
+  if (inactive_timeout_us < (uint64_t)idle_period) {
     ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
-                              << async_msgr->cct->_conf->ms_tcp_read_timeout
-                              << ", mark self fault." << dendl;
+                              << inactive_timeout_us
+                              << " us, mark self fault." << dendl;
     fault();
+  } else if (is_connected()) {
+    last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
   }
 }
index f583face7cd31e2e092b57bb2f0f2be23b2a7432..f9867a572f3362d94afad463b8ec1087241f79da 100644 (file)
@@ -321,6 +321,8 @@ class AsyncConnection : public Connection {
   uint32_t recv_end;
   set<uint64_t> register_time_events; // need to delete it if stop
   ceph::coarse_mono_clock::time_point last_active;
+  uint64_t last_tick_id = 0;
+  const uint64_t inactive_timeout_us;
 
   // Tis section are temp variables used by state transition
 
@@ -369,7 +371,7 @@ class AsyncConnection : public Connection {
   void handle_write();
   void process();
   void wakeup_from(uint64_t id);
-  void tick();
+  void tick(uint64_t id);
   void local_deliver();
   void stop(bool queue_reset) {
     lock.Lock();