git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: mkpg simplification and cleanup, much nicer
author Sage Weil <sage@newdream.net>
Sat, 15 Mar 2008 03:27:44 +0000 (20:27 -0700)
committer Sage Weil <sage@newdream.net>
Sat, 15 Mar 2008 03:27:44 +0000 (20:27 -0700)
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/PGMap.h
src/mon/PGMonitor.cc
src/mon/PGMonitor.h
src/osd/OSD.cc
src/osd/OSDMap.h
src/start.sh

index 236471dfc1f0c86a4d42c628f0bf84fa527573ad..89ef45dd2eafa73c1f37886c3dacd577920959d7 100644 (file)
@@ -166,11 +166,9 @@ bool OSDMonitor::update_from_paxos()
   }
   mon->store->put_int(osdmap.epoch, "osdmap_full","last_epoch");
 
-  // mkpg flag set?
-  if (osdmap.is_creating_pgs()) {
-    mon->pgmon->register_new_pgs();
-    mon->pgmon->send_pg_creates();
-  }
+  // kick pgmon, in case there are pg creations going on 
+  mon->pgmon->register_new_pgs();
+  mon->pgmon->send_pg_creates();
 
   // new map!
   bcast_latest_mds();
@@ -199,17 +197,6 @@ struct RetryClearMkpg : public Context {
 };
 */
 
-void OSDMonitor::try_clear_mkpg_flag()
-{
-  if (paxos->is_writeable()) {
-    dout(10) << "clear_mkpg_flag" << dendl;
-    pending_inc.mkpg |= OSDMap::Incremental::MKPG_FINISH;
-    //} else if (!mon->is_peon()) {
-    //dout(10) << "clear_mkpg_flag -- waiting for writeable" << dendl;
-    //paxos->wait_for_writeable(new RetryClearMkpg(this));
-  }
-}
-
 void OSDMonitor::encode_pending(bufferlist &bl)
 {
   dout(10) << "encode_pending e " << pending_inc.epoch
@@ -217,15 +204,7 @@ void OSDMonitor::encode_pending(bufferlist &bl)
   
   // finalize up pending_inc
   pending_inc.ctime = g_clock.now();
-  if ((pending_inc.mkpg & OSDMap::Incremental::MKPG_START) == 0 && 
-      (pending_inc.crush.length() ||
-       pending_inc.fullmap.length() ||
-       pending_inc.new_pg_num ||
-       pending_inc.new_localized_pg_num)) {
-    dout(2) << " setting mkpg flag" << dendl;
-    pending_inc.mkpg |= OSDMap::Incremental::MKPG_START;
-  }
-  
+
   // tell me about it
   for (map<int32_t,uint8_t>::iterator i = pending_inc.new_down.begin();
        i != pending_inc.new_down.end();
index 68b1c8d454e9b10c118f4e86a8df8f24fc1824c5..49fcbf767e4a8d7a0c77c639798930630b778b4e 100644 (file)
@@ -127,8 +127,6 @@ private:
 
   void mark_all_down();
 
-  void try_clear_mkpg_flag();
-
   void send_latest(entity_inst_t i, epoch_t start=0);
 
   void fake_osd_failure(int osd, bool down);
index 9645e0934bb519cac37b358f07d2df492e361ae3..6cd85aa79583509946a8fa5d167dcfad409716d6 100644 (file)
@@ -29,7 +29,7 @@ class PGMap {
 public:
   // the map
   version_t version;
-  epoch_t last_mkpg_scan;  // osdmap epoch
+  epoch_t last_pg_scan;  // osdmap epoch
   hash_map<pg_t,pg_stat_t> pg_stat;
   hash_map<int,osd_stat_t> osd_stat;
 
@@ -38,22 +38,22 @@ public:
     version_t version;
     map<pg_t,pg_stat_t> pg_stat_updates;
     map<int,osd_stat_t> osd_stat_updates;
-    epoch_t mkpg_scan;  // osdmap epoch
+    epoch_t pg_scan;  // osdmap epoch
 
     void _encode(bufferlist &bl) {
       ::_encode(version, bl);
       ::_encode(pg_stat_updates, bl);
       ::_encode(osd_stat_updates, bl);
-      ::_encode(mkpg_scan, bl);
+      ::_encode(pg_scan, bl);
     }
     void _decode(bufferlist& bl, int& off) {
       ::_decode(version, bl, off);
       ::_decode(pg_stat_updates, bl, off);
       ::_decode(osd_stat_updates, bl, off);
-      ::_decode(mkpg_scan, bl, off);
+      ::_decode(pg_scan, bl, off);
     }
 
-    Incremental() : version(0), mkpg_scan(0) {}
+    Incremental() : version(0), pg_scan(0) {}
   };
 
   void apply_incremental(Incremental& inc) {
@@ -75,8 +75,8 @@ public:
       osd_stat[p->first] = p->second;
       stat_osd_add(p->second);
     }
-    if (inc.mkpg_scan)
-      last_mkpg_scan = inc.mkpg_scan;
+    if (inc.pg_scan)
+      last_pg_scan = inc.pg_scan;
   }
 
   // aggregate stats (soft state)
@@ -90,7 +90,7 @@ public:
   int64_t total_osd_num_blocks_avail;
   int64_t total_osd_num_objects;
 
-  set<pg_t> creating_pgs;
+  set<pg_t> creating_pgs;   // lru: front = new additions, back = recently pinged
   
   void stat_zero() {
     num_pg = 0;
@@ -135,7 +135,7 @@ public:
   }
 
   PGMap() : version(0),
-           last_mkpg_scan(0),
+           last_pg_scan(0),
            num_pg(0), 
            total_pg_num_bytes(0), 
            total_pg_num_blocks(0), 
index 2f8638e204a9b0076cb4d04bb4b6c5fe6e9f2179..639872f0e06abeb144515233e8f6038d49ac8e38 100644 (file)
@@ -116,14 +116,7 @@ bool PGMonitor::update_from_paxos()
   pg_map._encode(bl);
   mon->store->put_bl_ss(bl, "pgmap", "latest");
 
-  // not creating pgs?
-  if (mon->osdmon->osdmap.is_creating_pgs() &&
-      pg_map.creating_pgs.empty()) {
-    // this may not work if osdmap not currently writeable.. but we'll get it eventually!
-    mon->osdmon->try_clear_mkpg_flag();  
-  } else {
-    send_pg_creates();
-  }
+  send_pg_creates();
 
   return true;
 }
@@ -274,23 +267,25 @@ struct RetryRegisterNewPgs : public Context {
 
 void PGMonitor::register_new_pgs()
 {
-  if (mon->is_peon()) return; // whatever.
+  if (mon->is_peon()) 
+    return; // whatever.
+
   if (!mon->osdmon->paxos->is_readable()) {
     dout(10) << "register_new_pgs -- osdmap not readable, waiting" << dendl;
     mon->osdmon->paxos->wait_for_readable(new RetryRegisterNewPgs(this));
     return;
   }
+
   if (!paxos->is_writeable()) {
     dout(10) << "register_new_pgs -- pgmap not writeable, waiting" << dendl;
     paxos->wait_for_writeable(new RetryRegisterNewPgs(this));
     return;
   }
 
-  dout(10) << "map pg_creating_from = " << mon->osdmon->osdmap.get_creating_pgs_from() 
-          << " vs last_mkpg_scan " << pg_map.last_mkpg_scan
-          << dendl;
-  if (mon->osdmon->osdmap.get_creating_pgs_from() <=
-      pg_map.last_mkpg_scan) {
+  dout(10) << "osdmap last_pg_change " << mon->osdmon->osdmap.get_last_pg_change()
+          << ", pgmap last_pg_scan " << pg_map.last_pg_scan << dendl;
+  if (mon->osdmon->osdmap.get_last_pg_change() <= pg_map.last_pg_scan ||
+      mon->osdmon->osdmap.get_last_pg_change() <= pending_inc.pg_scan) {
     dout(10) << "register_new_pgs -- i've already scanned pg space since last significant osdmap update" << dendl;
     return;
   }
@@ -326,16 +321,18 @@ void PGMonitor::register_new_pgs()
   } 
   dout(10) << "register_new_pgs registered " << created << " new pgs" << dendl;
   if (created) {
-    last_pg_create.clear();  // reset pg_create throttle timer
-    pending_inc.mkpg_scan = epoch;
+    last_sent_pg_create.clear();  // reset pg_create throttle timer
+    pending_inc.pg_scan = epoch;
     propose_pending();
   }
 }
 
-void PGMonitor::send_pg_creates(int onlyosd)
+void PGMonitor::send_pg_creates()
 {
   dout(10) << "send_pg_creates to " << pg_map.creating_pgs.size() << " pgs" << dendl;
+
   map<int, MOSDPGCreate*> msg;
+  utime_t now = g_clock.now();
 
   for (set<pg_t>::iterator p = pg_map.creating_pgs.begin();
        p != pg_map.creating_pgs.end();
@@ -343,29 +340,27 @@ void PGMonitor::send_pg_creates(int onlyosd)
     pg_t pgid = *p;
     vector<int> acting;
     int nrep = mon->osdmon->osdmap.pg_to_acting_osds(pgid, acting);
-    if (!nrep) {
-      dout(20) << "send_pg_creates  " << pgid << " -> no up devices, skipping" << dendl;
+    if (!nrep) 
       continue;  // blarney!
-    }
     int osd = acting[0];
-    if (onlyosd >= 0 && osd != onlyosd) continue;
-    dout(20) << "send_pg_creates  " << pgid << " -> osd" << osd << dendl;
+
+    // throttle?
+    if (last_sent_pg_create.count(osd) &&
+       now - g_conf.mon_pg_create_interval < last_sent_pg_create[osd]) 
+      continue;
+      
+    dout(20) << "send_pg_creates  " << pgid << " -> osd" << osd 
+            << " in epoch " << pg_map.pg_stat[pgid].created << dendl;
     if (msg.count(osd) == 0)
       msg[osd] = new MOSDPGCreate(mon->osdmon->osdmap.get_epoch());
     msg[osd]->mkpg[pgid] = pg_map.pg_stat[pgid].created;
   }
 
-  utime_t now = g_clock.now();
   for (map<int, MOSDPGCreate*>::iterator p = msg.begin();
        p != msg.end();
        p++) {
-    if (now - g_conf.mon_pg_create_interval < last_pg_create[p->first]) {
-      dout(10) << "NOT sending pg_create to osd" << p->first << dendl;
-      continue;  // throttle
-    } else {
-      dout(10) << "sending pg_create to osd" << p->first << dendl;
-      mon->messenger->send_message(p->second, mon->osdmon->osdmap.get_inst(p->first));
-      last_pg_create[p->first] = g_clock.now();
-    }
+    dout(10) << "sending pg_create to osd" << p->first << dendl;
+    mon->messenger->send_message(p->second, mon->osdmon->osdmap.get_inst(p->first));
+    last_sent_pg_create[p->first] = g_clock.now();
   }
 }
index daea34301db4571c978b4366bdb1b026ed2acf02..b3fc0c0395db4c97dcbd6cc898d66451b53f1ecb 100644 (file)
@@ -53,7 +53,7 @@ private:
   void handle_statfs(MStatfs *statfs);
   bool handle_pg_stats(MPGStats *stats);
 
-  map<int,utime_t> last_pg_create;  // per osd throttle
+  map<int,utime_t> last_sent_pg_create;  // per osd throttle
 
  public:
   PGMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p) { }
@@ -62,7 +62,7 @@ private:
 
 
   void register_new_pgs();
-  void send_pg_creates(int onlyosd=-1);
+  void send_pg_creates();
 
 };
 
index 0b35db2640cc6794c8a7944079551695dce7a8a4..5e6a1176e2948b40962bd39ef71b486fada3cab6 100644 (file)
@@ -1761,7 +1761,7 @@ void OSD::handle_pg_create(MOSDPGCreate *m)
       dout(10) << "mkpg " << pgid << " creating now" << dendl;
       try_create_pg(pgid, t);
     } else {
-      dout(10) << "mkpg " << pgid << " querying priors" << dendl;
+      dout(10) << "mkpg " << pgid << " e " << created << " : querying priors" << dendl;
       assert(0);
     }
   }
index cfea74c0011b7f6c021a658f9b9e3be8050e5ddb..f58e6a9adcd3d3672ca41896d5f64a6bd94feece 100644 (file)
@@ -73,6 +73,13 @@ public:
     epoch_t epoch;   // new epoch; we are a diff from epoch-1 to epoch
     utime_t ctime;
 
+    bool is_pg_change() {
+      return (fullmap.length() ||
+             crush.length() ||
+             new_pg_num ||
+             new_localized_pg_num);
+    }
+
     // full (rare)
     bufferlist fullmap;  // in leiu of below.
     bufferlist crush;
@@ -87,10 +94,6 @@ public:
     map<pg_t,uint32_t> new_pg_swap_primary;
     list<pg_t> old_pg_swap_primary;
     
-    static const __u8 MKPG_START = 1;
-    static const __u8 MKPG_FINISH = 2;
-    __u8 mkpg;
-    
     void encode(bufferlist& bl) {
       ::_encode(fsid, bl);
       ::_encode(epoch, bl); 
@@ -105,7 +108,6 @@ public:
       ::_encode(new_offload, bl);
       ::_encode(new_pg_swap_primary, bl);
       ::_encode(old_pg_swap_primary, bl);
-      ::_encode(mkpg, bl);
     }
     void decode(bufferlist& bl, int& off) {
       ::_decode(fsid, bl, off);
@@ -121,10 +123,9 @@ public:
       ::_decode(new_offload, bl, off);
       ::_decode(new_pg_swap_primary, bl, off);
       ::_decode(old_pg_swap_primary, bl, off);
-      ::_decode(mkpg, bl, off);
     }
 
-    Incremental(epoch_t e=0) : epoch(e), new_max_osd(-1), new_pg_num(0), new_localized_pg_num(0), mkpg(false) {
+    Incremental(epoch_t e=0) : epoch(e), new_max_osd(-1), new_pg_num(0), new_localized_pg_num(0) {
       fsid.major = fsid.minor = cpu_to_le64(0);
     }
   };
@@ -139,7 +140,7 @@ private:
   int32_t localized_pg_num_mask; // ditto
 
   // new pgs
-  epoch_t creating_pgs_from;  // most recent epoch initiating possible pg creation
+  epoch_t last_pg_change;  // most recent epoch initiating possible pg creation
 
   int32_t max_osd;
   vector<uint8_t>  osd_state;
@@ -155,7 +156,7 @@ private:
  public:
   OSDMap() : epoch(0), 
             pg_num(0), localized_pg_num(0),
-            creating_pgs_from(0),
+            last_pg_change(0),
             max_osd(0) { 
     fsid.major = fsid.minor = cpu_to_le64(0);
     calc_pg_masks();
@@ -183,11 +184,8 @@ private:
   const utime_t& get_ctime() const { return ctime; }
   const utime_t& get_mtime() const { return mtime; }
 
-  bool is_creating_pgs() const { 
-    return creating_pgs_from;
-  }
-  epoch_t get_creating_pgs_from() const {
-    return creating_pgs_from;
+  epoch_t get_last_pg_change() const {
+    return last_pg_change;
   }
 
   /***** cluster state *****/
@@ -294,14 +292,11 @@ private:
     ctime = inc.ctime;
 
     // full map?
-    if (inc.fullmap.length())
+    if (inc.fullmap.length()) 
       decode(inc.fullmap);
-
-    // mkpg flags?
-    if (inc.mkpg & Incremental::MKPG_FINISH) 
-      creating_pgs_from = 0;
-    if (inc.mkpg & Incremental::MKPG_START) 
-      creating_pgs_from = epoch;
+    
+    if (inc.is_pg_change())
+      last_pg_change = epoch;
     
     if (inc.fullmap.length())
       return;
@@ -363,7 +358,7 @@ private:
     ::_encode(mtime, blist);
     ::_encode(pg_num, blist);
     ::_encode(localized_pg_num, blist);
-    ::_encode(creating_pgs_from, blist);
+    ::_encode(last_pg_change, blist);
     
     ::_encode(max_osd, blist);
     ::_encode(osd_state, blist);
@@ -385,7 +380,7 @@ private:
     ::_decode(localized_pg_num, blist, off);
     calc_pg_masks();
 
-    ::_decode(creating_pgs_from, blist, off);
+    ::_decode(last_pg_change, blist, off);
 
     ::_decode(max_osd, blist, off);
     ::_decode(osd_state, blist, off);
index 4466874441d53ad1d8d25dd7f3d0d8737e253afc..94a6ebe6115a3ee600d9c2300b46bd11a3019c5c 100755 (executable)
@@ -29,7 +29,7 @@ $CEPH_BIN/mkmonfs --clobber mondata/mon0 --mon 0 --monmap .ceph_monmap
 ARGS="-d --bind $IP -o out --debug_ms 1"
 
 # start monitor
-$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 10 --debug_ms 1
+$CEPH_BIN/cmon $ARGS mondata/mon0 --debug_mon 20 --debug_ms 1
 
 # build and inject an initial osd map
 $CEPH_BIN/osdmaptool --clobber --createsimple .ceph_monmap 4 --print .ceph_osdmap