RefCountedObject *priv;
int peer_type;
entity_addr_t peer_addr;
- utime_t last_keepalive_ack;
+ utime_t last_keepalive, last_keepalive_ack;
private:
uint64_t features;
public:
rx_buffers.erase(tid);
}
+ utime_t get_last_keepalive() const {
+ return last_keepalive;
+ }
+ void set_last_keepalive(utime_t t) {
+ last_keepalive = t;
+ }
utime_t get_last_keepalive_ack() const {
return last_keepalive_ack;
}
+ void set_last_keepalive_ack(utime_t t) {
+ last_keepalive_ack = t;
+ }
+
};
typedef boost::intrusive_ptr<Connection> ConnectionRef;
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
+ set_last_keepalive(ceph_clock_now(NULL));
} else if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
state = STATE_OPEN_KEEPALIVE2;
} else if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
utime_t kp_t = utime_t(*t);
write_lock.Lock();
_send_keepalive_or_ack(true, &kp_t);
- write_lock.Unlock();
+ write_lock.Unlock();
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
+ set_last_keepalive(ceph_clock_now(NULL));
state = STATE_OPEN;
break;
}
}
t = (ceph_timespec*)state_buffer;
- last_keepalive_ack = utime_t(*t);
+ set_last_keepalive_ack(utime_t(*t));
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
state = STATE_OPEN;
break;
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
ldout(msgr->cct,20) << "reader got KEEPALIVE" << dendl;
pipe_lock.Lock();
+ connection_state->set_last_keepalive(ceph_clock_now(NULL));
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
keepalive_ack_stamp = utime_t(t);
ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
<< dendl;
+ connection_state->set_last_keepalive(ceph_clock_now(NULL));
cond.Signal();
}
continue;
ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
fault(true);
} else {
- connection_state->last_keepalive_ack = utime_t(t);
+ connection_state->set_last_keepalive_ack(utime_t(t));
}
continue;
}