assert(ob->can_close());
// ok!
- objects.erase(ob->get_soid());
+ objects[ob->layout.ol_pgid.pool].erase(ob->get_soid());
delete ob;
}
mark_rx(bh);
// finisher
- C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_soid(), bh->start(), bh->length());
+ C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->layout.ol_pgid.pool,
+ bh->ob->get_soid(), bh->start(), bh->length());
ObjectSet *oset = bh->ob->oset;
onfinish);
}
-void ObjectCacher::bh_read_finish(sobject_t oid, loff_t start, uint64_t length, bufferlist &bl)
+void ObjectCacher::bh_read_finish(int poolid, sobject_t oid, loff_t start, uint64_t length, bufferlist &bl)
{
//lock.Lock();
dout(7) << "bh_read_finish "
bl.push_back(bp);
}
- if (objects.count(oid) == 0) {
+ if (objects[poolid].count(oid) == 0) {
dout(7) << "bh_read_finish no object cache" << dendl;
} else {
- Object *ob = objects[oid];
+ Object *ob = objects[poolid][oid];
// apply to bh's!
loff_t opos = start;
dout(7) << "bh_write " << *bh << dendl;
// finishers
- C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_soid(), bh->start(), bh->length());
- C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_soid(), bh->start(), bh->length());
+ C_WriteAck *onack = new C_WriteAck(this, bh->ob->layout.ol_pgid.pool,
+ bh->ob->get_soid(), bh->start(), bh->length());
+ C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->layout.ol_pgid.pool,
+ bh->ob->get_soid(), bh->start(), bh->length());
ObjectSet *oset = bh->ob->oset;
mark_tx(bh);
}
-void ObjectCacher::lock_ack(list<sobject_t>& oids, tid_t tid)
+void ObjectCacher::lock_ack(int poolid, list<sobject_t>& oids, tid_t tid)
{
for (list<sobject_t>::iterator i = oids.begin();
i != oids.end();
i++) {
sobject_t oid = *i;
- if (objects.count(oid) == 0) {
+ if (objects[poolid].count(oid) == 0) {
dout(7) << "lock_ack no object cache" << dendl;
assert(0);
}
- Object *ob = objects[oid];
+ Object *ob = objects[poolid][oid];
list<Context*> ls;
}
}
-void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
{
//lock.Lock();
<< " tid " << tid
<< " " << start << "~" << length
<< dendl;
- if (objects.count(oid) == 0) {
+ if (objects[poolid].count(oid) == 0) {
dout(7) << "bh_write_ack no object cache" << dendl;
assert(0);
} else {
- Object *ob = objects[oid];
+ Object *ob = objects[poolid][oid];
// apply to bh's!
for (map<loff_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
//lock.Unlock();
}
-void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
{
//lock.Lock();
<< " tid " << tid
<< " " << start << "~" << length
<< dendl;
- if (objects.count(oid) == 0) {
+ if (objects[poolid].count(oid) == 0) {
dout(7) << "bh_write_commit no object cache" << dendl;
//assert(0);
} else {
- Object *ob = objects[oid];
+ Object *ob = objects[poolid][oid];
// update last_commit.
ob->last_commit_tid = tid;
ex_it != extents.end();
ex_it++) {
sobject_t soid(ex_it->oid, snapid);
- assert(objects.count(soid));
- Object *o = objects[soid];
+ assert(objects[oset->poolid].count(soid));
+ Object *o = objects[oset->poolid][soid];
rdunlock(o);
}
}
// make sure we aren't already locking/locked...
sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP);
Object *o = 0;
- if (objects.count(oid))
+ if (objects[oset->poolid].count(oid))
o = get_object(oid, oset, wr->extents.front().layout);
if (!o ||
(o->lock_state != Object::LOCK_WRLOCK &&
ex_it != extents.end();
ex_it++) {
sobject_t soid(ex_it->oid, CEPH_NOSNAP);
- assert(objects.count(soid));
- Object *o = objects[soid];
+ assert(objects[oset->poolid].count(soid));
+ Object *o = objects[oset->poolid][soid];
wrunlock(o);
}
o->lock_state = Object::LOCK_RDLOCKING;
- C_LockAck *ack = new C_LockAck(this, o->get_soid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+ C_LockAck *ack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+ o->get_soid(), 0, 0);
commit->tid =
ack->tid =
op = CEPH_OSD_OP_WRLOCK;
}
- C_LockAck *ack = new C_LockAck(this, o->get_soid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+ C_LockAck *ack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+ o->get_soid(), 0, 0);
commit->tid =
ack->tid =
o->lock_state = Object::LOCK_RDUNLOCKING;
- C_LockAck *lockack = new C_LockAck(this, o->get_soid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+ C_LockAck *lockack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+ o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit);
o->lock_state = Object::LOCK_WRUNLOCKING;
}
- C_LockAck *lockack = new C_LockAck(this, o->get_soid());
- C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+ C_LockAck *lockack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+ C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+ o->get_soid(), 0, 0);
commit->tid =
lockack->tid =
o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit);
dout(10) << "release_all" << dendl;
uint64_t unclean = 0;
- hash_map<sobject_t, Object*>::iterator p = objects.begin();
- while (p != objects.end()) {
- hash_map<sobject_t, Object*>::iterator n = p;
- n++;
-
- Object *ob = p->second;
-
- loff_t o_unclean = release(ob);
- unclean += o_unclean;
-
- if (o_unclean)
- dout(10) << "release_all " << *ob
- << " has " << o_unclean << " bytes left"
- << dendl;
+ vector<hash_map<sobject_t, Object*> >::iterator i = objects.begin();
+ while (i != objects.end()) {
+ hash_map<sobject_t, Object*>::iterator p = i->begin();
+ while (p != i->end()) {
+ hash_map<sobject_t, Object*>::iterator n = p;
+ n++;
+
+ Object *ob = p->second;
+
+ loff_t o_unclean = release(ob);
+ unclean += o_unclean;
+
+ if (o_unclean)
+ dout(10) << "release_all " << *ob
+ << " has " << o_unclean << " bytes left"
+ << dendl;
p = n;
+ }
+ ++i;
}
if (unclean) {
++p) {
ObjectExtent &ex = *p;
sobject_t soid(ex.oid, CEPH_NOSNAP);
- if (objects.count(soid) == 0)
+ if (objects[oset->poolid].count(soid) == 0)
continue;
- Object *ob = objects[soid];
+ Object *ob = objects[oset->poolid][soid];
// purge or truncate?
if (ex.offset == 0) {
dout(10) << "verify_stats" << dendl;
loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0;
-
- for (hash_map<sobject_t, Object*>::const_iterator p = objects.begin();
- p != objects.end();
- p++) {
- Object *ob = p->second;
- for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
- q != ob->data.end();
- q++) {
- BufferHead *bh = q->second;
- switch (bh->get_state()) {
- case BufferHead::STATE_MISSING:
- missing += bh->length();
- break;
- case BufferHead::STATE_CLEAN:
- clean += bh->length();
- break;
- case BufferHead::STATE_DIRTY:
- dirty += bh->length();
- break;
- case BufferHead::STATE_TX:
- tx += bh->length();
- break;
- case BufferHead::STATE_RX:
- rx += bh->length();
- break;
- default:
- assert(0);
+ for (vector<hash_map<sobject_t, Object*> >::const_iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ for (hash_map<sobject_t, Object*>::const_iterator p = i->begin();
+ p != i->end();
+ ++p) {
+ Object *ob = p->second;
+ for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
+ q != ob->data.end();
+ q++) {
+ BufferHead *bh = q->second;
+ switch (bh->get_state()) {
+ case BufferHead::STATE_MISSING:
+ missing += bh->length();
+ break;
+ case BufferHead::STATE_CLEAN:
+ clean += bh->length();
+ break;
+ case BufferHead::STATE_DIRTY:
+ dirty += bh->length();
+ break;
+ case BufferHead::STATE_TX:
+ tx += bh->length();
+ break;
+ case BufferHead::STATE_RX:
+ rx += bh->length();
+ break;
+ default:
+ assert(0);
+ }
}
}
}
// ObjectCacher::Object fields
ObjectCacher *oc;
sobject_t oid;
+ friend class ObjectSet;
+
public:
ObjectSet *oset;
xlist<Object*>::item set_item;
- private:
ceph_object_layout layout;
-
- friend class ObjectSet;
public:
map<loff_t, BufferHead*> data;
inodeno_t ino;
uint64_t truncate_seq, truncate_size;
+ int poolid;
xlist<Object*> objects;
xlist<Object*> uncommitted;
int dirty_tx;
- ObjectSet(void *p, inodeno_t i) : parent(p), ino(i), truncate_seq(0), truncate_size(0), dirty_tx(0) {}
+ ObjectSet(void *p, int _poolid, inodeno_t i) : parent(p), ino(i), truncate_seq(0),
+ truncate_size(0), poolid(_poolid), dirty_tx(0) {}
};
flush_set_callback_t flush_set_callback, commit_set_callback;
void *flush_set_callback_arg;
- hash_map<sobject_t, Object*> objects;
+ vector<hash_map<sobject_t, Object*> > objects; // indexed by pool_id
set<BufferHead*> dirty_bh;
LRU lru_dirty, lru_rest;
// objects
Object *get_object_maybe(sobject_t oid, ceph_object_layout &l) {
// have it?
- if (objects.count(oid))
- return objects[oid];
+ if ((l.ol_pgid.pool < objects.size()) &&
+ (objects[l.ol_pgid.pool].count(oid)))
+ return objects[l.ol_pgid.pool][oid];
return NULL;
}
Object *get_object(sobject_t oid, ObjectSet *oset, ceph_object_layout &l) {
// have it?
- if (objects.count(oid))
- return objects[oid];
+ if (l.ol_pgid.pool < objects.size()) {
+ if (objects[l.ol_pgid.pool].count(oid))
+ return objects[l.ol_pgid.pool][oid];
+ } else {
+ objects.resize(l.ol_pgid.pool+1);
+ }
// create it.
Object *o = new Object(this, oid, oset, l);
- objects[oid] = o;
+ objects[l.ol_pgid.pool][oid] = o;
return o;
}
void close_object(Object *ob);
void wrunlock(Object *o);
public:
- void bh_read_finish(sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
- void bh_write_ack(sobject_t oid, loff_t offset, uint64_t length, tid_t t);
- void bh_write_commit(sobject_t oid, loff_t offset, uint64_t length, tid_t t);
- void lock_ack(list<sobject_t>& oids, tid_t tid);
+ void bh_read_finish(int poolid, sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
+ void bh_write_ack(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
+ void bh_write_commit(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
+ void lock_ack(int poolid, list<sobject_t>& oids, tid_t tid);
class C_ReadFinish : public Context {
ObjectCacher *oc;
+ int poolid;
sobject_t oid;
loff_t start;
uint64_t length;
public:
bufferlist bl;
- C_ReadFinish(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_ReadFinish(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+ oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
void finish(int r) {
- oc->bh_read_finish(oid, start, length, bl);
+ oc->bh_read_finish(poolid, oid, start, length, bl);
}
};
class C_WriteAck : public Context {
ObjectCacher *oc;
+ int poolid;
sobject_t oid;
loff_t start;
uint64_t length;
public:
tid_t tid;
- C_WriteAck(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteAck(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+ oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
void finish(int r) {
- oc->bh_write_ack(oid, start, length, tid);
+ oc->bh_write_ack(poolid, oid, start, length, tid);
}
};
class C_WriteCommit : public Context {
ObjectCacher *oc;
+ int poolid;
sobject_t oid;
loff_t start;
uint64_t length;
public:
tid_t tid;
- C_WriteCommit(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+ C_WriteCommit(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+ oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
void finish(int r) {
- oc->bh_write_commit(oid, start, length, tid);
+ oc->bh_write_commit(poolid, oid, start, length, tid);
}
};
class C_LockAck : public Context {
ObjectCacher *oc;
public:
+ int poolid;
list<sobject_t> oids;
tid_t tid;
- C_LockAck(ObjectCacher *c, sobject_t o) : oc(c) {
+ C_LockAck(ObjectCacher *c, int _poolid, sobject_t o) : oc(c), poolid(_poolid) {
oids.push_back(o);
}
void finish(int r) {
- oc->lock_ack(oids, tid);
+ oc->lock_ack(poolid, oids, tid);
}
};