]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: rough trimmer, non-functional
authorSage Weil <sage@newdream.net>
Mon, 18 Aug 2008 23:38:59 +0000 (16:38 -0700)
committerSage Weil <sage@newdream.net>
Mon, 18 Aug 2008 23:38:59 +0000 (16:38 -0700)
src/include/interval_set.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/RAID4PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index e02a941b73df643c10c2b3c1c97b71260e1f4419..750571ef6aeaee5d58d4452f32d392c727eee932 100644 (file)
@@ -88,7 +88,7 @@ class interval_set {
     return m == other.m;
   }
 
-  int size() {
+  int size() const {
     return _size;
   }
 
index c382e7d07a468c8b055bee32e59e15efc47a3ec3..0f9af1dfa4168670185fc0c967f347afcc988b90 100644 (file)
@@ -252,7 +252,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) :
   stat_oprate(5.0),
   read_latency_calc(g_conf.osd_max_opq<1 ? 1:g_conf.osd_max_opq),
   qlen_calc(3),
-  iat_averager(g_conf.osd_flash_crowd_iat_alpha)
+  iat_averager(g_conf.osd_flash_crowd_iat_alpha),
+  snap_trimmer_thread(this)
 {
   messenger = m;
   monmap = mm;
@@ -1850,6 +1851,8 @@ void OSD::activate_map(ObjectStore::Transaction& t)
     if (pg->is_active()) {
       // update started counter
       pg->info.history.last_epoch_started = osdmap->get_epoch();
+      if (!pg->info.removed_snaps.empty())
+       pg->queue_snap_trim();
     }
     else if (pg->is_primary() && !pg->is_active()) {
       // i am (inactive) primary
@@ -3108,3 +3111,39 @@ void OSD::wait_for_no_ops()
 
 
 
+void OSD::wake_snap_trimmer()
+{
+  osd_lock.Lock();
+  if (!snap_trimmer_thread.is_started()) {
+    dout(10) << "wake_snap_trimmer - creating thread" << dendl;
+    snap_trimmer_thread.create();
+  } else {
+    dout(10) << "wake_snap_trimmer - kicking thread" << dendl;
+    snap_trimmer_cond.Signal();
+  }
+  osd_lock.Unlock();  
+}
+
+void OSD::snap_trimmer()
+{
+  osd_lock.Lock();
+  while (1) {
+    snap_trimmer_lock.Lock();
+    if (pgs_pending_snap_removal.empty()) {
+      snap_trimmer_lock.Unlock();
+      dout(10) << "snap_trimmer - no pgs pending trim, sleeping" << dendl;
+      snap_trimmer_cond.Wait(osd_lock);
+      continue;
+    }
+    
+    PG *pg = pgs_pending_snap_removal.front();
+    pgs_pending_snap_removal.pop_front();
+    snap_trimmer_lock.Unlock();
+    osd_lock.Unlock();
+
+    pg->snap_trimmer();
+
+    osd_lock.Lock();
+  }
+  osd_lock.Unlock();
+}
index ee2fd37f6828f9757ac00d6465aa914f908abd75..67e9e706df89b758a26b832f76c33e76aa475443 100644 (file)
@@ -275,6 +275,7 @@ private:
   // -- placement groups --
   hash_map<pg_t, PG*> pg_map;
   hash_map<pg_t, list<Message*> > waiting_for_pg;
+  xlist<PG*> pgs_pending_snap_removal;
 
   bool  _have_pg(pg_t pgid);
   PG   *_lookup_lock_pg(pg_t pgid);
@@ -289,6 +290,21 @@ private:
                          vector<int>& last);
   void activate_pg(pg_t pgid, epoch_t epoch);
 
+  Mutex snap_trimmer_lock;
+  Cond snap_trimmer_cond;
+
+  void wake_snap_trimmer();
+  void snap_trimmer();       // thread entry
+
+  struct SnapTrimmer : public Thread {
+    OSD *osd;
+    SnapTrimmer(OSD *o) : osd(o) {}
+    void *entry() {
+      osd->snap_trimmer();
+      return NULL;
+    }
+  } snap_trimmer_thread;
+
   void wake_pg_waiters(pg_t pgid) {
     if (waiting_for_pg.count(pgid)) {
       take_waiters(waiting_for_pg[pgid]);
index 6874a93b9ae7b3d0228eb1a7c80469ce688a287a..4e7ed79fa0f18bde1653d6047d476c54f29ed949 100644 (file)
@@ -962,12 +962,8 @@ void PG::activate(ObjectStore::Transaction& t,
 
   assert(info.last_complete >= log.bottom || log.backlog);
 
-  // write pg info
-  bufferlist bl;
-  ::encode(info, bl);
-  t.collection_setattr(info.pgid.to_coll(), "info", bl);
-  
-  // write log
+  // write pg info, log
+  write_info(t);
   write_log(t);
 
   // clean up stray objects
@@ -1122,6 +1118,17 @@ void PG::activate(ObjectStore::Transaction& t,
   osd->take_waiters(waiting_for_active);
 }
 
+void PG::queue_snap_trim()
+{
+  state_set(PG_STATE_SNAPTRIMQUEUE);
+
+  osd->snap_trimmer_lock.Lock();
+  osd->pgs_pending_snap_removal.push_back(&pending_snap_removal_item);
+  osd->snap_trimmer_lock.Unlock();
+
+  osd->wake_snap_trimmer();     // FIXME: we probably want to wait until at least peering completes?
+}
+
 
 struct C_PG_FinishRecovery : public Context {
   PG *pg;
@@ -1143,9 +1150,7 @@ void PG::finish_recovery()
   finish_sync_event = new C_PG_FinishRecovery(this);
 
   ObjectStore::Transaction t;
-  bufferlist bl;
-  ::encode(info, bl);
-  t.collection_setattr(info.pgid.to_coll(), "info", bl);
+  write_info(t);
   osd->store->apply_transaction(t, finish_sync_event);
 }
 
@@ -1156,6 +1161,10 @@ void PG::_finish_recovery(Context *c)
     finish_sync_event = 0;
     dout(10) << "_finish_recovery" << dendl;
     purge_strays();
+
+    if (!info.removed_snaps.empty())
+      queue_snap_trim();
+
     update_stats();
   }
   unlock();
@@ -1220,6 +1229,14 @@ void PG::clear_stats()
 }
 
 
+void PG::write_info(ObjectStore::Transaction& t)
+{
+  // write pg info
+  bufferlist infobl;
+  ::encode(info, infobl);
+  t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+}
+
 void PG::write_log(ObjectStore::Transaction& t)
 {
   dout(10) << "write_log" << dendl;
@@ -1251,9 +1268,7 @@ void PG::write_log(ObjectStore::Transaction& t)
   t.collection_setattr(info.pgid.to_coll(), "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
   t.collection_setattr(info.pgid.to_coll(), "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
   
-  bufferlist infobl;
-  ::encode(info, infobl);
-  t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+  write_info(t);
 
   dout(10) << "write_log to [" << ondisklog.bottom << "," << ondisklog.top << ")" << dendl;
 }
index 9794cc3eb6210055bb29837f11e2bc22640ca3d3..9bb5b19c08c2348878f12168ad9e0ded4c54afd6 100644 (file)
@@ -19,6 +19,7 @@
 #include "include/types.h"
 #include "osd_types.h"
 #include "include/buffer.h"
+#include "include/xlist.h"
 
 #include "OSDMap.h"
 #include "os/ObjectStore.h"
@@ -517,6 +518,8 @@ protected:
   int         role;    // 0 = primary, 1 = replica, -1=none.
   int         state;   // see bit defns above
 
+  xlist<PG*>::item pending_snap_removal_item;
+
   // primary state
  public:
   vector<int> acting;
@@ -632,6 +635,7 @@ public:
     info(p),
     role(0),
     state(0),
+    pending_snap_removal_item(this),
     have_master_log(true),
     must_notify_mon(false),
     stat_num_bytes(0), stat_num_blocks(0),
@@ -687,6 +691,7 @@ public:
   bool  is_empty() const { return info.last_update == eversion_t(0,0); }
 
   // pg on-disk state
+  void write_info(ObjectStore::Transaction& t);
   void write_log(ObjectStore::Transaction& t);
   void append_log(ObjectStore::Transaction &t, 
                   const PG::Log::Entry &logentry, 
@@ -694,6 +699,7 @@ public:
   void read_log(ObjectStore *store);
   void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
 
+  void queue_snap_trim();
 
   bool is_dup(osd_reqid_t rid) {
     return log.logged_req(rid);
@@ -706,6 +712,7 @@ public:
   virtual void do_op(MOSDOp *op) = 0;
   virtual void do_sub_op(MOSDSubOp *op) = 0;
   virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
+  virtual bool snap_trimmer() = 0;
 
   virtual bool same_for_read_since(epoch_t e) = 0;
   virtual bool same_for_modify_since(epoch_t e) = 0;
index e8cdef9da0dc9632eef8954a5261310d9d6d5bc2..345d3164a00a7ac4b3392a10abb7b6ce91507838 100644 (file)
@@ -47,6 +47,7 @@ protected:
   void cancel_recovery();
   bool do_recovery();
 
+  bool snap_trimmer() { return true; }
   
 public:
   RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
index dcad830514b3d66e67cd350cb35f862ef0cd1fdb..0d3d5753ec20037f6460933d12d6fd2f1c3c011d 100644 (file)
@@ -444,6 +444,126 @@ void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
 }
 
 
+bool ReplicatedPG::snap_trimmer()
+{
+  lock();
+  dout(10) << "snap_trimmer" << dendl;
+  
+  state_clear(PG_STATE_SNAPTRIMQUEUE);
+  state_set(PG_STATE_SNAPTRIMMING);
+  update_stats();
+
+  while (info.removed_snaps.size() &&
+        is_active()) {
+    snapid_t sn = *info.removed_snaps.begin();
+    coll_t c = info.pgid.to_snap_coll(sn);
+    list<pobject_t> ls;
+    osd->store->collection_list(c, ls);
+
+    dout(10) << "snap_trimmer collection " << c << " has " << ls.size() << " items" << dendl;
+
+    ObjectStore::Transaction t;
+
+    for (list<pobject_t>::iterator p = ls.begin(); p != ls.end(); p++) {
+      pobject_t coid = *p;
+
+      bufferlist bl;
+      osd->store->getattr(info.pgid.to_coll(), coid, "snaps", bl);
+      bufferlist::iterator blp = bl.begin();
+      vector<snapid_t> snaps;
+      ::decode(snaps, blp);
+      vector<snapid_t> newsnaps;
+      for (unsigned i=0; i<snaps.size(); i++)
+       if (!osd->osdmap->is_removed_snap(snaps[i]))
+         newsnaps.push_back(i);
+
+      if (newsnaps.empty()) {
+       // remove
+       dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << " ... deleting" << dendl;
+       t.remove(info.pgid.to_coll(), coid);
+       t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid);
+       if (snaps.size() > 1)
+         t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid);
+       
+       // adjust head snapset  
+       pobject_t head = coid;
+       head.oid.snap = CEPH_NOSNAP;
+       bufferlist bl;
+       osd->store->getattr(info.pgid.to_coll(), head, "snapset", bl);
+       bufferlist::iterator blp = bl.begin();
+       SnapSet snapset;
+       ::decode(snapset, blp);
+       dout(10) << coid << " old head " << head << " snapset " << snapset << dendl;
+
+       snapid_t last = coid.oid.snap;
+       vector<snapid_t>::iterator p;
+       for (p = snapset.clones.begin(); p != snapset.clones.end(); p++)
+         if (*p == last)
+           break;
+       if (p == snapset.clones.begin()) {
+         // newest clone.
+         snapset.head_diffs.union_of(snapset.clone_diffs[last]);
+       } else  {
+         // older clone
+         vector<snapid_t>::iterator n = p;
+         n++;
+         if (n != snapset.clones.end())
+           // not oldest clone.
+           snapset.clone_diffs[*n].union_of(snapset.clone_diffs[*p]);
+       }
+       snapset.clones.erase(p);
+       snapset.clone_diffs.erase(last);
+
+       dout(10) << coid << " new head " << head << " snapset " << snapset << dendl;
+
+       if (snapset.clones.empty() && !snapset.head_exists) {
+         dout(10) << coid << " removing head " << head << dendl;
+         t.remove(info.pgid.to_coll(), head);
+       } else {
+         bl.clear();
+         ::encode(snapset, bl);
+         t.setattr(info.pgid.to_coll(), head, "snapset", bl);
+       }
+      } else {
+       // save adjusted snaps for this object
+       dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << dendl;
+       bl.clear();
+       ::encode(newsnaps, bl);
+       t.setattr(info.pgid.to_coll(), coid, "snaps", bl);
+
+       if (snaps[0] != newsnaps[0]) {
+         t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid);
+         t.collection_add(info.pgid.to_snap_coll(newsnaps[0]), info.pgid.to_coll(), coid);
+       }
+       if (snaps.size() > 1 && snaps[snaps.size()-1] != newsnaps[newsnaps.size()-1]) {
+         t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid);
+         if (newsnaps.size() > 1)
+           t.collection_add(info.pgid.to_snap_coll(newsnaps[newsnaps.size()-1]), info.pgid.to_coll(), coid);
+       }             
+      }
+      
+      osd->store->apply_transaction(t);
+
+      // give other threads a chance at this pg
+      unlock();
+      lock();
+    }
+    
+    info.removed_snaps.erase(sn);
+  }  
+
+  // done
+  dout(10) << "snap_trimmer done" << dendl;
+  state_clear(PG_STATE_SNAPTRIMMING);
+  update_stats();
+
+  ObjectStore::Transaction t;
+  write_info(t);
+  osd->store->apply_transaction(t);
+  unlock();
+  return true;
+}
+
 
 // ========================================================================
 // READS
@@ -874,10 +994,7 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
   assert(at_version > info.last_update);
   info.last_update = at_version;
   
-  // write pg info
-  bufferlist infobl;
-  ::encode(info, infobl);
-  t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+  write_info(t);
 
   // prepare log append
   append_log(t, logentry, trim_to);
@@ -1670,9 +1787,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   
   
   // apply to disk!
-  bufferlist bl;
-  ::encode(info, bl);
-  t.collection_setattr(info.pgid.to_coll(), "info", bl);
+  write_info(t);
   unsigned r = osd->store->apply_transaction(t);
   assert(r == 0);
 
index b7f697ec37e0c8f2d98847b16ecc0f353f07e6ee..c46f96d3c23fb2b8eafd9eced04d080d03137aff 100644 (file)
@@ -141,7 +141,8 @@ public:
   void do_op(MOSDOp *op);
   void do_sub_op(MOSDSubOp *op);
   void do_sub_op_reply(MOSDSubOpReply *op);
-
+  bool snap_trimmer();
+  
   bool same_for_read_since(epoch_t e);
   bool same_for_modify_since(epoch_t e);
   bool same_for_rep_modify_since(epoch_t e);
index 714c62bc67b4d529ac210e96826aab3e266e4ea2..9fcd69e403e11bfdc5120a41925a41416669f09d 100644 (file)
@@ -280,7 +280,8 @@ inline ostream& operator<<(ostream& out, const osd_stat_t& s) {
 #define PG_STATE_REPLAY     32  // crashed, waiting for replay
 #define PG_STATE_STRAY      64  // i must notify the primary i exist.
 #define PG_STATE_SPLITTING 128  // i am splitting
-#define PG_STATE_SNAPTRIM  256  // i am trimming snapshot data
+#define PG_STATE_SNAPTRIMQUEUE  256  // i am queued for snapshot trimming
+#define PG_STATE_SNAPTRIMMING   512  // i am trimming snapshot data
 
 static inline std::string pg_state_string(int state) {
   std::string st;
@@ -292,7 +293,8 @@ static inline std::string pg_state_string(int state) {
   if (state & PG_STATE_REPLAY) st += "replay+";
   if (state & PG_STATE_STRAY) st += "stray+";
   if (state & PG_STATE_SPLITTING) st += "splitting+";
-  if (state & PG_STATE_SNAPTRIM) st += "trimmingsnap+";
+  if (state & PG_STATE_SNAPTRIMQUEUE) st += "snaptrimqueue+";
+  if (state & PG_STATE_SNAPTRIMMING) st += "snaptrimming+";
   if (!st.length()) 
     st = "inactive";
   else