]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: cleanup dead connection if toomuch or too long
authorHaomai Wang <haomai@xsky.com>
Fri, 8 Jan 2016 03:47:23 +0000 (11:47 +0800)
committerHaomai Wang <haomai@xsky.com>
Wed, 13 Jan 2016 15:24:10 +0000 (23:24 +0800)
Signed-off-by: Haomai Wang <haomai@xsky.com>
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.h

index d1c6245234437ab101939fe733e5d1e2523007da..35f688bd34615e4b0fa9ab0e0c8ede7bf398003c 100644 (file)
@@ -378,15 +378,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
     processor(this, cct, _nonce),
     lock("AsyncMessenger::lock"),
     nonce(_nonce), need_addr(true), listen_sd(-1), did_bind(false),
-    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
+    global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), reap_time_fd(0),
     cluster_protocol(0), stopped(true)
 {
   ceph_spin_init(&global_seq_lock);
   cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
-  Worker *w = pool->get_worker();
-  local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
+  local_worker = pool->get_worker();
+  local_connection = new AsyncConnection(cct, this, &local_worker->center, local_worker->get_perf_counter());
   local_features = features;
   init_local_connection();
+  reap_handler = new C_handle_reap(this);
 }
 
 /**
@@ -395,6 +396,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
  */
 AsyncMessenger::~AsyncMessenger()
 {
+  delete reap_handler;
   assert(!did_bind); // either we didn't bind or we shut down the Processor
   local_connection->mark_down();
 }
@@ -737,13 +739,14 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
   lock.Unlock();
 }
 
-int AsyncMessenger::reap_dead(int max)
+int AsyncMessenger::reap_dead(bool is_timer)
 {
   int num;
+
   Mutex::Locker l(lock);
   Mutex::Locker l(deleted_lock);
 
-  while (!deleted_conns.empty() && num < max) {
+  while (!deleted_conns.empty()) {
     set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
     AsyncConnectionRef p = *it;
     ldout(cct, 5) << __func__ << " delete " << p << dendl;
@@ -752,5 +755,8 @@ int AsyncMessenger::reap_dead(int max)
     ++num;
   }
 
+  if (is_timer)
+    reap_time_fd = 0;
+
   return num;
 }
index 5ff1aba98a8a303c4fe18f648a4ce91c502e58f2..7fb702fec35fda12ca32b897adcae2efa95a8483 100644 (file)
@@ -321,11 +321,27 @@ private:
   int _send_message(Message *m, const entity_inst_t& dest);
 
  private:
+  static const uint64_t ReapDeadConnectionThreshold = 10;
+  static const uint64_t ReapDeadConnectionMaxPeriod = 30*1000*1000;
+
   WorkerPool *pool;
 
   Processor processor;
   friend class Processor;
 
+  class C_handle_reap : public EventCallback {
+    AsyncMessenger *msgr;
+
+   public:
+    C_handle_reap(AsyncMessenger *m): msgr(m) {}
+    void do_request(int id) {
+      // judge whether is a time event
+      msgr->reap_dead(id ? id : 0);
+    }
+  };
+  // the worker run messenger's cron jobs
+  Worker *local_worker;
+
   /// overall lock used for AsyncMessenger data structures
   Mutex lock;
   // AsyncMessenger stuff
@@ -382,6 +398,9 @@ private:
   Mutex deleted_lock;
   set<AsyncConnectionRef> deleted_conns;
 
+  EventCallbackRef reap_handler;
+  uint64_t reap_time_fd;
+
   /// internal cluster protocol version, if any, for talking to entities of the same type.
   int cluster_protocol;
 
@@ -509,6 +528,13 @@ public:
   void unregister_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(deleted_lock);
     deleted_conns.insert(conn);
+
+    if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
+      local_worker->dispatch_event_external(reap_handler);
+    } else if (!reap_time_fd) {
+      reap_time_fd = local_worker->create_time_event(
+          ReapDeadConnectionMaxPeriod, reap_handler);
+    }
   }
 
   /**
@@ -518,7 +544,7 @@ public:
    *
    * See "deleted_conns"
    */
-  int reap_dead(int max);
+  int reap_dead(bool is_timer);
 
   /**
    * @} // AsyncMessenger Internals
index 7e0a195bbb4c72a59d9435567da74ccda8ff03eb..2575130305b03d74492e5d751e929644278f0e9c 100644 (file)
@@ -133,7 +133,7 @@ class EventCenter {
     file_lock("AsyncMessenger::file_lock"),
     time_lock("AsyncMessenger::time_lock"),
     file_events(NULL),
-    driver(NULL), time_event_next_id(0),
+    driver(NULL), time_event_next_id(1),
     notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
     last_time = time(NULL);
   }