recv_max_prefetch(std::max<int64_t>(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0),
last_active(ceph::coarse_mono_clock::now()),
+ connect_timeout_us(cct->_conf->ms_connection_ready_timeout*1000*1000),
inactive_timeout_us(cct->_conf->ms_connection_idle_timeout*1000*1000),
msgr2(m2), state_offset(0),
worker(w), center(&w->center),read_buffer(nullptr)
case STATE_CONNECTING: {
ceph_assert(!policy.server);
+ // clear timer (if any) since we are connecting/re-connecting
+ if (last_tick_id) {
+ center->delete_time_event(last_tick_id);
+ last_tick_id = 0;
+ }
+
if (cs) {
center->delete_file_event(cs.fd(), EVENT_READABLE | EVENT_WRITABLE);
cs.close();
ldout(async_msgr->cct, 10)
<< __func__ << " connect successfully, ready to send banner" << dendl;
state = STATE_CONNECTION_ESTABLISHED;
+ ceph_assert(last_tick_id == 0);
+ // exclude TCP nonblock connect time
+ last_connect_started = ceph::coarse_mono_clock::now();
+ last_tick_id = center->create_time_event(
+ connect_timeout_us, tick_handler);
break;
}
<< " last_active=" << last_active << dendl;
std::lock_guard<std::mutex> l(lock);
last_tick_id = 0;
- auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>(now - last_active).count();
- if (inactive_timeout_us < (uint64_t)idle_period) {
- ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
- << inactive_timeout_us
- << " us, mark self fault." << dendl;
- protocol->fault();
- } else if (is_connected()) {
- last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+ if (!is_connected()) {
+ if (connect_timeout_us <=
+ (uint64_t)std::chrono::duration_cast<std::chrono::microseconds>
+ (now - last_connect_started).count()) {
+ ldout(async_msgr->cct, 0) << __func__ << " see no progress in more than "
+ << connect_timeout_us
+ << " us during connecting, fault."
+ << dendl;
+ protocol->fault();
+ } else {
+ last_tick_id = center->create_time_event(connect_timeout_us, tick_handler);
+ }
+ } else {
+ auto idle_period = std::chrono::duration_cast<std::chrono::microseconds>
+ (now - last_active).count();
+ if (inactive_timeout_us < (uint64_t)idle_period) {
+ ldout(async_msgr->cct, 1) << __func__ << " idle (" << idle_period
+ << ") for more than " << inactive_timeout_us
+ << " us, fault."
+ << dendl;
+ protocol->fault();
+ } else {
+ last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler);
+ }
}
}
if (exproto->state == CLOSED) return;
ceph_assert(exproto->state == NONE);
+ // we have called shutdown_socket above
+ ceph_assert(existing->last_tick_id == 0);
+ // restart timer since we are going to re-build connection
+ existing->last_connect_started = ceph::coarse_mono_clock::now();
+ existing->last_tick_id = existing->center->create_time_event(
+ existing->connect_timeout_us, existing->tick_handler);
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
exproto->state = ACCEPTING;
ceph_assert(exproto->state == NONE);
exproto->state = SESSION_ACCEPTING;
+ // we have called shutdown_socket above
+ ceph_assert(existing->last_tick_id == 0);
+ // restart timer since we are going to re-build connection
+ existing->last_connect_started = ceph::coarse_mono_clock::now();
+ existing->last_tick_id = existing->center->create_time_event(
+ existing->connect_timeout_us, existing->tick_handler);
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);