git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: apply osdmap updates to pg_map osd_stat
author Sage Weil <sage@newdream.net>
Wed, 9 Apr 2008 17:43:02 +0000 (10:43 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 9 Apr 2008 18:14:43 +0000 (11:14 -0700)
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/PGMap.h
src/mon/PGMonitor.cc
src/mon/PGMonitor.h

index b0003ee0630295f9330bea8d7455c60e4b409fc4..e7cc31ba7cb619276ad78982f77b0b09aa3ed232 100644 (file)
@@ -170,9 +170,8 @@ bool OSDMonitor::update_from_paxos()
   }
   mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch");
 
-  // kick pgmon, in case there are pg creations going on 
-  mon->pgmon->register_new_pgs();
-  mon->pgmon->send_pg_creates();
+  // kick pgmon, make sure it's seen the latest map
+  mon->pgmon->check_osd_map(osdmap.epoch);
 
   // new map!
   bcast_latest_mds();
index 69793ec497a1edbd5ee39e1a7ffddabc8b21a62c..8ed40db7e15162da478df92342237abb2c3edf7f 100644 (file)
@@ -62,9 +62,6 @@ private:
   bool should_propose(double &delay);
 
   // ...
-  bool get_map_bl(epoch_t epoch, bufferlist &bl);
-  bool get_inc_map_bl(epoch_t epoch, bufferlist &bl);
-  
   void send_to_waiting();     // send current map to waiters.
   void send_full(entity_inst_t dest);
   void send_incremental(entity_inst_t dest, epoch_t since);
index dbf4a51d947b1dea11b8f445fccebedefe362778..3b08f00b155fd0d4c6ad951eb9217c147d6d40c3 100644 (file)
@@ -27,6 +27,7 @@ class PGMap {
 public:
   // the map
   version_t version;
+  epoch_t last_osdmap_epoch;   // last osdmap epoch i applied to the pgmap
   epoch_t last_pg_scan;  // osdmap epoch
   hash_map<pg_t,pg_stat_t> pg_stat;
   hash_map<int,osd_stat_t> osd_stat;
@@ -36,22 +37,28 @@ public:
     version_t version;
     map<pg_t,pg_stat_t> pg_stat_updates;
     map<int,osd_stat_t> osd_stat_updates;
+    set<int> osd_stat_rm;
+    epoch_t osdmap_epoch;
     epoch_t pg_scan;  // osdmap epoch
 
     void _encode(bufferlist &bl) {
       ::_encode(version, bl);
       ::_encode(pg_stat_updates, bl);
       ::_encode(osd_stat_updates, bl);
+      ::_encode(osd_stat_rm, bl);
+      ::_encode(osdmap_epoch, bl);
       ::_encode(pg_scan, bl);
     }
     void _decode(bufferlist& bl, int& off) {
       ::_decode(version, bl, off);
       ::_decode(pg_stat_updates, bl, off);
       ::_decode(osd_stat_updates, bl, off);
+      ::_decode(osd_stat_rm, bl, off);
+      ::_decode(osdmap_epoch, bl, off);
       ::_decode(pg_scan, bl, off);
     }
 
-    Incremental() : version(0), pg_scan(0) {}
+    Incremental() : version(0), osdmap_epoch(0), pg_scan(0) {}
   };
 
   void apply_incremental(Incremental& inc) {
@@ -73,6 +80,15 @@ public:
       osd_stat[p->first] = p->second;
       stat_osd_add(p->second);
     }
+    for (set<int>::iterator p = inc.osd_stat_rm.begin();
+        p != inc.osd_stat_rm.end();
+        p++) 
+      if (osd_stat.count(*p)) {
+       stat_osd_sub(osd_stat[*p]);
+       osd_stat.erase(*p);
+      }
+    if (inc.osdmap_epoch)
+      last_osdmap_epoch = inc.osdmap_epoch;
     if (inc.pg_scan)
       last_pg_scan = inc.pg_scan;
   }
@@ -138,7 +154,7 @@ public:
   uint64_t total_used_kb() { return total_kb() - total_avail_kb(); }
 
   PGMap() : version(0),
-           last_pg_scan(0),
+           last_osdmap_epoch(0), last_pg_scan(0),
            num_pg(0), 
            total_pg_num_bytes(0), 
            total_pg_num_blocks(0), 
@@ -152,11 +168,15 @@ public:
     ::_encode(version, bl);
     ::_encode(pg_stat, bl);
     ::_encode(osd_stat, bl);
+    ::_encode(last_osdmap_epoch, bl);
+    ::_encode(last_pg_scan, bl);
   }
   void _decode(bufferlist& bl, int& off) {
     ::_decode(version, bl, off);
     ::_decode(pg_stat, bl, off);
     ::_decode(osd_stat, bl, off);
+    ::_decode(last_osdmap_epoch, bl, off);
+    ::_decode(last_pg_scan, bl, off);
     stat_zero();
     for (hash_map<pg_t,pg_stat_t>::iterator p = pg_stat.begin();
         p != pg_stat.end();
index a31c8eade78dde668fe099c5eb3bc8db85a3e89e..421701cca1b7a2c58e2890fc3b73226c09982e19 100644 (file)
@@ -299,37 +299,87 @@ bool PGMonitor::prepare_pg_stats(MPGStats *stats)
 
 // ------------------------
 
-struct RetryRegisterNewPgs : public Context {
+struct RetryCheckOSDMap : public Context {
   PGMonitor *pgmon;
-  RetryRegisterNewPgs(PGMonitor *p) : pgmon(p) {}
+  epoch_t epoch;
+  RetryCheckOSDMap(PGMonitor *p, epoch_t e) : pgmon(p), epoch(e) {}
   void finish(int r) {
-    pgmon->register_new_pgs();
+    pgmon->check_osd_map(epoch);
   }
 };
 
-void PGMonitor::register_new_pgs()
+void PGMonitor::check_osd_map(epoch_t epoch)
 {
   if (mon->is_peon()) 
     return; // whatever.
 
+  if (pg_map.last_osdmap_epoch >= epoch) {
+    dout(10) << "check_osd_map already seen " << pg_map.last_osdmap_epoch << " >= " << epoch << dendl;
+    return;
+  }
+
   if (!mon->osdmon->paxos->is_readable()) {
     dout(10) << "register_new_pgs -- osdmap not readable, waiting" << dendl;
-    mon->osdmon->paxos->wait_for_readable(new RetryRegisterNewPgs(this));
+    mon->osdmon->paxos->wait_for_readable(new RetryCheckOSDMap(this, epoch));
     return;
   }
 
   if (!paxos->is_writeable()) {
     dout(10) << "register_new_pgs -- pgmap not writeable, waiting" << dendl;
-    paxos->wait_for_writeable(new RetryRegisterNewPgs(this));
+    paxos->wait_for_writeable(new RetryCheckOSDMap(this, epoch));
     return;
   }
 
+  // apply latest map(s)
+  for (epoch_t e = pg_map.last_osdmap_epoch+1;
+       e <= epoch;
+       e++) {
+    dout(10) << "check_osd_map applying osdmap e" << e << " to pg_map" << dendl;
+    bufferlist bl;
+    mon->store->get_bl_sn(bl, "osdmap", e);
+    assert(bl.length());
+    OSDMap::Incremental inc;
+    int off = 0;
+    inc.decode(bl, off);
+    for (map<int32_t,uint32_t>::iterator p = inc.new_offload.begin();
+        p != inc.new_offload.end();
+        p++)
+      if (p->second == 0x10000) {
+       dout(10) << "check_osd_map  osd" << p->first << " went OUT" << dendl;
+       pending_inc.osd_stat_rm.insert(p->first);
+      } else {
+       dout(10) << "check_osd_map  osd" << p->first << " is IN" << dendl;
+       pending_inc.osd_stat_rm.erase(p->first);
+       pending_inc.osd_stat_updates[p->first]; 
+      }
+  }
+
+  bool propose = false;
+  if (pg_map.last_osdmap_epoch < epoch) {
+    pending_inc.osdmap_epoch = epoch;
+    propose = true;
+  }
+
+  // scan pg space?
+  if (register_new_pgs())
+    propose = true;
+  
+  if (propose)
+    propose_pending();
+
+  send_pg_creates();
+}
+
+bool PGMonitor::register_new_pgs()
+{
+
+
   dout(10) << "osdmap last_pg_change " << mon->osdmon->osdmap.get_last_pg_change()
           << ", pgmap last_pg_scan " << pg_map.last_pg_scan << dendl;
   if (mon->osdmon->osdmap.get_last_pg_change() <= pg_map.last_pg_scan ||
       mon->osdmon->osdmap.get_last_pg_change() <= pending_inc.pg_scan) {
     dout(10) << "register_new_pgs -- i've already scanned pg space since last significant osdmap update" << dendl;
-    return;
+    return false;
   }
 
   // iterate over crush mapspace
@@ -398,8 +448,9 @@ void PGMonitor::register_new_pgs()
   if (created) {
     last_sent_pg_create.clear();  // reset pg_create throttle timer
     pending_inc.pg_scan = epoch;
-    propose_pending();
+    return true;
   }
+  return false;
 }
 
 void PGMonitor::send_pg_creates()
index 2affc2a1a6beead80b898e0ef96f4fa79b2caff7..9f871985d5e6c4b7a32c30eca66aa96deff6354e 100644 (file)
@@ -60,14 +60,16 @@ private:
 
   map<int,utime_t> last_sent_pg_create;  // per osd throttle
 
+  bool register_new_pgs();
+  void send_pg_creates();
+
  public:
   PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
   
   void tick();  // check state, take actions
 
 
-  void register_new_pgs();
-  void send_pg_creates();
+  void check_osd_map(epoch_t epoch);