From dbc4f60eb8ec813985886b38b6a74d9b4c70aef8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 31 Aug 2009 12:59:01 -0700 Subject: [PATCH] msgr: keepalive --- src/msg/Messenger.h | 1 + src/msg/SimpleMessenger.cc | 88 +++++++++++++++++++++++++++++++++----- src/msg/SimpleMessenger.h | 18 +++++++- 3 files changed, 96 insertions(+), 11 deletions(-) diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 6059f197be187..8d28ebc5fdc69 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -105,6 +105,7 @@ protected: virtual int lazy_send_message(Message *m, entity_inst_t dest) { return send_message(m, dest); } + virtual int send_keepalive(entity_inst_t dest) = 0; virtual void mark_down(entity_addr_t a) {} diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 268c6a6b46e45..26850796f5865 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -434,6 +434,12 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) return 0; } +int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest) +{ + rank->send_keepalive(dest); + return 0; +} + void SimpleMessenger::Endpoint::_set_myaddr(entity_addr_t a) @@ -1380,7 +1386,7 @@ void SimpleMessenger::Pipe::writer() dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl; // standby? - if (!q.empty() && state == STATE_STANDBY && !policy.server) + if (is_queued() && state == STATE_STANDBY && !policy.server) state = STATE_CONNECTING; // connect? @@ -1405,7 +1411,20 @@ void SimpleMessenger::Pipe::writer() } if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && - (!q.empty() || in_seq > in_seq_acked)) { + (is_queued() || in_seq > in_seq_acked)) { + + // keepalive? + if (keepalive) { + lock.Unlock(); + int rc = write_keepalive(); + lock.Lock(); + if (rc < 0) { + dout(2) << "writer couldn't write keepalive, " << strerror(errno) << dendl; + fault(); + continue; + } + keepalive = false; + } // send ack? if (in_seq > in_seq_acked) { @@ -1432,14 +1451,7 @@ void SimpleMessenger::Pipe::writer() dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; // encode and copy out of *m - if (m->empty_payload()) - m->encode_payload(); - m->calc_front_crc(); - - if (!g_conf.ms_nocrc) - m->calc_data_crc(); - else - m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; + m->encode(); dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(m); @@ -1673,6 +1685,25 @@ int SimpleMessenger::Pipe::write_ack(unsigned seq) return 0; } +int SimpleMessenger::Pipe::write_keepalive() +{ + dout(10) << "write_keepalive" << dendl; + + char c = CEPH_MSGR_TAG_KEEPALIVE; + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &c; + msgvec[0].iov_len = 1; + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + + if (do_sendmsg(sd, &msg, 1) < 0) + return -1; + return 0; +} + int SimpleMessenger::Pipe::write_message(Message *m) { @@ -2082,6 +2113,43 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lock.Unlock(); } +void SimpleMessenger::send_keepalive(const entity_inst_t& dest) +{ + const entity_addr_t dest_addr = dest.addr; + entity_addr_t dest_proc_addr = dest_addr; + lock.Lock(); + { + // local? + if (!rank_addr.is_local_to(dest_addr)) { + // remote. + Pipe *pipe = 0; + if (rank_pipe.count( dest_proc_addr )) { + // connected? + pipe = rank_pipe[ dest_proc_addr ]; + pipe->lock.Lock(); + if (pipe->state == Pipe::STATE_CLOSED) { + dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + pipe->unregister_pipe(); + pipe->lock.Unlock(); + pipe = 0; + } else { + dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl; + pipe->_send_keepalive(); + pipe->lock.Unlock(); + } + } + if (!pipe) { + dout(20) << "send_keepalive remote, " << dest_addr << ", new pipe." << dendl; + // not connected. + pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type())); + pipe->send_keepalive(); + } + } + } + + lock.Unlock(); +} + diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 3d8c6e98eb343..d16b48d82d29d 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -146,6 +146,7 @@ private: map > q; // priority queue list sent; Cond cond; + bool keepalive; __u32 connect_seq, peer_global_seq; __u32 out_seq; @@ -160,6 +161,7 @@ private: int write_message(Message *m); int do_sendmsg(int sd, struct msghdr *msg, int len); int write_ack(unsigned s); + int write_keepalive(); void fault(bool silent=false, bool reader=false); void fail(); @@ -192,6 +194,7 @@ private: lock("SimpleMessenger::Pipe::lock"), state(st), reader_running(false), writer_running(false), + keepalive(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { } @@ -227,6 +230,8 @@ private: __u32 get_out_seq() { return out_seq; } + bool is_queued() { return !q.empty() || keepalive; } + void register_pipe(); void unregister_pipe(); void join() { @@ -245,6 +250,15 @@ private: q[m->get_priority()].push_back(m); cond.Signal(); } + void send_keepalive() { + lock.Lock(); + _send_keepalive(); + lock.Unlock(); + } + void _send_keepalive() { + keepalive = true; + cond.Signal(); + } Message *_get_next_outgoing() { Message *m = 0; while (!m && !q.empty()) { @@ -374,7 +388,8 @@ private: int send_message(Message *m, entity_inst_t dest); int forward_message(Message *m, entity_inst_t dest); int lazy_send_message(Message *m, entity_inst_t dest); - + int send_keepalive(entity_inst_t dest); + void mark_down(entity_addr_t a); void mark_up(entity_name_t a, entity_addr_t& i); }; @@ -455,6 +470,7 @@ public: void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false); void prepare_dest(const entity_inst_t& inst); + void send_keepalive(const entity_inst_t& addr); // create a new messenger Endpoint *new_entity(entity_name_t addr); -- 2.39.5