git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: keepalive
author Sage Weil <sage@newdream.net>
Mon, 31 Aug 2009 19:59:01 +0000 (12:59 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 31 Aug 2009 19:59:01 +0000 (12:59 -0700)
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 6059f197be187c0615b6dfb9a997f00d4ed50e56..8d28ebc5fdc699e3623a989d3fd90f15204f0747 100644 (file)
@@ -105,6 +105,7 @@ protected:
   virtual int lazy_send_message(Message *m, entity_inst_t dest) {
     return send_message(m, dest);
   }
+  virtual int send_keepalive(entity_inst_t dest) = 0;  // pure virtual: every concrete messenger must implement keepalive delivery toward dest
 
   virtual void mark_down(entity_addr_t a) {}
 
index 268c6a6b46e457abbd97e15c2831ce878be9044b..26850796f58654ab26b825dcdbfd114cd4cf2b79 100644 (file)
@@ -434,6 +434,12 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
   return 0;
 }
 
+int SimpleMessenger::Endpoint::send_keepalive(entity_inst_t dest)  // Endpoint-level entry point: request a keepalive toward dest
+{
+  rank->send_keepalive(dest);  // delegate to rank (presumably the owning SimpleMessenger -- confirm); no result is checked
+  return 0;  // unconditionally reports success to the caller
+}
+
 
 
 void SimpleMessenger::Endpoint::_set_myaddr(entity_addr_t a)
@@ -1380,7 +1386,7 @@ void SimpleMessenger::Pipe::writer()
     dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
 
     // standby?
-    if (!q.empty() && state == STATE_STANDBY && !policy.server)
+    if (is_queued() && state == STATE_STANDBY && !policy.server)
       state = STATE_CONNECTING;
 
     // connect?
@@ -1405,7 +1411,20 @@ void SimpleMessenger::Pipe::writer()
     }
 
     if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
-       (!q.empty() || in_seq > in_seq_acked)) {
+       (is_queued() || in_seq > in_seq_acked)) {
+
+      // keepalive?
+      if (keepalive) {
+       lock.Unlock();
+       int rc = write_keepalive();
+       lock.Lock();
+       if (rc < 0) {
+         dout(2) << "writer couldn't write keepalive, " << strerror(errno) << dendl;
+         fault();
+         continue;
+       }
+       keepalive = false;
+      }
 
       // send ack?
       if (in_seq > in_seq_acked) {
@@ -1432,14 +1451,7 @@ void SimpleMessenger::Pipe::writer()
         dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
 
        // encode and copy out of *m
-        if (m->empty_payload())
-         m->encode_payload();
-       m->calc_front_crc();
-
-       if (!g_conf.ms_nocrc)
-         m->calc_data_crc();
-       else
-         m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+       m->encode();
 
         dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
        int rc = write_message(m);
@@ -1673,6 +1685,25 @@ int SimpleMessenger::Pipe::write_ack(unsigned seq)
   return 0;
 }
 
+int SimpleMessenger::Pipe::write_keepalive()  // write the one-byte keepalive tag to sd; returns 0 on success, -1 if do_sendmsg fails
+{
+  dout(10) << "write_keepalive" << dendl;
+
+  char c = CEPH_MSGR_TAG_KEEPALIVE;  // the entire payload is this single tag byte
+
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));  // zero every msghdr field that is not set explicitly below
+  struct iovec msgvec[2];  // only msgvec[0] is filled in and sent (msg_iovlen is 1)
+  msgvec[0].iov_base = &c;
+  msgvec[0].iov_len = 1;
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  
+  if (do_sendmsg(sd, &msg, 1) < 0)  // third argument is the byte count (do_sendmsg's len parameter)
+    return -1;  // any negative result from do_sendmsg is collapsed to -1
+  return 0;
+}
+
 
 int SimpleMessenger::Pipe::write_message(Message *m)
 {
@@ -2082,6 +2113,43 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
   lock.Unlock();
 }
 
+void SimpleMessenger::send_keepalive(const entity_inst_t& dest)  // flag a keepalive on the pipe to dest, creating a pipe if none is registered
+{
+  const entity_addr_t dest_addr = dest.addr;
+  entity_addr_t dest_proc_addr = dest_addr;  // copy of dest_addr used as the rank_pipe key and the connect_rank target
+  lock.Lock();  // messenger lock is held for the whole lookup/connect sequence
+  {
+    // local?
+    if (!rank_addr.is_local_to(dest_addr)) {  // destinations local to rank_addr fall through: nothing is sent for them
+      // remote.
+      Pipe *pipe = 0;
+      if (rank_pipe.count( dest_proc_addr )) {
+        // connected?
+        pipe = rank_pipe[ dest_proc_addr ];
+       pipe->lock.Lock();  // take the pipe lock before inspecting its state
+       if (pipe->state == Pipe::STATE_CLOSED) {
+         dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+         pipe->unregister_pipe();  // drop the closed pipe so a fresh one is created below
+         pipe->lock.Unlock();
+         pipe = 0;  // forces the "not connected" path
+       } else {
+         dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
+         pipe->_send_keepalive();  // lock-held variant: pipe->lock was taken above
+         pipe->lock.Unlock();
+       }
+      }
+      if (!pipe) {
+       dout(20) << "send_keepalive remote, " << dest_addr << ", new pipe." << dendl;
+       // not connected.
+       pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type()));
+       pipe->send_keepalive();  // locking variant: takes pipe->lock itself
+      }
+    }
+  }
+
+  lock.Unlock();
+}
+
 
 
 
index 3d8c6e98eb3433b629813fbe24c349ee8385198b..d16b48d82d29d48345b481bb130c0fb26acbba2d 100644 (file)
@@ -146,6 +146,7 @@ private:
     map<int, list<Message*> > q;  // priority queue
     list<Message*> sent;
     Cond cond;
+    bool keepalive;  // keepalive pending: set by _send_keepalive(), cleared by writer() after write_keepalive() succeeds
     
     __u32 connect_seq, peer_global_seq;
     __u32 out_seq;
@@ -160,6 +161,7 @@ private:
     int write_message(Message *m);
     int do_sendmsg(int sd, struct msghdr *msg, int len);
     int write_ack(unsigned s);
+    int write_keepalive();
 
     void fault(bool silent=false, bool reader=false);
     void fail();
@@ -192,6 +194,7 @@ private:
       lock("SimpleMessenger::Pipe::lock"),
       state(st), 
       reader_running(false), writer_running(false),
+      keepalive(false),
       connect_seq(0), peer_global_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) { }
@@ -227,6 +230,8 @@ private:
 
     __u32 get_out_seq() { return out_seq; }
 
+    bool is_queued() { return !q.empty() || keepalive; }  // true when q holds any message or a keepalive is pending
+
     void register_pipe();
     void unregister_pipe();
     void join() {
@@ -245,6 +250,15 @@ private:
       q[m->get_priority()].push_back(m);
       cond.Signal();
     }
+    void send_keepalive() {  // locking wrapper: holds lock around _send_keepalive()
+      lock.Lock();
+      _send_keepalive();
+      lock.Unlock();
+    }    
+    void _send_keepalive() {  // lock-held variant: both callers in this change hold lock when calling
+      keepalive = true;  // observed through is_queued() and writer()'s "keepalive?" check
+      cond.Signal();  // presumably wakes the writer thread waiting on cond -- confirm
+    }
     Message *_get_next_outgoing() {
       Message *m = 0;
       while (!m && !q.empty()) {
@@ -374,7 +388,8 @@ private:
     int send_message(Message *m, entity_inst_t dest);
     int forward_message(Message *m, entity_inst_t dest);
     int lazy_send_message(Message *m, entity_inst_t dest);
-    
+    int send_keepalive(entity_inst_t dest);
+
     void mark_down(entity_addr_t a);
     void mark_up(entity_name_t a, entity_addr_t& i);
   };
@@ -455,6 +470,7 @@ public:
 
   void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false);  
   void prepare_dest(const entity_inst_t& inst);
+  void send_keepalive(const entity_inst_t& addr);  
 
   // create a new messenger
   Endpoint *new_entity(entity_name_t addr);