From e8e57d9a1d546d2007a9f82ab54e901464956ad3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 18 Aug 2008 16:38:59 -0700 Subject: [PATCH] osd: rough trimmer, non-functional --- src/include/interval_set.h | 2 +- src/osd/OSD.cc | 41 +++++++++++- src/osd/OSD.h | 16 +++++ src/osd/PG.cc | 39 +++++++---- src/osd/PG.h | 7 ++ src/osd/RAID4PG.h | 1 + src/osd/ReplicatedPG.cc | 129 +++++++++++++++++++++++++++++++++++-- src/osd/ReplicatedPG.h | 3 +- src/osd/osd_types.h | 6 +- 9 files changed, 220 insertions(+), 24 deletions(-) diff --git a/src/include/interval_set.h b/src/include/interval_set.h index e02a941b73df6..750571ef6aeae 100644 --- a/src/include/interval_set.h +++ b/src/include/interval_set.h @@ -88,7 +88,7 @@ class interval_set { return m == other.m; } - int size() { + int size() const { return _size; } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c382e7d07a468..0f9af1dfa4168 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -252,7 +252,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, const char *dev) : stat_oprate(5.0), read_latency_calc(g_conf.osd_max_opq<1 ? 
1:g_conf.osd_max_opq), qlen_calc(3), - iat_averager(g_conf.osd_flash_crowd_iat_alpha) + iat_averager(g_conf.osd_flash_crowd_iat_alpha), + snap_trimmer_thread(this) { messenger = m; monmap = mm; @@ -1850,6 +1851,8 @@ void OSD::activate_map(ObjectStore::Transaction& t) if (pg->is_active()) { // update started counter pg->info.history.last_epoch_started = osdmap->get_epoch(); + if (!pg->info.removed_snaps.empty()) + pg->queue_snap_trim(); } else if (pg->is_primary() && !pg->is_active()) { // i am (inactive) primary @@ -3108,3 +3111,39 @@ void OSD::wait_for_no_ops() +void OSD::wake_snap_trimmer() +{ + osd_lock.Lock(); + if (!snap_trimmer_thread.is_started()) { + dout(10) << "wake_snap_trimmer - creating thread" << dendl; + snap_trimmer_thread.create(); + } else { + dout(10) << "wake_snap_trimmer - kicking thread" << dendl; + snap_trimmer_cond.Signal(); + } + osd_lock.Unlock(); +} + +void OSD::snap_trimmer() +{ + osd_lock.Lock(); + while (1) { + snap_trimmer_lock.Lock(); + if (pgs_pending_snap_removal.empty()) { + snap_trimmer_lock.Unlock(); + dout(10) << "snap_trimmer - no pgs pending trim, sleeping" << dendl; + snap_trimmer_cond.Wait(osd_lock); + continue; + } + + PG *pg = pgs_pending_snap_removal.front(); + pgs_pending_snap_removal.pop_front(); + snap_trimmer_lock.Unlock(); + osd_lock.Unlock(); + + pg->snap_trimmer(); + + osd_lock.Lock(); + } + osd_lock.Unlock(); +} diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ee2fd37f6828f..67e9e706df89b 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -275,6 +275,7 @@ private: // -- placement groups -- hash_map<pg_t, PG*> pg_map; hash_map > waiting_for_pg; + xlist<PG*> pgs_pending_snap_removal; bool _have_pg(pg_t pgid); PG *_lookup_lock_pg(pg_t pgid); @@ -289,6 +290,21 @@ private: vector& last); void activate_pg(pg_t pgid, epoch_t epoch); + Mutex snap_trimmer_lock; + Cond snap_trimmer_cond; + + void wake_snap_trimmer(); + void snap_trimmer(); // thread entry + + struct SnapTrimmer : public Thread { + OSD *osd; + SnapTrimmer(OSD *o) : osd(o) 
{} + void *entry() { + osd->snap_trimmer(); + return NULL; + } + } snap_trimmer_thread; + void wake_pg_waiters(pg_t pgid) { if (waiting_for_pg.count(pgid)) { take_waiters(waiting_for_pg[pgid]); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 6874a93b9ae7b..4e7ed79fa0f18 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -962,12 +962,8 @@ void PG::activate(ObjectStore::Transaction& t, assert(info.last_complete >= log.bottom || log.backlog); - // write pg info - bufferlist bl; - ::encode(info, bl); - t.collection_setattr(info.pgid.to_coll(), "info", bl); - - // write log + // write pg info, log + write_info(t); write_log(t); // clean up stray objects @@ -1122,6 +1118,17 @@ void PG::activate(ObjectStore::Transaction& t, osd->take_waiters(waiting_for_active); } +void PG::queue_snap_trim() +{ + state_set(PG_STATE_SNAPTRIMQUEUE); + + osd->snap_trimmer_lock.Lock(); + osd->pgs_pending_snap_removal.push_back(&pending_snap_removal_item); + osd->snap_trimmer_lock.Unlock(); + + osd->wake_snap_trimmer(); // FIXME: we probably want to wait until at least peering completes? 
+} + struct C_PG_FinishRecovery : public Context { PG *pg; @@ -1143,9 +1150,7 @@ void PG::finish_recovery() finish_sync_event = new C_PG_FinishRecovery(this); ObjectStore::Transaction t; - bufferlist bl; - ::encode(info, bl); - t.collection_setattr(info.pgid.to_coll(), "info", bl); + write_info(t); osd->store->apply_transaction(t, finish_sync_event); } @@ -1156,6 +1161,10 @@ void PG::_finish_recovery(Context *c) finish_sync_event = 0; dout(10) << "_finish_recovery" << dendl; purge_strays(); + + if (!info.removed_snaps.empty()) + queue_snap_trim(); + update_stats(); } unlock(); @@ -1220,6 +1229,14 @@ void PG::clear_stats() } +void PG::write_info(ObjectStore::Transaction& t) +{ + // write pg info + bufferlist infobl; + ::encode(info, infobl); + t.collection_setattr(info.pgid.to_coll(), "info", infobl); +} + void PG::write_log(ObjectStore::Transaction& t) { dout(10) << "write_log" << dendl; @@ -1251,9 +1268,7 @@ void PG::write_log(ObjectStore::Transaction& t) t.collection_setattr(info.pgid.to_coll(), "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom)); t.collection_setattr(info.pgid.to_coll(), "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top)); - bufferlist infobl; - ::encode(info, infobl); - t.collection_setattr(info.pgid.to_coll(), "info", infobl); + write_info(t); dout(10) << "write_log to [" << ondisklog.bottom << "," << ondisklog.top << ")" << dendl; } diff --git a/src/osd/PG.h b/src/osd/PG.h index 9794cc3eb6210..9bb5b19c08c23 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -19,6 +19,7 @@ #include "include/types.h" #include "osd_types.h" #include "include/buffer.h" +#include "include/xlist.h" #include "OSDMap.h" #include "os/ObjectStore.h" @@ -517,6 +518,8 @@ protected: int role; // 0 = primary, 1 = replica, -1=none. 
int state; // see bit defns above + xlist<PG*>::item pending_snap_removal_item; + // primary state public: vector acting; @@ -632,6 +635,7 @@ public: info(p), role(0), state(0), + pending_snap_removal_item(this), have_master_log(true), must_notify_mon(false), stat_num_bytes(0), stat_num_blocks(0), @@ -687,6 +691,7 @@ public: bool is_empty() const { return info.last_update == eversion_t(0,0); } // pg on-disk state + void write_info(ObjectStore::Transaction& t); void write_log(ObjectStore::Transaction& t); void append_log(ObjectStore::Transaction &t, const PG::Log::Entry &logentry, @@ -694,6 +699,7 @@ public: void read_log(ObjectStore *store); void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v); + void queue_snap_trim(); bool is_dup(osd_reqid_t rid) { return log.logged_req(rid); @@ -706,6 +712,7 @@ public: virtual void do_op(MOSDOp *op) = 0; virtual void do_sub_op(MOSDSubOp *op) = 0; virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; + virtual bool snap_trimmer() = 0; virtual bool same_for_read_since(epoch_t e) = 0; virtual bool same_for_modify_since(epoch_t e) = 0; diff --git a/src/osd/RAID4PG.h b/src/osd/RAID4PG.h index e8cdef9da0dc9..345d3164a00a7 100644 --- a/src/osd/RAID4PG.h +++ b/src/osd/RAID4PG.h @@ -47,6 +47,7 @@ protected: void cancel_recovery(); bool do_recovery(); + bool snap_trimmer() { return true; } public: RAID4PG(OSD *o, pg_t p) : PG(o,p) { } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index dcad830514b3d..0d3d5753ec200 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -444,6 +444,126 @@ void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r) } +bool ReplicatedPG::snap_trimmer() +{ + lock(); + dout(10) << "snap_trimmer" << dendl; + + state_clear(PG_STATE_SNAPTRIMQUEUE); + state_set(PG_STATE_SNAPTRIMMING); + update_stats(); + + while (info.removed_snaps.size() && + is_active()) { + snapid_t sn = *info.removed_snaps.begin(); + coll_t c = info.pgid.to_snap_coll(sn); + list<pobject_t> ls; + 
osd->store->collection_list(c, ls); + + dout(10) << "snap_trimmer collection " << c << " has " << ls.size() << " items" << dendl; + + ObjectStore::Transaction t; + + for (list<pobject_t>::iterator p = ls.begin(); p != ls.end(); p++) { + pobject_t coid = *p; + + bufferlist bl; + osd->store->getattr(info.pgid.to_coll(), coid, "snaps", bl); + bufferlist::iterator blp = bl.begin(); + vector<snapid_t> snaps; + ::decode(snaps, blp); + vector<snapid_t> newsnaps; + for (unsigned i=0; i<snaps.size(); i++) + if (!osd->osdmap->is_removed_snap(snaps[i])) + newsnaps.push_back(i); + + if (newsnaps.empty()) { + // remove + dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << " ... deleting" << dendl; + t.remove(info.pgid.to_coll(), coid); + t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid); + if (snaps.size() > 1) + t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid); + + // adjust head snapset + pobject_t head = coid; + head.oid.snap = CEPH_NOSNAP; + bufferlist bl; + osd->store->getattr(info.pgid.to_coll(), head, "snapset", bl); + bufferlist::iterator blp = bl.begin(); + SnapSet snapset; + ::decode(snapset, blp); + dout(10) << coid << " old head " << head << " snapset " << snapset << dendl; + + snapid_t last = coid.oid.snap; + vector<snapid_t>::iterator p; + for (p = snapset.clones.begin(); p != snapset.clones.end(); p++) + if (*p == last) + break; + if (p == snapset.clones.begin()) { + // newest clone. + snapset.head_diffs.union_of(snapset.clone_diffs[last]); + } else { + // older clone + vector<snapid_t>::iterator n = p; + n++; + if (n != snapset.clones.end()) + // not oldest clone. 
+ snapset.clone_diffs[*n].union_of(snapset.clone_diffs[*p]); + } + snapset.clones.erase(p); + snapset.clone_diffs.erase(last); + + dout(10) << coid << " new head " << head << " snapset " << snapset << dendl; + + if (snapset.clones.empty() && !snapset.head_exists) { + dout(10) << coid << " removing head " << head << dendl; + t.remove(info.pgid.to_coll(), head); + } else { + bl.clear(); + ::encode(snapset, bl); + t.setattr(info.pgid.to_coll(), head, "snapset", bl); + } + } else { + // save adjusted snaps for this object + dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << dendl; + bl.clear(); + ::encode(newsnaps, bl); + t.setattr(info.pgid.to_coll(), coid, "snaps", bl); + + if (snaps[0] != newsnaps[0]) { + t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid); + t.collection_add(info.pgid.to_snap_coll(newsnaps[0]), info.pgid.to_coll(), coid); + } + if (snaps.size() > 1 && snaps[snaps.size()-1] != newsnaps[newsnaps.size()-1]) { + t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid); + if (newsnaps.size() > 1) + t.collection_add(info.pgid.to_snap_coll(newsnaps[newsnaps.size()-1]), info.pgid.to_coll(), coid); + } + } + + osd->store->apply_transaction(t); + + // give other threads a chance at this pg + unlock(); + lock(); + } + + info.removed_snaps.erase(sn); + } + + // done + dout(10) << "snap_trimmer done" << dendl; + state_clear(PG_STATE_SNAPTRIMMING); + update_stats(); + + ObjectStore::Transaction t; + write_info(t); + osd->store->apply_transaction(t); + unlock(); + return true; +} + // ======================================================================== // READS @@ -874,10 +994,7 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t assert(at_version > info.last_update); info.last_update = at_version; - // write pg info - bufferlist infobl; - ::encode(info, infobl); - t.collection_setattr(info.pgid.to_coll(), "info", infobl); + write_info(t); // prepare log append append_log(t, logentry, 
trim_to); @@ -1670,9 +1787,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // apply to disk! - bufferlist bl; - ::encode(info, bl); - t.collection_setattr(info.pgid.to_coll(), "info", bl); + write_info(t); unsigned r = osd->store->apply_transaction(t); assert(r == 0); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index b7f697ec37e0c..c46f96d3c23fb 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -141,7 +141,8 @@ public: void do_op(MOSDOp *op); void do_sub_op(MOSDSubOp *op); void do_sub_op_reply(MOSDSubOpReply *op); - + bool snap_trimmer(); + bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); bool same_for_rep_modify_since(epoch_t e); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 714c62bc67b4d..9fcd69e403e11 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -280,7 +280,8 @@ inline ostream& operator<<(ostream& out, const osd_stat_t& s) { #define PG_STATE_REPLAY 32 // crashed, waiting for replay #define PG_STATE_STRAY 64 // i must notify the primary i exist. #define PG_STATE_SPLITTING 128 // i am splitting -#define PG_STATE_SNAPTRIM 256 // i am trimming snapshot data +#define PG_STATE_SNAPTRIMQUEUE 256 // i am queued for snapshot trimming +#define PG_STATE_SNAPTRIMMING 512 // i am trimming snapshot data static inline std::string pg_state_string(int state) { std::string st; @@ -292,7 +293,8 @@ static inline std::string pg_state_string(int state) { if (state & PG_STATE_REPLAY) st += "replay+"; if (state & PG_STATE_STRAY) st += "stray+"; if (state & PG_STATE_SPLITTING) st += "splitting+"; - if (state & PG_STATE_SNAPTRIM) st += "trimmingsnap+"; + if (state & PG_STATE_SNAPTRIMQUEUE) st += "snaptrimqueue+"; + if (state & PG_STATE_SNAPTRIMMING) st += "snaptrimming+"; if (!st.length()) st = "inactive"; else -- 2.39.5