From: Haomai Wang Date: Tue, 24 May 2016 17:22:47 +0000 (+0800) Subject: AsyncConnection: add tick timer X-Git-Tag: ses5-milestone5~574^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa4767cc76357fac12c1dd46a112790b302d1177;p=ceph.git AsyncConnection: add tick timer Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 918d36b70d90..366bdc6f4df5 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -93,7 +93,7 @@ class C_tick_wakeup : public EventCallback { public: explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {} void do_request(int fd_or_id) { - conn->tick(); + conn->tick(fd_or_id); } }; @@ -126,7 +126,9 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), - recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()), + recv_start(0), recv_end(0), + last_active(ceph::coarse_mono_clock::now()), + inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) { @@ -1304,6 +1306,10 @@ ssize_t AsyncConnection::_process_connection() dispatch_queue->queue_connect(this); async_msgr->ms_deliver_handle_fast_connect(this); + // make sure no pending tick timer + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again write_lock.Lock(); @@ -1471,6 +1477,11 @@ ssize_t AsyncConnection::_process_connection() ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); + + // make sure no pending tick timer + center->delete_time_event(last_tick_id); + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); + write_lock.Lock(); can_write = WriteStatus::CANWRITE; if (is_queued()) @@ -2601,15 +2612,20 @@ void AsyncConnection::wakeup_from(uint64_t id) process(); } -void AsyncConnection::tick() +void AsyncConnection::tick(uint64_t id) { - Mutex::Locker l(lock); auto now = ceph::coarse_mono_clock::now(); - auto idle_period = std::chrono::duration_cast(now - last_active).count(); - if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) { + ldout(async_msgr->cct, 20) << __func__ << " last_id=" << last_tick_id + << " last_active" << last_active << dendl; + assert(last_tick_id == id); + Mutex::Locker l(lock); + auto idle_period = std::chrono::duration_cast(now - last_active).count(); + if (inactive_timeout_us < (uint64_t)idle_period) { ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than " - << async_msgr->cct->_conf->ms_tcp_read_timeout - << ", mark self fault." << dendl; + << inactive_timeout_us + << " us, mark self fault." << dendl; fault(); + } else if (is_connected()) { + last_tick_id = center->create_time_event(inactive_timeout_us, tick_handler); } } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index f583face7cd3..f9867a572f33 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -321,6 +321,8 @@ class AsyncConnection : public Connection { uint32_t recv_end; set register_time_events; // need to delete it if stop ceph::coarse_mono_clock::time_point last_active; + uint64_t last_tick_id = 0; + const uint64_t inactive_timeout_us; // Tis section are temp variables used by state transition @@ -369,7 +371,7 @@ class AsyncConnection : public Connection { void handle_write(); void process(); void wakeup_from(uint64_t id); - void tick(); + void tick(uint64_t id); void local_deliver(); void stop(bool queue_reset) { lock.Lock();