processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), listen_sd(-1), did_bind(false),
- global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
+ global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), reap_time_fd(0),
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
cct->lookup_or_create_singleton_object<WorkerPool>(pool, WorkerPool::name);
- Worker *w = pool->get_worker();
- local_connection = new AsyncConnection(cct, this, &w->center, w->get_perf_counter());
+ local_worker = pool->get_worker();
+ local_connection = new AsyncConnection(cct, this, &local_worker->center, local_worker->get_perf_counter());
local_features = features;
init_local_connection();
+ reap_handler = new C_handle_reap(this);
}
/**
*/
AsyncMessenger::~AsyncMessenger()
{
+ delete reap_handler;
assert(!did_bind); // either we didn't bind or we shut down the Processor
local_connection->mark_down();
}
lock.Unlock();
}
-int AsyncMessenger::reap_dead(int max)
+int AsyncMessenger::reap_dead(bool is_timer)
{
int num;
+
Mutex::Locker l(lock);
Mutex::Locker l(deleted_lock);
- while (!deleted_conns.empty() && num < max) {
+ while (!deleted_conns.empty()) {
set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
++num;
}
+ if (is_timer)
+ reap_time_fd = 0;
+
return num;
}
int _send_message(Message *m, const entity_inst_t& dest);
private:
+ static const uint64_t ReapDeadConnectionThreshold = 10;
+ static const uint64_t ReapDeadConnectionMaxPeriod = 30*1000*1000;
+
WorkerPool *pool;
Processor processor;
friend class Processor;
+ class C_handle_reap : public EventCallback {
+ AsyncMessenger *msgr;
+
+ public:
+ C_handle_reap(AsyncMessenger *m): msgr(m) {}
+ void do_request(int id) {
+ // judge whether is a time event
+ msgr->reap_dead(id ? id : 0);
+ }
+ };
+ // the worker run messenger's cron jobs
+ Worker *local_worker;
+
/// overall lock used for AsyncMessenger data structures
Mutex lock;
// AsyncMessenger stuff
Mutex deleted_lock;
set<AsyncConnectionRef> deleted_conns;
+ EventCallbackRef reap_handler;
+ uint64_t reap_time_fd;
+
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
void unregister_conn(AsyncConnectionRef conn) {
Mutex::Locker l(deleted_lock);
deleted_conns.insert(conn);
+
+ if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
+ local_worker->dispatch_event_external(reap_handler);
+ } else if (!reap_time_fd) {
+ reap_time_fd = local_worker->create_time_event(
+ ReapDeadConnectionMaxPeriod, reap_handler);
+ }
}
/**
*
* See "deleted_conns"
*/
- int reap_dead(int max);
+ int reap_dead(bool is_timer);
/**
* @} // AsyncMessenger Internals
file_lock("AsyncMessenger::file_lock"),
time_lock("AsyncMessenger::time_lock"),
file_events(NULL),
- driver(NULL), time_event_next_id(0),
+ driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c), owner(0), already_wakeup(0) {
last_time = time(NULL);
}