git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr policies; osds now batch and retry failure reports
author Sage Weil <sage@newdream.net>
Tue, 8 Apr 2008 23:26:32 +0000 (16:26 -0700)
committer Sage Weil <sage@newdream.net>
Tue, 8 Apr 2008 23:26:32 +0000 (16:26 -0700)
src/cmds.cc
src/config.cc
src/config.h
src/cosd.cc
src/csyn.cc
src/mon/PGMap.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.cc
src/osd/OSD.h

index 15eb09f2fa5718bfd1508c29f8422a46838bdea2..5b01edd9d30c6bd64b21df6659a1e2b631881fd5 100644 (file)
@@ -67,6 +67,9 @@ int main(int argc, const char **argv)
   rank.bind();
   cout << "starting mds? at " << rank.get_rank_addr() << std::endl;
   rank.start();
+  
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+  //rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
 
   // start mds
   Messenger *m = rank.register_entity(entity_name_t::MDS(whoami));
index a26e26c3da2270975b49eb3bfd1977305a802bea..db830b27fbad3cfbb3e1db4b1a5f5cd185273131 100644 (file)
@@ -364,6 +364,7 @@ md_config_t g_conf = {
   osd_age_time: 0,
   osd_heartbeat_interval: 1,
   osd_heartbeat_grace: 30,
+  osd_failure_report_interval: 10,
   osd_pg_stats_interval:  5,
   osd_replay_window: 5,
   osd_max_pull: 2,
index 563b453d93d6c4d002dfdba9696637f7b805b8f6..5ea9005e9659aaa29cba1edab199f96091a37060 100644 (file)
@@ -265,6 +265,7 @@ struct md_config_t {
   int   osd_age_time;
   int   osd_heartbeat_interval;  
   int   osd_heartbeat_grace;
+  double osd_failure_report_interval;
   int   osd_pg_stats_interval;
   int   osd_replay_window;
   int   osd_max_pull;
index ead528829185420333823dd2f8d921c201272172..56aa8446746bf22d0218ad5308b6334a442549a0 100644 (file)
@@ -117,6 +117,9 @@ int main(int argc, const char **argv)
 
   rank.start();
 
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+
   // start osd
   Messenger *m = rank.register_entity(entity_name_t::OSD(whoami));
   assert(m);
index 74efd8b6700774884b6fe94b15408b049714a575..0980acf51c0e2058dff5e8f06761bfc5dd32871b 100644 (file)
@@ -59,13 +59,10 @@ int main(int argc, const char **argv, char *envp[]) {
   cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
   rank.start();
 
-  Rank::Policy client_policy;
-  client_policy.fail_interval = 0;
-  client_policy.drop_msg_callback = false;
-  rank.set_policy(entity_name_t::TYPE_CLIENT, client_policy);
-  rank.set_policy(entity_name_t::TYPE_MON, client_policy);
-  rank.set_policy(entity_name_t::TYPE_MDS, client_policy);
-  rank.set_policy(entity_name_t::TYPE_OSD, client_policy);
+  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
 
   list<Client*> clients;
   list<SyntheticClient*> synclients;
index fda824f0e70d523add2f7e6519546caa287350ca..dbf4a51d947b1dea11b8f445fccebedefe362778 100644 (file)
@@ -151,10 +151,12 @@ public:
   void _encode(bufferlist &bl) {
     ::_encode(version, bl);
     ::_encode(pg_stat, bl);
+    ::_encode(osd_stat, bl);
   }
   void _decode(bufferlist& bl, int& off) {
     ::_decode(version, bl, off);
     ::_decode(pg_stat, bl, off);
+    ::_decode(osd_stat, bl, off);
     stat_zero();
     for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
         p != pg_stat.end();
index d339af9239bbc31c25590193e3088ed63bedf6da..9ac20722ab08ee8ed88e0427c1b37c18c165c30e 100644 (file)
@@ -1169,6 +1169,12 @@ void Rank::Pipe::fault(bool onconnect)
   ::close(sd);
   sd = -1;
 
+  // lossy channel?
+  if (policy.retry_interval == 0) {
+    fail();
+    return;
+  }
+
   if (q.empty()) {
     if (state == STATE_CLOSING || onconnect) {
       dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
index 06eb75f915a311147d67d62e020e56c7b2d432fc..15bf85d1ed95e651a5fa36c1f8089a3408d3a4f3 100644 (file)
@@ -41,8 +41,8 @@ using namespace __gnu_cxx;
 class Rank {
 public:
   struct Policy {
-    float retry_interval;               // (initial)
-    float fail_interval;                // before we call ms_handle_failure
+    float retry_interval;               // (initial).  0 => lossy channel, fail immediately.
+    float fail_interval;                // before we call ms_handle_failure  0 => retry forever.
     bool drop_msg_callback;
     bool fail_callback;
     bool remote_reset_callback;
@@ -52,6 +52,16 @@ public:
       drop_msg_callback(true),
       fail_callback(true),
       remote_reset_callback(true) {}
+
+    Policy(float r, float f, bool dmc, bool fc, bool rrc) :
+      retry_interval(r), fail_interval(f),
+      drop_msg_callback(dmc),
+      fail_callback(fc),
+      remote_reset_callback(rrc) {}
+
+    static Policy fast_fail() { return Policy(0, 0, true, true, true); }
+    static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
+    static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, 0, false, true, true); }
   };
 
 
index 4775ba9d5ecd31247dd07d5d633513a6862786df..1b0304aecf403f3bfc10f75657ea0a0adbbf0a34 100644 (file)
@@ -882,14 +882,12 @@ void OSD::heartbeat()
       if (heartbeat_from_stamp[*p] < grace) {
        dout(0) << "no heartbeat from osd" << *p << " since " << heartbeat_from_stamp[*p]
                << " (cutoff " << grace << ")" << dendl;
-       int mon = monmap->pick_mon();
-       messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), osdmap->get_inst(*p), osdmap->get_epoch()),
-                               monmap->get_inst(mon));
+       queue_failure(*p);
       }
     } else
       heartbeat_from_stamp[*p] = now;  // fake initial
   }
-
+  maybe_report_failures();
 
   if (logger) logger->set("hbto", heartbeat_to.size());
   if (logger) logger->set("hbfrom", heartbeat_from.size());
@@ -920,7 +918,26 @@ void OSD::heartbeat()
   timer.add_event_after(wait, new C_Heartbeat(this));
 }
 
+void OSD::maybe_report_failures()
+{
+  if (pending_failures.empty())
+    return;  // nothing to report
+
+  utime_t now = g_clock.now();
+  if (last_failure_report + g_conf.osd_failure_report_interval > now)
+    return;  // not yet
+
+  last_failure_report = now;
 
+  int mon = monmap->pick_mon();
+  for (set<int>::iterator p = pending_failures.begin();
+       p != pending_failures.end();
+       p++)
+    messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), 
+                                           osdmap->get_inst(*p), osdmap->get_epoch()),
+                           monmap->get_inst(mon));
+  pending_failures.clear();
+}
 
 void OSD::send_pg_stats()
 {
@@ -1152,39 +1169,22 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
     return;
   }
 
-  if (dest.is_osd()) {
-    // failed osd.  drop message, report to mon.
-    if (!osdmap->have_inst(dest.num()) ||
-       (osdmap->get_inst(dest.num()) != inst)) {
-      dout(1) << "ms_handle_failure " << inst 
-             << ", already dropped/changed in osdmap, dropping " << *m
-             << dendl;
-    } else {
-      int mon = monmap->pick_mon();
-      dout(1) << "ms_handle_failure " << inst 
-             << ", dropping and reporting to mon" << mon 
-             << " " << *m
-             << dendl;
-      messenger->send_message(new MOSDFailure(monmap->fsid, messenger->get_myinst(), inst, 
-                                             osdmap->get_epoch()),
-                             monmap->get_inst(mon));
+  if (dest.is_mon()) {
+    if (m->get_type() == MSG_PGSTATS) {
+      MPGStats *pgstats = (MPGStats*)m;
+      dout(10) << "ms_handle_failure on " << *m << ", requeuing pg stats" << dendl;
+      pg_stat_queue_lock.Lock();
+      for (map<pg_t,pg_stat_t>::iterator p = pgstats->pg_stat.begin(); 
+          p != pgstats->pg_stat.end(); 
+          p++)
+       pg_stat_queue.insert(p->first);
+      pg_stat_queue_lock.Unlock();
     }
-    delete m;
-  } else if (dest.is_mon()) {
-    // resend to a different monitor.
-    int mon = monmap->pick_mon(true);
-    dout(1) << "ms_handle_failure " << inst 
-            << ", resending to mon" << mon 
-           << " " << *m
-            << dendl;
-    messenger->send_message(m, monmap->get_inst(mon));
-  }
-  else {
-    // client?
-    dout(1) << "ms_handle_failure " << inst 
-            << ", dropping " << *m << dendl;
-    delete m;
   }
+
+  dout(1) << "ms_handle_failure " << inst 
+         << ", dropping " << *m << dendl;
+  delete m;
 }
 
 
@@ -1342,6 +1342,7 @@ void OSD::handle_osd_map(MOSDMap *m)
            i++) {
         int osd = i->first;
         if (osd == whoami) continue;
+       pending_failures.erase(i->first);
         messenger->mark_down(osdmap->get_addr(i->first));
         peer_map_epoch.erase(entity_name_t::OSD(i->first));
       }
index 0b5814b3ef0bfbdae2f4ff554576c86511336b8b..3b75881432c4ac8e6806392817741c8b4426f2af 100644 (file)
@@ -331,6 +331,14 @@ private:
   void send_pg_stats(); 
 
 
+  // -- failures --
+  set<int> pending_failures;
+  utime_t last_failure_report;
+  void queue_failure(int n) {
+    pending_failures.insert(n);
+  }
+  void maybe_report_failures();
+
   // -- tids --
   // for ops i issue
   tid_t               last_tid;