return m == other.m;
}
- int size() {
+ int size() const {
return _size;
}
stat_oprate(5.0),
read_latency_calc(g_conf.osd_max_opq<1 ? 1:g_conf.osd_max_opq),
qlen_calc(3),
- iat_averager(g_conf.osd_flash_crowd_iat_alpha)
+ iat_averager(g_conf.osd_flash_crowd_iat_alpha),
+ snap_trimmer_thread(this)
{
messenger = m;
monmap = mm;
if (pg->is_active()) {
// update started counter
pg->info.history.last_epoch_started = osdmap->get_epoch();
+ if (!pg->info.removed_snaps.empty())
+ pg->queue_snap_trim();
}
else if (pg->is_primary() && !pg->is_active()) {
// i am (inactive) primary
+void OSD::wake_snap_trimmer()
+{
+ osd_lock.Lock();
+ if (!snap_trimmer_thread.is_started()) {
+ dout(10) << "wake_snap_trimmer - creating thread" << dendl;
+ snap_trimmer_thread.create();
+ } else {
+ dout(10) << "wake_snap_trimmer - kicking thread" << dendl;
+ snap_trimmer_cond.Signal();
+ }
+ osd_lock.Unlock();
+}
+
+void OSD::snap_trimmer()
+{
+ osd_lock.Lock();
+ while (1) {
+ snap_trimmer_lock.Lock();
+ if (pgs_pending_snap_removal.empty()) {
+ snap_trimmer_lock.Unlock();
+ dout(10) << "snap_trimmer - no pgs pending trim, sleeping" << dendl;
+ snap_trimmer_cond.Wait(osd_lock);
+ continue;
+ }
+
+ PG *pg = pgs_pending_snap_removal.front();
+ pgs_pending_snap_removal.pop_front();
+ snap_trimmer_lock.Unlock();
+ osd_lock.Unlock();
+
+ pg->snap_trimmer();
+
+ osd_lock.Lock();
+ }
+ osd_lock.Unlock();
+}
// -- placement groups --
hash_map<pg_t, PG*> pg_map;
hash_map<pg_t, list<Message*> > waiting_for_pg;
+ xlist<PG*> pgs_pending_snap_removal;
bool _have_pg(pg_t pgid);
PG *_lookup_lock_pg(pg_t pgid);
vector<int>& last);
void activate_pg(pg_t pgid, epoch_t epoch);
+ Mutex snap_trimmer_lock;
+ Cond snap_trimmer_cond;
+
+ void wake_snap_trimmer();
+ void snap_trimmer(); // thread entry
+
+ struct SnapTrimmer : public Thread {
+ OSD *osd;
+ SnapTrimmer(OSD *o) : osd(o) {}
+ void *entry() {
+ osd->snap_trimmer();
+ return NULL;
+ }
+ } snap_trimmer_thread;
+
void wake_pg_waiters(pg_t pgid) {
if (waiting_for_pg.count(pgid)) {
take_waiters(waiting_for_pg[pgid]);
assert(info.last_complete >= log.bottom || log.backlog);
- // write pg info
- bufferlist bl;
- ::encode(info, bl);
- t.collection_setattr(info.pgid.to_coll(), "info", bl);
-
- // write log
+ // write pg info, log
+ write_info(t);
write_log(t);
// clean up stray objects
osd->take_waiters(waiting_for_active);
}
+void PG::queue_snap_trim()
+{
+ state_set(PG_STATE_SNAPTRIMQUEUE);
+
+ osd->snap_trimmer_lock.Lock();
+ osd->pgs_pending_snap_removal.push_back(&pending_snap_removal_item);
+ osd->snap_trimmer_lock.Unlock();
+
+ osd->wake_snap_trimmer(); // FIXME: we probably want to wait until at least peering completes?
+}
+
struct C_PG_FinishRecovery : public Context {
PG *pg;
finish_sync_event = new C_PG_FinishRecovery(this);
ObjectStore::Transaction t;
- bufferlist bl;
- ::encode(info, bl);
- t.collection_setattr(info.pgid.to_coll(), "info", bl);
+ write_info(t);
osd->store->apply_transaction(t, finish_sync_event);
}
finish_sync_event = 0;
dout(10) << "_finish_recovery" << dendl;
purge_strays();
+
+ if (!info.removed_snaps.empty())
+ queue_snap_trim();
+
update_stats();
}
unlock();
}
+void PG::write_info(ObjectStore::Transaction& t)
+{
+ // write pg info
+ bufferlist infobl;
+ ::encode(info, infobl);
+ t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+}
+
void PG::write_log(ObjectStore::Transaction& t)
{
dout(10) << "write_log" << dendl;
t.collection_setattr(info.pgid.to_coll(), "ondisklog_bottom", &ondisklog.bottom, sizeof(ondisklog.bottom));
t.collection_setattr(info.pgid.to_coll(), "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
- bufferlist infobl;
- ::encode(info, infobl);
- t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+ write_info(t);
dout(10) << "write_log to [" << ondisklog.bottom << "," << ondisklog.top << ")" << dendl;
}
#include "include/types.h"
#include "osd_types.h"
#include "include/buffer.h"
+#include "include/xlist.h"
#include "OSDMap.h"
#include "os/ObjectStore.h"
int role; // 0 = primary, 1 = replica, -1=none.
int state; // see bit defns above
+ xlist<PG*>::item pending_snap_removal_item;
+
// primary state
public:
vector<int> acting;
info(p),
role(0),
state(0),
+ pending_snap_removal_item(this),
have_master_log(true),
must_notify_mon(false),
stat_num_bytes(0), stat_num_blocks(0),
bool is_empty() const { return info.last_update == eversion_t(0,0); }
// pg on-disk state
+ void write_info(ObjectStore::Transaction& t);
void write_log(ObjectStore::Transaction& t);
void append_log(ObjectStore::Transaction &t,
const PG::Log::Entry &logentry,
void read_log(ObjectStore *store);
void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
+ void queue_snap_trim();
bool is_dup(osd_reqid_t rid) {
return log.logged_req(rid);
virtual void do_op(MOSDOp *op) = 0;
virtual void do_sub_op(MOSDSubOp *op) = 0;
virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0;
+ virtual bool snap_trimmer() = 0;
virtual bool same_for_read_since(epoch_t e) = 0;
virtual bool same_for_modify_since(epoch_t e) = 0;
void cancel_recovery();
bool do_recovery();
+ bool snap_trimmer() { return true; }
public:
RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
}
+bool ReplicatedPG::snap_trimmer()
+{
+ lock();
+ dout(10) << "snap_trimmer" << dendl;
+
+ state_clear(PG_STATE_SNAPTRIMQUEUE);
+ state_set(PG_STATE_SNAPTRIMMING);
+ update_stats();
+
+ while (info.removed_snaps.size() &&
+ is_active()) {
+ snapid_t sn = *info.removed_snaps.begin();
+ coll_t c = info.pgid.to_snap_coll(sn);
+ list<pobject_t> ls;
+ osd->store->collection_list(c, ls);
+
+ dout(10) << "snap_trimmer collection " << c << " has " << ls.size() << " items" << dendl;
+
+ ObjectStore::Transaction t;
+
+ for (list<pobject_t>::iterator p = ls.begin(); p != ls.end(); p++) {
+ pobject_t coid = *p;
+
+ bufferlist bl;
+ osd->store->getattr(info.pgid.to_coll(), coid, "snaps", bl);
+ bufferlist::iterator blp = bl.begin();
+ vector<snapid_t> snaps;
+ ::decode(snaps, blp);
+ vector<snapid_t> newsnaps;
+ for (unsigned i=0; i<snaps.size(); i++)
+ if (!osd->osdmap->is_removed_snap(snaps[i]))
+ newsnaps.push_back(i);
+
+ if (newsnaps.empty()) {
+ // remove
+ dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << " ... deleting" << dendl;
+ t.remove(info.pgid.to_coll(), coid);
+ t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid);
+ if (snaps.size() > 1)
+ t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid);
+
+ // adjust head snapset
+ pobject_t head = coid;
+ head.oid.snap = CEPH_NOSNAP;
+ bufferlist bl;
+ osd->store->getattr(info.pgid.to_coll(), head, "snapset", bl);
+ bufferlist::iterator blp = bl.begin();
+ SnapSet snapset;
+ ::decode(snapset, blp);
+ dout(10) << coid << " old head " << head << " snapset " << snapset << dendl;
+
+ snapid_t last = coid.oid.snap;
+ vector<snapid_t>::iterator p;
+ for (p = snapset.clones.begin(); p != snapset.clones.end(); p++)
+ if (*p == last)
+ break;
+ if (p == snapset.clones.begin()) {
+ // newest clone.
+ snapset.head_diffs.union_of(snapset.clone_diffs[last]);
+ } else {
+ // older clone
+ vector<snapid_t>::iterator n = p;
+ n++;
+ if (n != snapset.clones.end())
+ // not oldest clone.
+ snapset.clone_diffs[*n].union_of(snapset.clone_diffs[*p]);
+ }
+ snapset.clones.erase(p);
+ snapset.clone_diffs.erase(last);
+
+ dout(10) << coid << " new head " << head << " snapset " << snapset << dendl;
+
+ if (snapset.clones.empty() && !snapset.head_exists) {
+ dout(10) << coid << " removing head " << head << dendl;
+ t.remove(info.pgid.to_coll(), head);
+ } else {
+ bl.clear();
+ ::encode(snapset, bl);
+ t.setattr(info.pgid.to_coll(), head, "snapset", bl);
+ }
+ } else {
+ // save adjusted snaps for this object
+ dout(10) << coid << " snaps " << snaps << " -> " << newsnaps << dendl;
+ bl.clear();
+ ::encode(newsnaps, bl);
+ t.setattr(info.pgid.to_coll(), coid, "snaps", bl);
+
+ if (snaps[0] != newsnaps[0]) {
+ t.collection_remove(info.pgid.to_snap_coll(snaps[0]), coid);
+ t.collection_add(info.pgid.to_snap_coll(newsnaps[0]), info.pgid.to_coll(), coid);
+ }
+ if (snaps.size() > 1 && snaps[snaps.size()-1] != newsnaps[newsnaps.size()-1]) {
+ t.collection_remove(info.pgid.to_snap_coll(snaps[snaps.size()-1]), coid);
+ if (newsnaps.size() > 1)
+ t.collection_add(info.pgid.to_snap_coll(newsnaps[newsnaps.size()-1]), info.pgid.to_coll(), coid);
+ }
+ }
+
+ osd->store->apply_transaction(t);
+
+ // give other threads a chance at this pg
+ unlock();
+ lock();
+ }
+
+ info.removed_snaps.erase(sn);
+ }
+
+ // done
+ dout(10) << "snap_trimmer done" << dendl;
+ state_clear(PG_STATE_SNAPTRIMMING);
+ update_stats();
+
+ ObjectStore::Transaction t;
+ write_info(t);
+ osd->store->apply_transaction(t);
+ unlock();
+ return true;
+}
+
// ========================================================================
// READS
assert(at_version > info.last_update);
info.last_update = at_version;
- // write pg info
- bufferlist infobl;
- ::encode(info, infobl);
- t.collection_setattr(info.pgid.to_coll(), "info", infobl);
+ write_info(t);
// prepare log append
append_log(t, logentry, trim_to);
// apply to disk!
- bufferlist bl;
- ::encode(info, bl);
- t.collection_setattr(info.pgid.to_coll(), "info", bl);
+ write_info(t);
unsigned r = osd->store->apply_transaction(t);
assert(r == 0);
void do_op(MOSDOp *op);
void do_sub_op(MOSDSubOp *op);
void do_sub_op_reply(MOSDSubOpReply *op);
-
+ bool snap_trimmer();
+
bool same_for_read_since(epoch_t e);
bool same_for_modify_since(epoch_t e);
bool same_for_rep_modify_since(epoch_t e);
#define PG_STATE_REPLAY 32 // crashed, waiting for replay
#define PG_STATE_STRAY 64 // i must notify the primary i exist.
#define PG_STATE_SPLITTING 128 // i am splitting
-#define PG_STATE_SNAPTRIM 256 // i am trimming snapshot data
+#define PG_STATE_SNAPTRIMQUEUE 256 // i am queued for snapshot trimming
+#define PG_STATE_SNAPTRIMMING 512 // i am trimming snapshot data
static inline std::string pg_state_string(int state) {
std::string st;
if (state & PG_STATE_REPLAY) st += "replay+";
if (state & PG_STATE_STRAY) st += "stray+";
if (state & PG_STATE_SPLITTING) st += "splitting+";
- if (state & PG_STATE_SNAPTRIM) st += "trimmingsnap+";
+ if (state & PG_STATE_SNAPTRIMQUEUE) st += "snaptrimqueue+";
+ if (state & PG_STATE_SNAPTRIMMING) st += "snaptrimming+";
if (!st.length())
st = "inactive";
else