From 3a4d19369fcebf046eae2f9bd851f85f604f6eb4 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 26 Aug 2011 09:47:03 -0700 Subject: [PATCH] objectcacher: only want for commit There was some old, weird stuff going on here where we would wait for the ACK and COMMIT separately. This is just wrong. Writeback does not complete until the data is committed on disk. Simplify by waiting only for commit, removing all the 'ack' code, and going back to a single callback (flush_set). I didn't notice this for 05063867e2a54176ffc9bbc73391f52766ab403f; both of these cleanups are needed to fix this. Signed-off-by: Sage Weil --- src/client/Client.cc | 1 - src/osdc/ObjectCacher.cc | 96 +++++++++++----------------------------- src/osdc/ObjectCacher.h | 28 +++--------- 3 files changed, 32 insertions(+), 93 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index bb9cafc6cbace..ffdb0c7b87dfd 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -162,7 +162,6 @@ Client::Client(Messenger *m, MonClient *mc) objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer); objecter->set_client_incarnation(0); // client always 0, for now. objectcacher = new ObjectCacher(cct, objecter, client_lock, - 0, // all ack callback client_flush_set_callback, // all commit callback (void*)this); filer = new Filer(objecter); diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index a23cf97ea6fac..ac2a60a6a8e80 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -18,9 +18,9 @@ ObjectCacher:: ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l, flush_set_callback_t flush_callback, - flush_set_callback_t commit_callback, void *callback_arg) : + void *flush_callback_arg) : cct(cct_), objecter(o), filer(o), lock(l), - flush_set_callback(flush_callback), commit_set_callback(commit_callback), flush_set_callback_arg(callback_arg), + flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg), flusher_stop(false), flusher_thread(this), stat_waiter(0), stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) { @@ -539,8 +539,6 @@ void ObjectCacher::bh_write(BufferHead *bh) ldout(cct, 7) << "bh_write " << *bh << dendl; // finishers - C_WriteAck *onack = new C_WriteAck(this, bh->ob->oloc.pool, - bh->ob->get_soid(), bh->start(), bh->length()); C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool, bh->ob->get_soid(), bh->start(), bh->length()); @@ -551,10 +549,9 @@ void ObjectCacher::bh_write(BufferHead *bh) bh->start(), bh->length(), bh->snapc, bh->bl, bh->last_write, 0, oset->truncate_size, oset->truncate_seq, - onack, oncommit); + NULL, oncommit); // set bh last_write_tid - onack->tid = tid; oncommit->tid = tid; bh->ob->last_write_tid = tid; bh->last_write_tid = tid; @@ -579,9 +576,9 @@ void ObjectCacher::lock_ack(int poolid, list& oids, tid_t tid) list ls; // waiters? - if (ob->waitfor_ack.count(tid)) { - ls.splice(ls.end(), ob->waitfor_ack[tid]); - ob->waitfor_ack.erase(tid); + if (ob->waitfor_commit.count(tid)) { + ls.splice(ls.end(), ob->waitfor_commit[tid]); + ob->waitfor_commit.erase(tid); } assert(tid <= ob->last_write_tid); @@ -610,7 +607,7 @@ void ObjectCacher::lock_ack(int poolid, list& oids, tid_t tid) assert(0); } - ob->last_ack_tid = tid; + ob->last_commit_tid = tid; if (ob->can_close()) close_object(ob); @@ -624,17 +621,17 @@ void ObjectCacher::lock_ack(int poolid, list& oids, tid_t tid) } } -void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid) +void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid) { //lock.Lock(); - ldout(cct, 7) << "bh_write_ack " + ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid << " " << start << "~" << length << dendl; if (objects[poolid].count(oid) == 0) { - ldout(cct, 7) << "bh_write_ack no object cache" << dendl; + ldout(cct, 7) << "bh_write_commit no object cache" << dendl; assert(0); } else { Object *ob = objects[poolid][oid]; @@ -649,65 +646,30 @@ void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_ if (bh->start() < start && bh->end() > start+(loff_t)length) { - ldout(cct, 20) << "bh_write_ack skipping " << *bh << dendl; + ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl; continue; } // make sure bh is tx if (!bh->is_tx()) { - ldout(cct, 10) << "bh_write_ack skipping non-tx " << *bh << dendl; + ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl; continue; } // make sure bh tid matches if (bh->last_write_tid != tid) { assert(bh->last_write_tid > tid); - ldout(cct, 10) << "bh_write_ack newer tid on " << *bh << dendl; + ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl; continue; } // ok! mark bh clean. mark_clean(bh); - ldout(cct, 10) << "bh_write_ack clean " << *bh << dendl; + ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl; } - // update object last_ack. - assert(ob->last_ack_tid < tid); - ob->last_ack_tid = tid; - - // waiters? - if (ob->waitfor_ack.count(tid)) { - list ls; - ls.splice(ls.begin(), ob->waitfor_ack[tid]); - ob->waitfor_ack.erase(tid); - finish_contexts(cct, ls); - } - - // is the entire object set now clean? - if (flush_set_callback && ob->oset->dirty_or_tx == 0) { - flush_set_callback(flush_set_callback_arg, ob->oset); - } - } - //lock.Unlock(); -} - -void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid) -{ - //lock.Lock(); - - // update object last_commit - ldout(cct, 7) << "bh_write_commit " - << oid - << " tid " << tid - << " " << start << "~" << length - << dendl; - if (objects[poolid].count(oid) == 0) { - ldout(cct, 7) << "bh_write_commit no object cache" << dendl; - //assert(0); - } else { - Object *ob = objects[poolid][oid]; - // update last_commit. + assert(ob->last_commit_tid < tid); ob->last_commit_tid = tid; // waiters? @@ -719,23 +681,19 @@ void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint } // is the entire object set now clean and fully committed? - if (commit_set_callback && - ob->last_commit_tid == ob->last_write_tid) { - ObjectSet *oset = ob->oset; - if (ob->can_close()) - close_object(ob); - if (commit_set_callback) { - if (oset->dirty_or_tx == 0) { // nothing dirty/tx - commit_set_callback(flush_set_callback_arg, oset); - } - } + ObjectSet *oset = ob->oset; + if (ob->can_close()) + close_object(ob); + + // is the entire object set now clean? + if (flush_set_callback && + oset->dirty_or_tx == 0) { // nothing dirty/tx + flush_set_callback(flush_set_callback_arg, oset); } } - - // lock.Unlock(); + //lock.Unlock(); } - void ObjectCacher::flush(loff_t amount) { utime_t cutoff = ceph_clock_now(cct); @@ -1494,7 +1452,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) << " on " << *ob << dendl; if (onfinish != NULL) - ob->waitfor_ack[ob->last_write_tid].push_back(gather.new_sub()); + ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub()); } } if (onfinish != NULL) @@ -1712,9 +1670,9 @@ void ObjectCacher::truncate_set(ObjectSet *oset, vector& exls) } // did we truncate off dirty data? - if (commit_set_callback && + if (flush_set_callback && were_dirty && oset->dirty_or_tx == 0) - commit_set_callback(flush_set_callback_arg, oset); + flush_set_callback(flush_set_callback_arg, oset); } diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 73fd7abd35804..f19ca63ae946a 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -145,12 +145,10 @@ class ObjectCacher { map data; tid_t last_write_tid; // version of bh (if non-zero) - tid_t last_ack_tid; // last update acked. tid_t last_commit_tid; // last update commited. int dirty_or_tx; - map< tid_t, list > waitfor_ack; map< tid_t, list > waitfor_commit; list waitfor_rd; list waitfor_wr; @@ -176,7 +174,7 @@ class ObjectCacher { Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, object_locator_t& l) : oc(_oc), oid(o), oset(os), set_item(this), oloc(l), - last_write_tid(0), last_ack_tid(0), last_commit_tid(0), + last_write_tid(0), last_commit_tid(0), dirty_or_tx(0), lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) { // add to set @@ -198,7 +196,7 @@ class ObjectCacher { bool can_close() { return data.empty() && lock_state == LOCK_NONE && - waitfor_ack.empty() && waitfor_commit.empty() && + waitfor_commit.empty() && waitfor_rd.empty() && waitfor_wr.empty() && dirty_or_tx == 0; } @@ -275,7 +273,7 @@ class ObjectCacher { private: Mutex& lock; - flush_set_callback_t flush_set_callback, commit_set_callback; + flush_set_callback_t flush_set_callback; void *flush_set_callback_arg; vector > objects; // indexed by pool_id @@ -466,7 +464,6 @@ class ObjectCacher { public: void bh_read_finish(int poolid, sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl); - void bh_write_ack(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t); void bh_write_commit(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t); void lock_ack(int poolid, list& oids, tid_t tid); @@ -485,20 +482,6 @@ class ObjectCacher { } }; - class C_WriteAck : public Context { - ObjectCacher *oc; - int poolid; - sobject_t oid; - loff_t start; - uint64_t length; - public: - tid_t tid; - C_WriteAck(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) : - oc(c), poolid(_poolid), oid(o), start(s), length(l) {} - void finish(int r) { - oc->bh_write_ack(poolid, oid, start, length, tid); - } - }; class C_WriteCommit : public Context { ObjectCacher *oc; int poolid; @@ -533,8 +516,7 @@ class ObjectCacher { public: ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l, flush_set_callback_t flush_callback, - flush_set_callback_t commit_callback, - void *callback_arg); + void *flush_callback_arg); ~ObjectCacher() { // we should be empty. for (vector >::iterator i = objects.begin(); @@ -693,7 +675,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob) { out << "object[" << ob.get_soid() << " oset " << ob.oset << dec - << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid; + << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid; switch (ob.lock_state) { case ObjectCacher::Object::LOCK_WRLOCKING: out << " wrlocking"; break; -- 2.39.5