}
};
+class C_tick_wakeup : public EventCallback {
+ AsyncConnectionRef conn;
+
+ public:
+ explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
+ void do_request(int fd_or_id) {
+ conn->tick();
+ }
+};
+
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
dispatch_queue(q), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
- recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
+ recv_start(0), recv_end(0), last_active(ceph::coarse_mono_clock::now()),
+ got_bad_auth(false), authorizer(NULL), replacing(false),
is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c)
{
read_handler = new C_handle_read(this);
write_handler = new C_handle_write(this);
wakeup_handler = new C_time_wakeup(this);
+ tick_handler = new C_tick_wakeup(this);
memset(msgvec, 0, sizeof(msgvec));
// double recv_max_prefetch see "read_until"
recv_buf = new char[2*recv_max_prefetch];
int prev_state = state;
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
+ last_active = ceph::coarse_mono_clock::now();
do {
ldout(async_msgr->cct, 20) << __func__ << " prev state is " << get_state_name(prev_state) << dendl;
prev_state = state;
lock.Unlock();
process();
}
+
+void AsyncConnection::tick()
+{
+ Mutex::Locker l(lock);
+ auto now = ceph::coarse_mono_clock::now();
+ auto idle_period = std::chrono::duration_cast<std::chrono::seconds>(now - last_active).count();
+ if (async_msgr->cct->_conf->ms_tcp_read_timeout > (uint64_t)idle_period) {
+ ldout(async_msgr->cct, 1) << __func__ << " idle(" << idle_period << ") more than "
+ << async_msgr->cct->_conf->ms_tcp_read_timeout
+ << ", mark self fault." << dendl;
+ fault();
+ }
+}
using namespace std;
#include "auth/AuthSessionHandler.h"
+#include "common/ceph_time.h"
#include "common/Mutex.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
EventCallbackRef read_handler;
EventCallbackRef write_handler;
EventCallbackRef wakeup_handler;
+ EventCallbackRef tick_handler;
struct iovec msgvec[ASYNC_IOV_MAX];
char *recv_buf;
uint32_t recv_max_prefetch;
uint32_t recv_start;
uint32_t recv_end;
set<uint64_t> register_time_events; // need to delete it if stop
+ ceph::coarse_mono_clock::time_point last_active;
// Tis section are temp variables used by state transition
void handle_write();
void process();
void wakeup_from(uint64_t id);
+ void tick();
void local_deliver();
void stop(bool queue_reset) {
lock.Lock();
delete read_handler;
delete write_handler;
delete wakeup_handler;
+ delete tick_handler;
if (delay_state) {
delete delay_state;
delay_state = NULL;