return 0;
}
+int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest)
+{
+ rank->send_keepalive(dest);
+ return 0;
+}
+
void SimpleMessenger::Endpoint::_set_myaddr(entity_addr_t a)
dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
// standby?
- if (!q.empty() && state == STATE_STANDBY && !policy.server)
+ if (is_queued() && state == STATE_STANDBY && !policy.server)
state = STATE_CONNECTING;
// connect?
}
if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
- (!q.empty() || in_seq > in_seq_acked)) {
+ (is_queued() || in_seq > in_seq_acked)) {
+
+ // keepalive?
+ if (keepalive) {
+ lock.Unlock();
+ int rc = write_keepalive();
+ lock.Lock();
+ if (rc < 0) {
+ dout(2) << "writer couldn't write keepalive, " << strerror(errno) << dendl;
+ fault();
+ continue;
+ }
+ keepalive = false;
+ }
// send ack?
if (in_seq > in_seq_acked) {
dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
// encode and copy out of *m
- if (m->empty_payload())
- m->encode_payload();
- m->calc_front_crc();
-
- if (!g_conf.ms_nocrc)
- m->calc_data_crc();
- else
- m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+ m->encode();
dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
int rc = write_message(m);
return 0;
}
+int SimpleMessenger::Pipe::write_keepalive()
+{
+ dout(10) << "write_keepalive" << dendl;
+
+ char c = CEPH_MSGR_TAG_KEEPALIVE;
+
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ struct iovec msgvec[2];
+ msgvec[0].iov_base = &c;
+ msgvec[0].iov_len = 1;
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+
+ if (do_sendmsg(sd, &msg, 1) < 0)
+ return -1;
+ return 0;
+}
+
int SimpleMessenger::Pipe::write_message(Message *m)
{
lock.Unlock();
}
+void SimpleMessenger::send_keepalive(const entity_inst_t& dest)
+{
+ const entity_addr_t dest_addr = dest.addr;
+ entity_addr_t dest_proc_addr = dest_addr;
+ lock.Lock();
+ {
+ // local?
+ if (!rank_addr.is_local_to(dest_addr)) {
+ // remote.
+ Pipe *pipe = 0;
+ if (rank_pipe.count( dest_proc_addr )) {
+ // connected?
+ pipe = rank_pipe[ dest_proc_addr ];
+ pipe->lock.Lock();
+ if (pipe->state == Pipe::STATE_CLOSED) {
+ dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+ pipe->unregister_pipe();
+ pipe->lock.Unlock();
+ pipe = 0;
+ } else {
+ dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
+ pipe->_send_keepalive();
+ pipe->lock.Unlock();
+ }
+ }
+ if (!pipe) {
+ dout(20) << "send_keepalive remote, " << dest_addr << ", new pipe." << dendl;
+ // not connected.
+ pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type()));
+ pipe->send_keepalive();
+ }
+ }
+ }
+
+ lock.Unlock();
+}
+
map<int, list<Message*> > q; // priority queue
list<Message*> sent;
Cond cond;
+ bool keepalive;
__u32 connect_seq, peer_global_seq;
__u32 out_seq;
int write_message(Message *m);
int do_sendmsg(int sd, struct msghdr *msg, int len);
int write_ack(unsigned s);
+ int write_keepalive();
void fault(bool silent=false, bool reader=false);
void fail();
lock("SimpleMessenger::Pipe::lock"),
state(st),
reader_running(false), writer_running(false),
+ keepalive(false),
connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0),
reader_thread(this), writer_thread(this) { }
__u32 get_out_seq() { return out_seq; }
+ bool is_queued() { return !q.empty() || keepalive; }
+
void register_pipe();
void unregister_pipe();
void join() {
q[m->get_priority()].push_back(m);
cond.Signal();
}
+ void send_keepalive() {
+ lock.Lock();
+ _send_keepalive();
+ lock.Unlock();
+ }
+ void _send_keepalive() {
+ keepalive = true;
+ cond.Signal();
+ }
Message *_get_next_outgoing() {
Message *m = 0;
while (!m && !q.empty()) {
int send_message(Message *m, entity_inst_t dest);
int forward_message(Message *m, entity_inst_t dest);
int lazy_send_message(Message *m, entity_inst_t dest);
-
+ int send_keepalive(entity_inst_t dest);
+
void mark_down(entity_addr_t a);
void mark_up(entity_name_t a, entity_addr_t& i);
};
void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false);
void prepare_dest(const entity_inst_t& inst);
+ void send_keepalive(const entity_inst_t& addr);
// create a new messenger
Endpoint *new_entity(entity_name_t addr);