From: Haomai Wang Date: Fri, 8 Jan 2016 03:47:23 +0000 (+0800) Subject: AsyncMessenger: cleanup dead connection if toomuch or too long X-Git-Tag: v10.0.3~29^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8c589340ce63792232f1bcec3aeb7ef134218591;p=ceph.git AsyncMessenger: cleanup dead connection if toomuch or too long Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index d1c624523443..35f688bd3461 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -378,15 +378,16 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, processor(this, cct, _nonce), lock("AsyncMessenger::lock"), nonce(_nonce), need_addr(true), listen_sd(-1), did_bind(false), - global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), + global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), reap_time_fd(0), cluster_protocol(0), stopped(true) { ceph_spin_init(&global_seq_lock); cct->lookup_or_create_singleton_object(pool, WorkerPool::name); - Worker *w = pool->get_worker(); - local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter()); + local_worker = pool->get_worker(); + local_connection = new AsyncConnection(cct, this, &local_worker->center, local_worker->get_perf_counter()); local_features = features; init_local_connection(); + reap_handler = new C_handle_reap(this); } /** @@ -395,6 +396,7 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, */ AsyncMessenger::~AsyncMessenger() { + delete reap_handler; assert(!did_bind); // either we didn't bind or we shut down the Processor local_connection->mark_down(); } @@ -737,13 +739,14 @@ void AsyncMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) lock.Unlock(); } -int AsyncMessenger::reap_dead(int max) +int AsyncMessenger::reap_dead(bool is_timer) { int num; + Mutex::Locker l(lock); Mutex::Locker l(deleted_lock); - while (!deleted_conns.empty() && num < max) { + while (!deleted_conns.empty()) { set::iterator it = deleted_conns.begin(); AsyncConnectionRef p = *it; ldout(cct, 5) << __func__ << " delete " << p << dendl; @@ -752,5 +755,8 @@ int AsyncMessenger::reap_dead(int max) ++num; } + if (is_timer) + reap_time_fd = 0; + return num; } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 5ff1aba98a8a..7fb702fec35f 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -321,11 +321,27 @@ private: int _send_message(Message *m, const entity_inst_t& dest); private: + static const uint64_t ReapDeadConnectionThreshold = 10; + static const uint64_t ReapDeadConnectionMaxPeriod = 30*1000*1000; + WorkerPool *pool; Processor processor; friend class Processor; + class C_handle_reap : public EventCallback { + AsyncMessenger *msgr; + + public: + C_handle_reap(AsyncMessenger *m): msgr(m) {} + void do_request(int id) { + // judge whether is a time event + msgr->reap_dead(id ? id : 0); + } + }; + // the worker run messenger's cron jobs + Worker *local_worker; + /// overall lock used for AsyncMessenger data structures Mutex lock; // AsyncMessenger stuff @@ -382,6 +398,9 @@ private: Mutex deleted_lock; set deleted_conns; + EventCallbackRef reap_handler; + uint64_t reap_time_fd; + /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; @@ -509,6 +528,13 @@ public: void unregister_conn(AsyncConnectionRef conn) { Mutex::Locker l(deleted_lock); deleted_conns.insert(conn); + + if (deleted_conns.size() >= ReapDeadConnectionThreshold) { + local_worker->dispatch_event_external(reap_handler); + } else if (!reap_time_fd) { + reap_time_fd = local_worker->create_time_event( + ReapDeadConnectionMaxPeriod, reap_handler); + } } /** @@ -518,7 +544,7 @@ public: * * See "deleted_conns" */ - int reap_dead(int max); + int reap_dead(bool is_timer); /** * @} // AsyncMessenger Internals diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index 7e0a195bbb4c..2575130305b0 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -133,7 +133,7 @@ class EventCenter { file_lock("AsyncMessenger::file_lock"), time_lock("AsyncMessenger::time_lock"), file_events(NULL), - driver(NULL), time_event_next_id(0), + driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) { last_time = time(NULL); }