public:
explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
void do_request(int fd_or_id) {
- conn->tick();
+ conn->tick(fd_or_id);
}
};
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
- recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()),
+ recv_start(0), recv_end(0),
+ last_active(ceph::coarse_mono_clock::now()),
+ inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
{
dispatch_queue->queue_connect(this);
async_msgr->ms_deliver_handle_fast_connect(this);
+ // make sure no pending tick timer
+ center->delete_time_event(last_tick_id);
+ last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
// message may in queue between last _try_send and connection ready
// write event may already notify and we need to force scheduler again
write_lock.Lock();
ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
state = STATE_OPEN;
memset(&connect_msg, 0, sizeof(connect_msg));
+
+ // make sure no pending tick timer
+ center->delete_time_event(last_tick_id);
+ last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+
write_lock.Lock();
can_write = WriteStatus::CANWRITE;
if (is_queued())
process();
}
-void AsyncConnection::tick()
+void AsyncConnection::tick(uint64_t id)
{
- Mutex::Locker l(lock);
auto now = ceph::coarse_mono_clock::now();
- auto idle_period = std::chrono::duration_cast<std::chrono::seconds>(now - last_active).count();
- if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) {
+ ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id
+ << " last_active" << last_active << dendl;
+ assert(last_tick_id == id);
+ Mutex::Locker l(lock);
+ auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
+ if (inactive_timeout_us < (uint64_t)idle_period) {
ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
- << async_msgr->cct->_conf->ms_tcp_read_timeout
- << ", mark self fault." << dendl;
+ << inactive_timeout_us
+ << " us, mark self fault." << dendl;
fault();
+ } else if (is_connected()) {
+ last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
}
}