processor(this, cct, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), need_addr(true), listen_sd(-1), did_bind(false),
- global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"), reap_time_fd(0),
+ global_seq(0), deleted_lock("AsyncMessenger::deleted_lock"),
cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
*/
AsyncMessenger::~AsyncMessenger()
{
- if (reap_time_fd)
- local_worker->center.delete_time_event(reap_time_fd);
delete reap_handler;
assert(!did_bind); // either we didn't bind or we shut down the Processor
local_connection->mark_down();
lock.Unlock();
}
-int AsyncMessenger::reap_dead(bool is_timer)
+int AsyncMessenger::reap_dead()
{
int num = 0;
++num;
}
- if (is_timer)
- reap_time_fd = 0;
-
return num;
}
int _send_message(Message *m, const entity_inst_t& dest);
private:
- static const uint64_t ReapDeadConnectionThreshold = 10;
- static const uint64_t ReapDeadConnectionMaxPeriod = 30*1000*1000;
+ static const uint64_t ReapDeadConnectionThreshold = 5;
WorkerPool *pool;
C_handle_reap(AsyncMessenger *m): msgr(m) {}
void do_request(int id) {
// judge whether is a time event
- msgr->reap_dead(id ? id : 0);
+ msgr->reap_dead();
}
};
// the worker run messenger's cron jobs
set<AsyncConnectionRef> deleted_conns;
EventCallbackRef reap_handler;
- uint64_t reap_time_fd;
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
local_worker->center.dispatch_event_external(reap_handler);
- } else if (!reap_time_fd) {
- reap_time_fd = local_worker->center.create_time_event(
- ReapDeadConnectionMaxPeriod, reap_handler);
}
}
*
* See "deleted_conns"
*/
- int reap_dead(bool is_timer);
+ int reap_dead();
/**
* @} // AsyncMessenger Internals
if (id >= time_event_next_id)
return ;
-
for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
it != time_events.end(); ++it) {
for (list<TimeEvent>::iterator j = it->second.begin();