git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: restructure heartbeats
author Sage Weil <sage@newdream.net>
Thu, 4 Aug 2011 20:35:57 +0000 (13:35 -0700)
committer Sage Weil <sage@newdream.net>
Thu, 4 Aug 2011 20:35:57 +0000 (13:35 -0700)
Primary sends explicit message to replicas asking for heartbeats to start,
or to stop.  Replicas never send heartbeats unless explicitly requested
to.

Do not try to mark down old heartbeat relationships; that gets messy, and
the overhead of open sockets is too small to worry about right now.

Signed-off-by: Sage Weil <sage@newdream.net>
src/cosd.cc
src/messages/MOSDPing.h
src/osd/OSD.cc
src/osd/OSD.h

index 0aae6d9c2be82661dd515eede598e2b0c6b42b96..0ded0fed00cc5a16ddedee773de0260e408a0c7e 100644 (file)
@@ -223,7 +223,8 @@ int main(int argc, const char **argv)
 
   SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context);
   SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context);
-  SimpleMessenger *messenger_hb = new SimpleMessenger(g_ceph_context);
+  SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context);
+  SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context);
 
   client_messenger->bind(g_conf->public_addr, getpid());
   cluster_messenger->bind(g_conf->cluster_addr, getpid());
@@ -232,7 +233,7 @@ int main(int argc, const char **argv)
   entity_addr_t hb_addr = g_conf->cluster_addr;
   if (!hb_addr.is_blank_ip())
     hb_addr.set_port(0);
-  messenger_hb->bind(hb_addr, getpid());
+  messenger_hbout->bind(hb_addr, getpid());
 
   cout << "starting osd" << whoami
        << " at " << client_messenger->get_ms_addr() 
@@ -243,7 +244,8 @@ int main(int argc, const char **argv)
 
   client_messenger->register_entity(entity_name_t::OSD(whoami));
   cluster_messenger->register_entity(entity_name_t::OSD(whoami));
-  messenger_hb->register_entity(entity_name_t::OSD(whoami));
+  messenger_hbin->register_entity(entity_name_t::OSD(whoami));
+  messenger_hbout->register_entity(entity_name_t::OSD(whoami));
 
   Throttle client_throttler(g_conf->osd_client_message_size_cap);
 
@@ -278,7 +280,9 @@ int main(int argc, const char **argv)
     return -1;
   global_init_chdir(g_ceph_context);
 
-  OSD *osd = new OSD(whoami, cluster_messenger, client_messenger, messenger_hb, &mc,
+  OSD *osd = new OSD(whoami, cluster_messenger, client_messenger,
+                    messenger_hbin, messenger_hbout,
+                    &mc,
                     g_conf->osd_data, g_conf->osd_journal);
   int err = osd->pre_init();
   if (err < 0) {
@@ -291,7 +295,8 @@ int main(int argc, const char **argv)
   global_init_shutdown_stderr(g_ceph_context);
 
   client_messenger->start();
-  messenger_hb->start();
+  messenger_hbin->start_with_nonce(getpid());
+  messenger_hbout->start();
   cluster_messenger->start();
 
   // start osd
@@ -303,13 +308,15 @@ int main(int argc, const char **argv)
   }
 
   client_messenger->wait();
-  messenger_hb->wait();
+  messenger_hbin->wait();
+  messenger_hbout->wait();
   cluster_messenger->wait();
 
   // done
   delete osd;
   client_messenger->destroy();
-  messenger_hb->destroy();
+  messenger_hbin->destroy();
+  messenger_hbout->destroy();
   cluster_messenger->destroy();
 
   // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
index e66507199c30f8e0c6b2e864f28eb082665d3dbe..49979fcd2ac6361ee498b6c6e556ec5fc0960317 100644 (file)
@@ -25,13 +25,15 @@ class MOSDPing : public Message {
  public:
   enum {
     HEARTBEAT = 0,
-    REQUEST_HEARTBEAT = 1,
+    START_HEARTBEAT = 1,
     YOU_DIED = 2,
+    STOP_HEARTBEAT = 3,
   };
   const char *get_op_name(int op) {
     switch (op) {
     case HEARTBEAT: return "heartbeat";
-    case REQUEST_HEARTBEAT: return "request_heartbeat";
+    case START_HEARTBEAT: return "start_heartbeat";
+    case STOP_HEARTBEAT: return "stop_heartbeat";
     case YOU_DIED: return "you_died";
     default: return "???";
     }
@@ -69,8 +71,7 @@ public:
 
   const char *get_type_name() { return "osd_ping"; }
   void print(ostream& out) {
-    out << "osd_ping(e" << map_epoch << " as_of " << peer_as_of_epoch
-       << " " << get_op_name(op) << ")";
+    out << "osd_ping(" << get_op_name(op) << " e" << map_epoch << " as_of " << peer_as_of_epoch << ")";
   }
 };
 
index de9fec2a97e5cf93a1b9d692dd0746767ef82e81..36a24ad24e8bd5268448c777967d1b6667727f67 100644 (file)
@@ -401,18 +401,18 @@ int OSD::peek_meta(const std::string &dev, std::string& magic,
 // cons/des
 
 OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
-        Messenger *hbm, MonClient *mc,
+        Messenger *hbinm, Messenger *hboutm, MonClient *mc,
         const std::string &dev, const std::string &jdev) :
-  Dispatcher(hbm->cct),
+  Dispatcher(external_messenger->cct),
   osd_lock("OSD::osd_lock"),
-  timer(hbm->cct, osd_lock),
+  timer(external_messenger->cct, osd_lock),
   cluster_messenger(internal_messenger),
   client_messenger(external_messenger),
   monc(mc),
   logger(NULL),
   store(NULL),
   map_in_progress(false),
-  clog(hbm->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+  clog(external_messenger->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
   whoami(id),
   dev_path(dev), journal_path(jdev),
   dispatch_running(false),
@@ -420,12 +420,13 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
             ceph_osd_feature_ro_compat,
             ceph_osd_feature_incompat),
   state(STATE_BOOTING), boot_epoch(0), up_epoch(0),
-  op_tp(hbm->cct, "OSD::op_tp", g_conf->osd_op_threads),
-  recovery_tp(hbm->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
-  disk_tp(hbm->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
+  op_tp(external_messenger->cct, "OSD::op_tp", g_conf->osd_op_threads),
+  recovery_tp(external_messenger->cct, "OSD::recovery_tp", g_conf->osd_recovery_threads),
+  disk_tp(external_messenger->cct, "OSD::disk_tp", g_conf->osd_disk_threads),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_epoch(0),
-  heartbeat_messenger(hbm),
+  hbin_messenger(hbinm),
+  hbout_messenger(hboutm),
   heartbeat_thread(this),
   heartbeat_dispatcher(this),
   stat_lock("OSD::stat_lock"),
@@ -454,7 +455,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   rep_scrub_wq(this, g_conf->osd_scrub_thread_timeout, &disk_tp),
   remove_wq(this, g_conf->osd_remove_thread_timeout, &disk_tp),
   watch_lock("OSD::watch_lock"),
-  watch_timer(hbm->cct, watch_lock)
+  watch_timer(external_messenger->cct, watch_lock)
 {
   monc->set_messenger(client_messenger);
 
@@ -584,7 +585,8 @@ int OSD::init()
   client_messenger->add_dispatcher_head(&clog);
   cluster_messenger->add_dispatcher_head(this);
 
-  heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+  hbin_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+  hbout_messenger->add_dispatcher_head(&heartbeat_dispatcher);
 
   monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
   monc->init();
@@ -794,8 +796,8 @@ int OSD::shutdown()
 
   client_messenger->shutdown();
   cluster_messenger->shutdown();
-  if (heartbeat_messenger)
-    heartbeat_messenger->shutdown();
+  hbin_messenger->shutdown();
+  hbout_messenger->shutdown();
 
   monc->shutdown();
 
@@ -1308,62 +1310,28 @@ void OSD::update_heartbeat_peers()
   assert(osd_lock.is_locked());
   heartbeat_lock.Lock();
 
-  /*
-  for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
-    if (heartbeat_inst.count(p->first) == 0)
-      dout(0) << " no inst for _to " << p->first << dendl;
-  for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
-    if (heartbeat_inst.count(p->first) == 0)
-      dout(0) << " no inst for _from " << p->first << dendl;
-  */
-
   // filter heartbeat_from_stamp to only include osds that remain in
   // heartbeat_from.
   map<int, utime_t> old_from_stamp;
   old_from_stamp.swap(heartbeat_from_stamp);
 
-  map<int, epoch_t> old_to, old_from;
+  map<int, epoch_t> old_from;
   map<int, Connection*> old_con;
-  old_to.swap(heartbeat_to);
   old_from.swap(heartbeat_from);
-  old_con.swap(heartbeat_con);
+  old_con.swap(heartbeat_from_con);
 
   utime_t now = ceph_clock_now(g_ceph_context);
 
   heartbeat_epoch = osdmap->get_epoch();
 
-  // grandfather newer _to peers
-  for (map<int,epoch_t>::iterator p = old_to.begin();
-       p != old_to.end();
-       p++) {
-    if (p->second > osdmap->get_epoch()) {
-      dout(10) << "update_heartbeat_peers: keeping newer _to peer osd" << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " as of " << p->second << dendl;
-      heartbeat_to[p->first] = p->second;
-      heartbeat_con[p->first] = old_con[p->first];
-    }
-  }
-
-  // build heartbeat to/from set
+  // build heartbeat from set
   for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
        i != pg_map.end();
        i++) {
     PG *pg = i->second;
 
     // replicas ping primary.
-    if (pg->get_role() > 0) {
-      assert(pg->acting.size() > 1);
-      int p = pg->acting[0];
-      if (heartbeat_to.count(p))
-       continue;
-      heartbeat_to[p] = osdmap->get_epoch();
-      heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
-      if (old_to.count(p) == 0 || old_con[p] != heartbeat_con[p])
-       dout(10) << "update_heartbeat_peers: new _to osd" << p
-                << " " << heartbeat_con[p]->get_peer_addr() << dendl;
-    }
-    else if (pg->get_role() == 0) {
+    if (pg->get_role() == 0) {
       assert(pg->acting[0] == whoami);
       for (unsigned i=1; i<pg->acting.size(); i++) {
        int p = pg->acting[i]; // peer
@@ -1371,60 +1339,33 @@ void OSD::update_heartbeat_peers()
        if (heartbeat_from.count(p))
          continue;
        heartbeat_from[p] = osdmap->get_epoch();
-       if (!heartbeat_con.count(p)) {
-         // Don't update _con, might be from a newer map
-         heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p));
-       }
-       if (old_from_stamp.count(p) && old_from.count(p) &&
-           old_con[p] == heartbeat_con[p]) {
+       Connection *con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));
+       heartbeat_from_con[p] = con;
+       if (old_from_stamp.count(p) && old_from.count(p) && old_con[p] == con) {
          // have a stamp _AND_ i'm not new to the set
          heartbeat_from_stamp[p] = old_from_stamp[p];
        } else {
          dout(10) << "update_heartbeat_peers: new _from osd" << p
-                  << " " << heartbeat_con[p]->get_peer_addr() << dendl;
+                  << " " << con->get_peer_addr() << dendl;
          heartbeat_from_stamp[p] = now;  
          MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch,
-                                    MOSDPing::REQUEST_HEARTBEAT);
-         heartbeat_messenger->send_message(m, heartbeat_con[p]);
+                                    MOSDPing::START_HEARTBEAT);
+         hbin_messenger->send_message(m, con);
        }
       }
     }
   }
 
-  map<int, Connection*> down;
-
-  for (map<int,epoch_t>::iterator p = old_to.begin();
-       p != old_to.end();
-       p++) {
-    assert(old_con.count(p->first));
-    if (heartbeat_to.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
-      continue;
-    assert(p->second <= osdmap->get_epoch());
-
-    // share latest map with this peer so they know not to expect
-    // heartbeats from us.  otherwise they may mark us down!
-    if (osdmap->is_up(p->first) && !is_booting()) {
-      dout(10) << "update_heartbeat_peers: sharing map with old _to peer osd" << p->first << dendl;
-      _share_map_outgoing(osdmap->get_cluster_inst(p->first));
-    }
-
-    if (heartbeat_from.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
-      dout(10) << "update_heartbeat_peers: old _to peer osd" << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " is still a _from peer, not marking down" << dendl;
-    } else {
-      dout(10) << "update_heartbeat_peers: will mark down old _to peer osd" << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " as of " << p->second << dendl;
-      down[p->first] = old_con[p->first];
-    }
-  }
   for (map<int,epoch_t>::iterator p = old_from.begin();
        p != old_from.end();
        p++) {
     assert(old_con.count(p->first));
-    if (heartbeat_from.count(p->first) && heartbeat_con[p->first] == old_con[p->first])
+    Connection *con = old_con[p->first];
+
+    if (heartbeat_from.count(p->first) && heartbeat_from_con[p->first] == con) {
+      con->put();
       continue;
+    }
 
     // share latest map with this peer, just to be nice.
     if (osdmap->is_up(p->first) && !is_booting()) {
@@ -1432,46 +1373,33 @@ void OSD::update_heartbeat_peers()
       _share_map_outgoing(osdmap->get_cluster_inst(p->first));
     }
 
-    if (heartbeat_to.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) {
-      dout(10) << "update_heartbeat_peers: old _from peer osd" << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " is still a _to peer, not marking down" << dendl;
-    } else {
-      dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " as of " << p->second << dendl;
-      down[p->first] = old_con[p->first];
-    }
-  }
-  for (map<int, Connection*>::iterator p = down.begin(); p != down.end(); ++p) {
-    Connection *con = p->second;
-    heartbeat_messenger->mark_disposable(con);
+    dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first
+            << " " << con->get_peer_addr()
+            << " as of " << p->second << dendl;
+    
     if (!osdmap->is_up(p->first) && !is_booting()) {
       dout(10) << "update_heartbeat_peers: telling old peer osd" << p->first
               << " " << old_con[p->first]->get_peer_addr()
               << " they are down" << dendl;
-      heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
-                                                    heartbeat_epoch,
-                                                    MOSDPing::YOU_DIED), con);
+      hbin_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch,
+                                               heartbeat_epoch,
+                                               MOSDPing::YOU_DIED), con);
+      hbin_messenger->mark_disposable(con);
+      hbin_messenger->mark_down_on_empty(con);
+    } else {
+      // tell them to stop sending heartbeats
+      hbin_messenger->send_message(new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch,
+                                               MOSDPing::STOP_HEARTBEAT), con);
     }
-    heartbeat_messenger->mark_down_on_empty(con);
-    con->put();
-    if (!osdmap->is_up(p->first))
+    if (!osdmap->is_up(p->first)) {
       forget_peer_epoch(p->first, osdmap->get_epoch());
+    }
+    con->put();
   }
 
   dout(10) << "update_heartbeat_peers: hb   to: " << heartbeat_to << dendl;
   dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
 
-  /*
-  for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
-    if (heartbeat_inst.count(p->first) == 0)
-      dout(0) << " no inst for _to " << p->first << dendl;
-  for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
-    if (heartbeat_inst.count(p->first) == 0)
-      dout(0) << " no inst for _from " << p->first << dendl;
-  */
-
   heartbeat_lock.Unlock();
 }
 
@@ -1482,9 +1410,13 @@ void OSD::reset_heartbeat_peers()
   heartbeat_to.clear();
   heartbeat_from.clear();
   heartbeat_from_stamp.clear();
-  while (!heartbeat_con.empty()) {
-    heartbeat_con.begin()->second->put();
-    heartbeat_con.erase(heartbeat_con.begin());
+  while (!heartbeat_to_con.empty()) {
+    heartbeat_to_con.begin()->second->put();
+    heartbeat_to_con.erase(heartbeat_to_con.begin());
+  }
+  while (!heartbeat_from_con.empty()) {
+    heartbeat_from_con.begin()->second->put();
+    heartbeat_from_con.erase(heartbeat_from_con.begin());
   }
   failure_queue.clear();
   heartbeat_lock.Unlock();
@@ -1502,52 +1434,73 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
   heartbeat_lock.Lock();
   int from = m->get_source().num();
-
   bool locked = map_lock.try_get_read();
   
-  // ignore (and mark down connection for) old messages 
-  epoch_t e = m->map_epoch;
-  if (!e)
-    e = m->peer_as_of_epoch;
-  if (e <= osdmap->get_epoch() &&
-      ((heartbeat_to.count(from) == 0 && heartbeat_from.count(from) == 0) ||
-       heartbeat_con[from] != m->get_connection())) {
-    dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
-           << " after old message from epoch " << e
-           << " <= current " << osdmap->get_epoch() << dendl;
-    heartbeat_messenger->mark_down(m->get_connection());
-    goto out;
-  } 
-
   switch (m->op) {
-  case MOSDPing::REQUEST_HEARTBEAT:
-    if (m->peer_as_of_epoch <= osdmap->get_epoch()) {
-      dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
-             << " request for heartbeats as_of " << m->peer_as_of_epoch
-             << " <= current " << osdmap->get_epoch() << dendl;
-    } else if (heartbeat_to.count(from) && m->peer_as_of_epoch <= heartbeat_to[from]) {
-      dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst()
-             << " request for heartbeats as_of " << m->peer_as_of_epoch
-             << " <= current _to as_of " << heartbeat_to[from] << dendl;
+  case MOSDPing::START_HEARTBEAT:
+    if (heartbeat_to.count(from)) {
+      if (heartbeat_to_con[from] != m->get_connection()) {
+       // different connection
+       if (heartbeat_to[from] > m->peer_as_of_epoch) {
+         dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
+                 << " after old start message from epoch " << m->peer_as_of_epoch
+                 << " <= current " << heartbeat_to[from] << dendl;
+         hbout_messenger->mark_down(m->get_connection());
+       } else {
+         dout(5) << "handle_osd_ping replacing old peer "
+                 << heartbeat_to_con[from]->get_peer_addr()
+                 << " epoch " << heartbeat_to[from]
+                 << " with new peer " << m->get_source_inst()
+                 << " epoch " << m->peer_as_of_epoch
+                 << dendl;
+         hbout_messenger->mark_down(heartbeat_to_con[from]);
+         heartbeat_to_con[from]->put();
+         
+         // remember new connection
+         heartbeat_to[from] = m->peer_as_of_epoch;
+         heartbeat_to_con[from] = m->get_connection();
+         heartbeat_to_con[from]->get();
+       }
+      } else {
+       // same connection
+       dout(5) << "handle_osd_ping dup peer " << m->get_source_inst()
+               << " request for heartbeats as_of " << m->peer_as_of_epoch
+               << ", same connection" << dendl;
+       heartbeat_to[from] = m->peer_as_of_epoch;
+      }
     } else {
+      // new
       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
              << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
       heartbeat_to[from] = m->peer_as_of_epoch;
-      if (heartbeat_con.count(from))
-       heartbeat_con[from]->put();
-      heartbeat_con[from] = m->get_connection();
-      heartbeat_con[from]->get();
+      heartbeat_to_con[from] = m->get_connection();
+      heartbeat_to_con[from]->get();
       
-      if (locked && m->map_epoch && !is_booting())
+      if (locked && m->map_epoch && !is_booting()) {
        _share_map_incoming(m->get_source_inst(), m->map_epoch,
                            (Session*) m->get_connection()->get_priv());
+      }
     }
     break;
 
-  case MOSDPing::HEARTBEAT:
-    if (heartbeat_from.count(from) &&
-       heartbeat_con[from] == m->get_connection()) {
+  case MOSDPing::STOP_HEARTBEAT:
+    {
+      if (heartbeat_to.count(from) && heartbeat_to_con[from] == m->get_connection()) {
+       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
+               << " stopping heartbeats as_of " << m->peer_as_of_epoch << dendl;
+       heartbeat_to.erase(from);
+       //hbout_messenger->mark_down(heartbeat_to_con[from]);
+       heartbeat_to_con[from]->put();
+       heartbeat_to_con.erase(from);
+      } else {
+       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
+               << " ignoring stop request as_of " << m->peer_as_of_epoch << dendl;
+      }
+    }
+    break;
 
+  case MOSDPing::HEARTBEAT:
+    if (heartbeat_from.count(from) && heartbeat_from_con[from] == m->get_connection()) {
       dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl;
 
       note_peer_epoch(from, m->map_epoch);
@@ -1575,7 +1528,6 @@ void OSD::handle_osd_ping(MOSDPing *m)
     break;
   }
 
-out:
   if (locked) 
     map_lock.put_read();
 
@@ -1659,15 +1611,13 @@ void OSD::heartbeat()
        i != heartbeat_to.end();
        i++) {
     int peer = i->first;
-    if (heartbeat_con.count(peer)) {
-      dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
-      Message *m = new MOSDPing(osdmap->get_fsid(),
-                               map_locked ? osdmap->get_epoch():0, 
-                               i->second, MOSDPing::HEARTBEAT);
-      m->set_priority(CEPH_MSG_PRIO_HIGH);
-      dout(30) << "heartbeat sending ping to osd" << peer << dendl;
-      heartbeat_messenger->send_message(m, heartbeat_con[peer]);
-    }
+    dout(30) << "heartbeat allocating ping for osd" << peer << dendl;
+    Message *m = new MOSDPing(osdmap->get_fsid(),
+                             map_locked ? osdmap->get_epoch():0, 
+                             i->second, MOSDPing::HEARTBEAT);
+    m->set_priority(CEPH_MSG_PRIO_HIGH);
+    dout(30) << "heartbeat sending ping to osd" << peer << dendl;
+    hbout_messenger->send_message(m, heartbeat_to_con[peer]);
   }
 
   if (map_locked) {
@@ -1958,12 +1908,12 @@ void OSD::send_boot()
     cluster_messenger->set_ip(cluster_addr);
     dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
   }
-  entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
+  entity_addr_t hb_addr = hbout_messenger->get_myaddr();
   if (hb_addr.is_blank_ip()) {
     int port = hb_addr.get_port();
     hb_addr = cluster_addr;
     hb_addr.set_port(port);
-    heartbeat_messenger->set_ip(hb_addr);
+    hbout_messenger->set_ip(hb_addr);
     dout(10) << " assuming hb_addr ip matches cluster_addr" << dendl;
   }
   MOSDBoot *mboot = new MOSDBoot(superblock, hb_addr, cluster_addr);
@@ -2866,7 +2816,17 @@ void OSD::note_down_osd(int peer)
 
   heartbeat_lock.Lock();
 
-  // note: update_heartbeat_peers will mark down the heartbeat connection.
+  // clean out down osds i am sending heartbeats _to_.
+  // update_heartbeat_peers() will clean out peers i expect heartbeats _from_.
+  if (heartbeat_to.count(peer) &&
+      heartbeat_to[peer] < osdmap->get_epoch()) {
+    dout(10) << "note_down_osd osd" << peer << " marking down hbout connection "
+            << heartbeat_to_con[peer]->get_peer_addr() << dendl;
+    hbout_messenger->mark_down(heartbeat_to_con[peer]);
+    heartbeat_to_con[peer]->put();
+    heartbeat_to_con.erase(peer);
+    heartbeat_to.erase(peer);
+  }
 
   failure_queue.erase(peer);
   failure_pending.erase(peer);
@@ -3100,7 +3060,7 @@ void OSD::handle_osd_map(MOSDMap *m)
     } else if (!osdmap->is_up(whoami) ||
               !osdmap->get_addr(whoami).probably_equals(client_messenger->get_myaddr()) ||
               !osdmap->get_cluster_addr(whoami).probably_equals(cluster_messenger->get_myaddr()) ||
-              !osdmap->get_hb_addr(whoami).probably_equals(heartbeat_messenger->get_myaddr())) {
+              !osdmap->get_hb_addr(whoami).probably_equals(hbout_messenger->get_myaddr())) {
       if (!osdmap->is_up(whoami))
        clog.warn() << "map e" << osdmap->get_epoch()
                    << " wrongly marked me down or wrong addr";
@@ -3112,23 +3072,23 @@ void OSD::handle_osd_map(MOSDMap *m)
        clog.warn() << "map e" << osdmap->get_epoch()
                    << " had wrong client addr (" << osdmap->get_cluster_addr(whoami)
                    << " != my " << cluster_messenger->get_myaddr();
-      else if (osdmap->get_hb_addr(whoami).probably_equals(heartbeat_messenger->get_myaddr()))
+      else if (osdmap->get_hb_addr(whoami).probably_equals(hbout_messenger->get_myaddr()))
        clog.warn() << "map e" << osdmap->get_epoch()
                    << " had wrong client addr (" << osdmap->get_hb_addr(whoami)
-                   << " != my " << heartbeat_messenger->get_myaddr();
+                   << " != my " << hbout_messenger->get_myaddr();
       
       state = STATE_BOOTING;
       up_epoch = 0;
       do_restart = true;
 
       int cport = cluster_messenger->get_myaddr().get_port();
-      int hbport = heartbeat_messenger->get_myaddr().get_port();
+      int hbport = hbout_messenger->get_myaddr().get_port();
 
       int r = cluster_messenger->rebind(hbport);
       if (r != 0)
        do_shutdown = true;  // FIXME: do_restart?
 
-      r = heartbeat_messenger->rebind(cport);
+      r = hbout_messenger->rebind(cport);
       if (r != 0)
        do_shutdown = true;  // FIXME: do_restart?
 
index 9aee7a19869aedfeb0f7efb1ca9e5473830b8bc1..b2bb3ec5e52e2ef5938e9f799ec605a6826d192f 100644 (file)
@@ -248,9 +248,9 @@ private:
   epoch_t heartbeat_epoch;
   map<int, epoch_t> heartbeat_to, heartbeat_from;
   map<int, utime_t> heartbeat_from_stamp;
-  map<int, Connection*> heartbeat_con;
+  map<int, Connection*> heartbeat_to_con, heartbeat_from_con;
   utime_t last_mon_heartbeat;
-  Messenger *heartbeat_messenger;
+  Messenger *hbin_messenger, *hbout_messenger;
   
   void update_heartbeat_peers();
   void reset_heartbeat_peers();
@@ -964,8 +964,8 @@ protected:
  public:
   /* internal and external can point to the same messenger, they will still
    * be cleaned up properly*/
-  OSD(int id, Messenger *internal, Messenger *external, Messenger *hbm, MonClient *mc,
-      const std::string &dev, const std::string &jdev);
+  OSD(int id, Messenger *internal, Messenger *external, Messenger *hbmin, Messenger *hbmout,
+      MonClient *mc, const std::string &dev, const std::string &jdev);
   ~OSD();
 
   // static bits