writer_running(false),
in_q(&(r->dispatch_queue)),
send_keepalive(false),
+ send_keepalive_ack(false),
close_on_empty(false),
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0) {
pipe_lock.Lock();
continue;
}
+ if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
+ ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
+ ceph_timespec t;
+ int rc = tcp_read((char*)&t, sizeof(t));
+ pipe_lock.Lock();
+ if (rc < 0) {
+ ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
+ << cpp_strerror(errno) << dendl;
+ fault(true);
+ } else {
+ send_keepalive_ack = true;
+ keepalive_ack_stamp = utime_t(t);
+ ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
+ << dendl;
+ cond.Signal();
+ }
+ continue;
+ }
+ if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+ ldout(msgr->cct,20) << "reader got KEEPALIVE_ACK" << dendl;
+ struct ceph_timespec t;
+ int rc = tcp_read((char*)&t, sizeof(t));
+ pipe_lock.Lock();
+ if (rc < 0) {
+ ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
+ fault(true);
+ } else {
+ connection_state->last_keepalive_ack = utime_t(t);
+ }
+ continue;
+ }
// open ...
if (tag == CEPH_MSGR_TAG_ACK) {
// keepalive?
if (send_keepalive) {
- pipe_lock.Unlock();
- int rc = write_keepalive();
+ int rc;
+ if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ pipe_lock.Unlock();
+ rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
+ ceph_clock_now(msgr->cct));
+ } else {
+ pipe_lock.Unlock();
+ rc = write_keepalive();
+ }
pipe_lock.Lock();
if (rc < 0) {
- ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+ ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
+ << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault();
continue;
}
send_keepalive = false;
}
+ if (send_keepalive_ack) {
+ utime_t t = keepalive_ack_stamp;
+ pipe_lock.Unlock();
+ int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
+ pipe_lock.Lock();
+ if (rc < 0) {
+ ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+ fault();
+ continue;
+ }
+ send_keepalive_ack = false;
+ }
// send ack?
if (in_seq > in_seq_acked) {
return 0;
}
+int Pipe::write_keepalive2(char tag, const utime_t& t)
+{
+ ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
+ struct ceph_timespec ts;
+ t.encode_timeval(&ts);
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ struct iovec msgvec[2];
+ msgvec[0].iov_base = &tag;
+ msgvec[0].iov_len = 1;
+ msgvec[1].iov_base = &ts;
+ msgvec[1].iov_len = sizeof(ts);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 2;
+
+ if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
+ return -1;
+ return 0;
+}
+
int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist)
{
list<Message*> sent;
Cond cond;
bool send_keepalive;
+ bool send_keepalive_ack;
+ utime_t keepalive_ack_stamp;
bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
bool close_on_empty;
int do_sendmsg(struct msghdr *msg, int len, bool more=false);
int write_ack(uint64_t s);
int write_keepalive();
+ int write_keepalive2(char tag, const utime_t &t);
void fault(bool reader=false);
__u32 get_out_seq() { return out_seq; }
- bool is_queued() { return !out_q.empty() || send_keepalive; }
+ bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
entity_addr_t& get_peer_addr() { return peer_addr; }