ObjectCacher::
ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l, flush_set_callback_t flush_callback,
- flush_set_callback_t commit_callback, void *callback_arg) :
+ void *flush_callback_arg) :
cct(cct_), objecter(o), filer(o), lock(l),
- flush_set_callback(flush_callback), commit_set_callback(commit_callback), flush_set_callback_arg(callback_arg),
+ flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
flusher_stop(false), flusher_thread(this),
stat_waiter(0),
stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) {
ldout(cct, 7) << "bh_write " << *bh << dendl;
// finishers
- C_WriteAck *onack = new C_WriteAck(this, bh->ob->oloc.pool,
- bh->ob->get_soid(), bh->start(), bh->length());
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->oloc.pool,
bh->ob->get_soid(), bh->start(), bh->length());
bh->start(), bh->length(),
bh->snapc, bh->bl, bh->last_write, 0,
oset->truncate_size, oset->truncate_seq,
- onack, oncommit);
+ NULL, oncommit);
// set bh last_write_tid
- onack->tid = tid;
oncommit->tid = tid;
bh->ob->last_write_tid = tid;
bh->last_write_tid = tid;
list<Context*> ls;
// waiters?
- if (ob->waitfor_ack.count(tid)) {
- ls.splice(ls.end(), ob->waitfor_ack[tid]);
- ob->waitfor_ack.erase(tid);
+ if (ob->waitfor_commit.count(tid)) {
+ ls.splice(ls.end(), ob->waitfor_commit[tid]);
+ ob->waitfor_commit.erase(tid);
}
assert(tid <= ob->last_write_tid);
assert(0);
}
- ob->last_ack_tid = tid;
+ ob->last_commit_tid = tid;
if (ob->can_close())
close_object(ob);
}
}
-void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
{
//lock.Lock();
- ldout(cct, 7) << "bh_write_ack "
+ ldout(cct, 7) << "bh_write_commit "
<< oid
<< " tid " << tid
<< " " << start << "~" << length
<< dendl;
if (objects[poolid].count(oid) == 0) {
- ldout(cct, 7) << "bh_write_ack no object cache" << dendl;
+ ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
assert(0);
} else {
Object *ob = objects[poolid][oid];
if (bh->start() < start &&
bh->end() > start+(loff_t)length) {
- ldout(cct, 20) << "bh_write_ack skipping " << *bh << dendl;
+ ldout(cct, 20) << "bh_write_commit skipping " << *bh << dendl;
continue;
}
// make sure bh is tx
if (!bh->is_tx()) {
- ldout(cct, 10) << "bh_write_ack skipping non-tx " << *bh << dendl;
+ ldout(cct, 10) << "bh_write_commit skipping non-tx " << *bh << dendl;
continue;
}
// make sure bh tid matches
if (bh->last_write_tid != tid) {
assert(bh->last_write_tid > tid);
- ldout(cct, 10) << "bh_write_ack newer tid on " << *bh << dendl;
+ ldout(cct, 10) << "bh_write_commit newer tid on " << *bh << dendl;
continue;
}
// ok! mark bh clean.
mark_clean(bh);
- ldout(cct, 10) << "bh_write_ack clean " << *bh << dendl;
+ ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
}
- // update object last_ack.
- assert(ob->last_ack_tid < tid);
- ob->last_ack_tid = tid;
-
- // waiters?
- if (ob->waitfor_ack.count(tid)) {
- list<Context*> ls;
- ls.splice(ls.begin(), ob->waitfor_ack[tid]);
- ob->waitfor_ack.erase(tid);
- finish_contexts(cct, ls);
- }
-
- // is the entire object set now clean?
- if (flush_set_callback && ob->oset->dirty_or_tx == 0) {
- flush_set_callback(flush_set_callback_arg, ob->oset);
- }
- }
- //lock.Unlock();
-}
-
-void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
-{
- //lock.Lock();
-
- // update object last_commit
- ldout(cct, 7) << "bh_write_commit "
- << oid
- << " tid " << tid
- << " " << start << "~" << length
- << dendl;
- if (objects[poolid].count(oid) == 0) {
- ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
- //assert(0);
- } else {
- Object *ob = objects[poolid][oid];
-
// update last_commit.
+ assert(ob->last_commit_tid < tid);
ob->last_commit_tid = tid;
// waiters?
}
// is the entire object set now clean and fully committed?
- if (commit_set_callback &&
- ob->last_commit_tid == ob->last_write_tid) {
- ObjectSet *oset = ob->oset;
- if (ob->can_close())
- close_object(ob);
- if (commit_set_callback) {
- if (oset->dirty_or_tx == 0) { // nothing dirty/tx
- commit_set_callback(flush_set_callback_arg, oset);
- }
- }
+ ObjectSet *oset = ob->oset;
+ if (ob->can_close())
+ close_object(ob);
+
+ // is the entire object set now clean?
+ if (flush_set_callback &&
+ oset->dirty_or_tx == 0) { // nothing dirty/tx
+ flush_set_callback(flush_set_callback_arg, oset);
}
}
-
- // lock.Unlock();
+ //lock.Unlock();
}
-
void ObjectCacher::flush(loff_t amount)
{
utime_t cutoff = ceph_clock_now(cct);
<< " on " << *ob
<< dendl;
if (onfinish != NULL)
- ob->waitfor_ack[ob->last_write_tid].push_back(gather.new_sub());
+ ob->waitfor_commit[ob->last_write_tid].push_back(gather.new_sub());
}
}
if (onfinish != NULL)
}
// did we truncate off dirty data?
- if (commit_set_callback &&
+ if (flush_set_callback &&
were_dirty && oset->dirty_or_tx == 0)
- commit_set_callback(flush_set_callback_arg, oset);
+ flush_set_callback(flush_set_callback_arg, oset);
}
map<loff_t, BufferHead*> data;
tid_t last_write_tid; // version of bh (if non-zero)
- tid_t last_ack_tid; // last update acked.
tid_t last_commit_tid; // last update commited.
int dirty_or_tx;
- map< tid_t, list<Context*> > waitfor_ack;
map< tid_t, list<Context*> > waitfor_commit;
list<Context*> waitfor_rd;
list<Context*> waitfor_wr;
Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, object_locator_t& l) :
oc(_oc),
oid(o), oset(os), set_item(this), oloc(l),
- last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
+ last_write_tid(0), last_commit_tid(0),
dirty_or_tx(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {
// add to set
bool can_close() {
return data.empty() && lock_state == LOCK_NONE &&
- waitfor_ack.empty() && waitfor_commit.empty() &&
+ waitfor_commit.empty() &&
waitfor_rd.empty() && waitfor_wr.empty() &&
dirty_or_tx == 0;
}
private:
Mutex& lock;
- flush_set_callback_t flush_set_callback, commit_set_callback;
+ flush_set_callback_t flush_set_callback;
void *flush_set_callback_arg;
vector<hash_map<sobject_t, Object*> > objects; // indexed by pool_id
public:
void bh_read_finish(int poolid, sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
- void bh_write_ack(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
void bh_write_commit(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
void lock_ack(int poolid, list<sobject_t>& oids, tid_t tid);
}
};
- class C_WriteAck : public Context {
- ObjectCacher *oc;
- int poolid;
- sobject_t oid;
- loff_t start;
- uint64_t length;
- public:
- tid_t tid;
- C_WriteAck(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
- oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
- void finish(int r) {
- oc->bh_write_ack(poolid, oid, start, length, tid);
- }
- };
class C_WriteCommit : public Context {
ObjectCacher *oc;
int poolid;
public:
ObjectCacher(CephContext *cct_, Objecter *o, Mutex& l,
flush_set_callback_t flush_callback,
- flush_set_callback_t commit_callback,
- void *callback_arg);
+ void *flush_callback_arg);
~ObjectCacher() {
// we should be empty.
for (vector<hash_map<sobject_t, Object *> >::iterator i = objects.begin();
{
out << "object["
<< ob.get_soid() << " oset " << ob.oset << dec
- << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid;
+ << " wr " << ob.last_write_tid << "/" << ob.last_commit_tid;
switch (ob.lock_state) {
case ObjectCacher::Object::LOCK_WRLOCKING: out << " wrlocking"; break;