]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: more heartbeat rework
authorSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 21:45:36 +0000 (14:45 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Fri, 20 May 2011 22:15:12 +0000 (15:15 -0700)
A few things:
 - track Connection* instead of entity_inst_t for hb peers
 - we can only send maps over the cluster_messenger
   - if peer is still alive, do that
   - if peer is not, send dying MOSDPing ping with YOU_DIED flag

src/messages/MOSDPing.h
src/osd/OSD.cc
src/osd/OSD.h
src/vstart.sh

index f27122bea66f66fbd6204761e6dd2e2f547d33af..d6b40768ed496d5b067d3cdfce46b7be57784fcc 100644 (file)
 
 class MOSDPing : public Message {
  public:
+  enum {
+    HEARTBEAT = 0,
+    REQUEST_HEARTBEAT = 1,
+    YOU_DIED = 2,
+  };
+  const char *get_op_name(int op) {
+    switch (op) {
+    case HEARTBEAT: return "heartbeat";
+    case REQUEST_HEARTBEAT: return "request_heartbeat";
+    case YOU_DIED: return "you_died";
+    default: return "???";
+    }
+  }
+
   ceph_fsid_t fsid;
   epoch_t map_epoch, peer_as_of_epoch;
-  bool ack;
+  __u8 op;
   osd_peer_stat_t peer_stat;
 
-  MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, bool a=false) : 
-    Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), ack(a), peer_stat(ps) { }
+  MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, __u8 o=HEARTBEAT) : 
+    Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) { }
   MOSDPing() {}
 private:
   ~MOSDPing() {}
@@ -40,23 +54,21 @@ public:
     ::decode(fsid, p);
     ::decode(map_epoch, p);
     ::decode(peer_as_of_epoch, p);
-    ::decode(ack, p);
+    ::decode(op, p);
     ::decode(peer_stat, p);
   }
   void encode_payload() {
     ::encode(fsid, payload);
     ::encode(map_epoch, payload);
     ::encode(peer_as_of_epoch, payload);
-    ::encode(ack, payload);
+    ::encode(op, payload);
     ::encode(peer_stat, payload);
   }
 
   const char *get_type_name() { return "osd_ping"; }
   void print(ostream& out) {
-    out << "osd_ping(e" << map_epoch << " as_of " << peer_as_of_epoch;
-    if (ack)
-      out << " ACK";
-    out << ")";
+    out << "osd_ping(e" << map_epoch << " as_of " << peer_as_of_epoch
+       << " " << get_op_name(op) << ")";
   }
 };
 
index 316e4ae152639b7dd566583db1bcfadd9a77b131..f564d2dda848a228e6b59490d0c370b5573d0d09 100644 (file)
@@ -1408,10 +1408,10 @@ void OSD::update_heartbeat_peers()
   old_from_stamp.swap(heartbeat_from_stamp);
 
   map<int, epoch_t> old_to, old_from;
-  map<int, entity_inst_t> old_inst;
+  map<int, Connection*> old_con;
   old_to.swap(heartbeat_to);
   old_from.swap(heartbeat_from);
-  old_inst.swap(heartbeat_inst);
+  old_con.swap(heartbeat_con);
 
   utime_t now = g_clock.now();
 
@@ -1422,10 +1422,11 @@ void OSD::update_heartbeat_peers()
        p != old_to.end();
        p++) {
     if (p->second > osdmap->get_epoch()) {
-      dout(10) << "update_heartbeat_peers: keeping newer _to peer " << old_inst[p->first]
+      dout(10) << "update_heartbeat_peers: keeping newer _to peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
               << " as of " << p->second << dendl;
       heartbeat_to[p->first] = p->second;
-      heartbeat_inst[p->first] = old_inst[p->first];
+      heartbeat_con[p->first] = old_con[p->first];
     }
   }
 
@@ -1442,9 +1443,10 @@ void OSD::update_heartbeat_peers()
       if (heartbeat_to.count(p))
        continue;
       heartbeat_to[p] = osdmap->get_epoch();
-      heartbeat_inst[p] = osdmap->get_hb_inst(p);
-      if (old_to.count(p) == 0 || old_inst[p] != heartbeat_inst[p])
-       dout(10) << "update_heartbeat_peers: new _to osd" << p << " " << heartbeat_inst[p] << dendl;
+      heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
+      if (old_to.count(p) == 0 || old_con[p] != heartbeat_con[p])
+       dout(10) << "update_heartbeat_peers: new _to osd" << p
+                << " " << heartbeat_con[p]->get_peer_addr() << dendl;
     }
     else if (pg->get_role() == 0) {
       assert(pg->acting[0] == whoami);
@@ -1454,74 +1456,91 @@ void OSD::update_heartbeat_peers()
        if (heartbeat_from.count(p))
          continue;
        heartbeat_from[p] = osdmap->get_epoch();
-       heartbeat_inst[p] = osdmap->get_hb_inst(p);
+       heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
        if (old_from_stamp.count(p) && old_from.count(p) &&
-           old_inst[p] == heartbeat_inst[p]) {
+           old_con[p] == heartbeat_con[p]) {
          // have a stamp _AND_ i'm not new to the set
          heartbeat_from_stamp[p] = old_from_stamp[p];
        } else {
-         dout(10) << "update_heartbeat_peers: new _from osd" << p << " " << heartbeat_inst[p] << dendl;
+         dout(10) << "update_heartbeat_peers: new _from osd" << p
+                  << " " << heartbeat_con[p]->get_peer_addr() << dendl;
          heartbeat_from_stamp[p] = now;  
-         MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, true); // request hb
-         heartbeat_messenger->send_message(m, heartbeat_inst[p]);
+         MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat,
+                                    MOSDPing::REQUEST_HEARTBEAT);
+         heartbeat_messenger->send_message(m, heartbeat_con[p]);
        }
       }
     }
   }
+
+  map<int, Connection*> down;
+
   for (map<int,epoch_t>::iterator p = old_to.begin();
        p != old_to.end();
        p++) {
-    assert(old_inst.count(p->first));
-    if (heartbeat_to.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first])
+    assert(old_con.count(p->first));
+    if (heartbeat_to.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
       continue;
     assert(p->second <= osdmap->get_epoch());
 
     // share latest map with this peer so they know not to expect
     // heartbeats from us.  otherwise they may mark us down!
-    dout(10) << "update_heartbeat_peers: sharing map with old _to peer " << old_inst[p->first] 
-            << " as of " << p->second << dendl;
-    _share_map_outgoing(old_inst[p->first]);
+    if (osdmap->is_up(p->first)) {
+      dout(10) << "update_heartbeat_peers: sharing map with old _to peer osd" << p->first << dendl;
+      _share_map_outgoing(osdmap->get_cluster_inst(p->first));
+    }
 
-    if (heartbeat_from.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
-      dout(10) << "update_heartbeat_peers: old _to peer " << old_inst[p->first] 
+    if (heartbeat_from.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
+      dout(10) << "update_heartbeat_peers: old _to peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
               << " is still a _from peer, not marking down" << dendl;
     } else {
-      dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first] 
+      dout(10) << "update_heartbeat_peers: will mark down old _to peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
               << " as of " << p->second << dendl;
-      Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]);
-      heartbeat_messenger->mark_disposable(con);
-      heartbeat_messenger->mark_down_on_empty(con);
-      con->put();
-      if (!osdmap->is_up(p->first))
-       forget_peer_epoch(p->first, osdmap->get_epoch());
+      down[p->first] = old_con[p->first];
     }
   }
   for (map<int,epoch_t>::iterator p = old_from.begin();
        p != old_from.end();
        p++) {
-    assert(old_inst.count(p->first));
-    if (heartbeat_from.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first])
+    assert(old_con.count(p->first));
+    if (heartbeat_from.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
       continue;
 
     // share latest map with this peer, just to be nice.
-    dout(10) << "update_heartbeat_peers: sharing map with old _from peer " << old_inst[p->first]
-            << dendl;
-    _share_map_outgoing(old_inst[p->first]);
+    if (osdmap->is_up(p->first)) {
+      dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd" << p->first << dendl;
+      _share_map_outgoing(osdmap->get_cluster_inst(p->first));
+    }
 
-    if (heartbeat_to.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) {
-      dout(10) << "update_heartbeat_peers: old _from peer " << old_inst[p->first]
+    if (heartbeat_to.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
+      dout(10) << "update_heartbeat_peers: old _from peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
               << " is still a _to peer, not marking down" << dendl;
     } else {
-      dout(10) << "update_heartbeat_peers: marking down old _from peer " << old_inst[p->first] 
+      dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
               << " as of " << p->second << dendl;
-      Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]);
-      heartbeat_messenger->mark_disposable(con);
-      heartbeat_messenger->mark_down_on_empty(con);
-      con->put();
-      if (!osdmap->is_up(p->first))
-       forget_peer_epoch(p->first, osdmap->get_epoch());
+      down[p->first] = old_con[p->first];
     }
   }
+  for (map<int, Connection*>::iterator p = down.begin(); p != down.end(); ++p) {
+    Connection *con = p->second;
+    heartbeat_messenger->mark_disposable(con);
+    if (!osdmap->is_up(p->first)) {
+      dout(10) << "update_heartbeat_peers: telling old peer osd" << p->first
+              << " " << old_con[p->first]->get_peer_addr()
+              << " they are down" << dendl;
+      heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
+                                                    heartbeat_epoch, my_stat,
+                                                    MOSDPing::YOU_DIED), con);
+    }
+    heartbeat_messenger->mark_down_on_empty(con);
+    con->put();
+    if (!osdmap->is_up(p->first))
+      forget_peer_epoch(p->first, osdmap->get_epoch());
+  }
 
   dout(10) << "update_heartbeat_peers: hb   to: " << heartbeat_to << dendl;
   dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
@@ -1545,7 +1564,10 @@ void OSD::reset_heartbeat_peers()
   heartbeat_to.clear();
   heartbeat_from.clear();
   heartbeat_from_stamp.clear();
-  heartbeat_inst.clear();
+  while (!heartbeat_con.empty()) {
+    heartbeat_con.begin()->second->put();
+    heartbeat_con.erase(heartbeat_con.begin());
+  }
   failure_queue.clear();
   heartbeat_lock.Unlock();
 
@@ -1565,7 +1587,8 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
   bool locked = map_lock.try_get_read();
 
-  if (m->ack) {
+  switch (m->op) {
+  case MOSDPing::REQUEST_HEARTBEAT:
     if (heartbeat_to.count(from) && m->peer_as_of_epoch <= heartbeat_to[from]) {
       dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
              << " request for heartbeats as_of " << m->peer_as_of_epoch
@@ -1574,41 +1597,52 @@ void OSD::handle_osd_ping(MOSDPing *m)
       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
              << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
       heartbeat_to[from] = m->peer_as_of_epoch;
-      heartbeat_inst[from] = m->get_source_inst();
+      heartbeat_con[from] = m->get_connection();
+      heartbeat_con[from]->get();
       
       if (locked && m->map_epoch && !is_booting())
        _share_map_incoming(m->get_source_inst(), m->map_epoch,
                            (Session*) m->get_connection()->get_priv());
     }
-  }
+    break;
 
-  if (heartbeat_from.count(from) &&
-      heartbeat_inst[from] == m->get_source_inst()) {
-    // only take peer stat or share map now if map_lock is uncontended
-    if (locked) {
-      dout(20) << "handle_osd_ping " << m->get_source_inst()
-              << " took stat " << m->peer_stat << dendl;
-      if (m->map_epoch && !is_booting())
-       _share_map_incoming(m->get_source_inst(), m->map_epoch,
-                           (Session*) m->get_connection()->get_priv());
-      take_peer_stat(from, m->peer_stat);  // only with map_lock held!
+  case MOSDPing::HEARTBEAT:
+    if (heartbeat_from.count(from) &&
+       heartbeat_con[from] == m->get_connection()) {
+      // only take peer stat or share map now if map_lock is uncontended
+      if (locked) {
+       dout(20) << "handle_osd_ping " << m->get_source_inst()
+                << " took stat " << m->peer_stat << dendl;
+       if (m->map_epoch && !is_booting())
+         _share_map_incoming(m->get_source_inst(), m->map_epoch,
+                             (Session*) m->get_connection()->get_priv());
+       take_peer_stat(from, m->peer_stat);  // only with map_lock held!
+      } else {
+       dout(20) << "handle_osd_ping " << m->get_source_inst()
+                << " dropped stat " << m->peer_stat << dendl;
+      }
+
+      note_peer_epoch(from, m->map_epoch);
+      
+      heartbeat_from_stamp[from] = g_clock.now();  // don't let _my_ lag interfere.
+      
+      // remove from failure lists if needed
+      if (failure_pending.count(from)) {
+       send_still_alive(from);
+       failure_pending.erase(from);
+      }
+      failure_queue.erase(from);
     } else {
-      dout(20) << "handle_osd_ping " << m->get_source_inst()
-              << " dropped stat " << m->peer_stat << dendl;
+      dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
     }
+    break;
 
-    note_peer_epoch(from, m->map_epoch);
-
-    heartbeat_from_stamp[from] = g_clock.now();  // don't let _my_ lag interfere.
-
-    // remove from failure lists if needed
-    if (failure_pending.count(from)) {
-      send_still_alive(from);
-      failure_pending.erase(from);
-    }
-    failure_queue.erase(from);
-  } else {
-    dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
+  case MOSDPing::YOU_DIED:
+    dout(10) << "handle_osd_ping " << m->get_source_inst() << " says i am down in " << m->map_epoch
+            << dendl;
+    monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME);
+    monc->renew_subs();
+    break;
   }
 
   if (locked) 
@@ -1616,6 +1650,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
   heartbeat_lock.Unlock();
   m->put();
+
 }
 
 void OSD::heartbeat_entry()
@@ -1707,7 +1742,7 @@ void OSD::heartbeat()
        i != heartbeat_to.end();
        i++) {
     int peer = i->first;
-    if (heartbeat_inst.count(peer)) {
+    if (heartbeat_con.count(peer)) {
       my_stat_on_peer[peer] = my_stat;
       dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
       Message *m = new MOSDPing(osdmap->get_fsid(),
@@ -1716,7 +1751,7 @@ void OSD::heartbeat()
                                my_stat);
       m->set_priority(CEPH_MSG_PRIO_HIGH);
       dout(30) << "heartbeat sending ping to osd" << peer << dendl;
-      heartbeat_messenger->send_message(m, heartbeat_inst[peer]);
+      heartbeat_messenger->send_message(m, heartbeat_con[peer]);
     }
   }
 
@@ -2361,13 +2396,6 @@ void OSD::forget_peer_epoch(int peer, epoch_t as_of)
   }
 }
 
-/*
- * share incremental maps with peers. it's possible we could send
- *   incremental 3->4 on cluster msgr
- *   incremental 4->5 on heartbeat msgr
- * and the second incremental would arrive first. who cares.  the peer
- * will figure it out sooner or later.
- */
 
 bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch,
                              Session* session)
@@ -2432,8 +2460,10 @@ void OSD::_share_map_outgoing(const entity_inst_t& inst)
     if (pe < osdmap->get_epoch()) {
       send_incremental_map(pe, inst);
       note_peer_epoch(peer, osdmap->get_epoch());
-    }
+    } else
+      dout(20) << "_share_map_outgoing " << inst << " already has epoch " << pe << dendl;
   } else {
+    dout(20) << "_share_map_outgoing " << inst << " don't know epoch, doing nothing" << dendl;
     // no idea about peer's epoch.
     // ??? send recent ???
     // do nothing.
@@ -3392,11 +3422,8 @@ void OSD::activate_map(ObjectStore::Transaction& t, list<Context*>& tfin)
 }
 
 
-void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
+MOSDMap *OSD::build_incremental_map_msg(epoch_t since)
 {
-  dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
-           << " to " << inst << dendl;
-  
   MOSDMap *m = new MOSDMap(monc->get_fsid());
   
   for (epoch_t e = osdmap->get_epoch();
@@ -3413,6 +3440,15 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la
       assert(0);  // we should have all maps.
     }
   }
+  return m;
+}
+
+void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy)
+{
+  dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch()
+           << " to " << inst << dendl;
+  
+  MOSDMap *m = build_incremental_map_msg(since);
   Messenger *msgr = client_messenger;
   if (entity_name_t::TYPE_OSD == inst.name._type)
     msgr = cluster_messenger;
index 5fdd78e35a3de190ae3387fa81ce1e3613a8eda1..8572f0693f9da73a36afaeba07a383740771db48 100644 (file)
@@ -238,7 +238,7 @@ private:
   epoch_t heartbeat_epoch;
   map<int, epoch_t> heartbeat_to, heartbeat_from;
   map<int, utime_t> heartbeat_from_stamp;
-  map<int, entity_inst_t> heartbeat_inst;
+  map<int, Connection*> heartbeat_con;
   utime_t last_mon_heartbeat;
   Messenger *heartbeat_messenger;
   
@@ -491,6 +491,7 @@ private:
   bool get_inc_map_bl(epoch_t e, bufferlist& bl);
   bool get_inc_map(epoch_t e, OSDMap::Incremental &inc);
   
+  MOSDMap *build_incremental_map_msg(epoch_t since);
   void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
 
 protected:
index 845c5fc1ec36090b4534073c808f746a7ac8880a..887c7fedbacd3066dd7807daec4664f16a769adc 100755 (executable)
@@ -161,7 +161,7 @@ else
         debug ms = 1'
     COSDDEBUG='
         lockdep = 1
-        debug ms = 1
+        debug ms = 20
         debug osd = 25
         debug monc = 20
         debug journal = 20