From 9cbe39b170e96fa5f8c3e92948e25dbf557e9abf Mon Sep 17 00:00:00 2001 From: sage Date: Thu, 11 May 2006 04:36:49 +0000 Subject: [PATCH] more objectcacher changes some osd locking cleanup/tweaking git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@765 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/Client.cc | 2 +- ceph/ebofs/Ebofs.cc | 20 ++-- ceph/ebofs/Ebofs.h | 4 +- ceph/include/types.h | 22 +++- ceph/mds/MDS.h | 1 - ceph/messages/MOSDOp.h | 4 +- ceph/osd/Fake.h | 20 ++-- ceph/osd/OSD.cc | 55 ++++------ ceph/osd/OSD.h | 5 +- ceph/osd/ObjectStore.h | 4 +- ceph/osdc/ObjectCacher.cc | 224 ++++++++++++++++++++++++++++---------- ceph/osdc/ObjectCacher.h | 68 ++++++++---- ceph/osdc/Objecter.cc | 15 ++- ceph/osdc/Objecter.h | 12 +- 14 files changed, 308 insertions(+), 148 deletions(-) diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 78091be392be0..2579a862f3dfe 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -77,7 +77,7 @@ Client::Client(Messenger *m) // osd interfaces osdmap = new OSDMap(); // initially blank.. see mount() objecter = new Objecter(messenger, osdmap); - objectcacher = new ObjectCacher(objecter); + objectcacher = new ObjectCacher(objecter, client_lock); filer = new Filer(objecter, objectcacher); } diff --git a/ceph/ebofs/Ebofs.cc b/ceph/ebofs/Ebofs.cc index 7c55299cf1096..8206c07a3c71d 100644 --- a/ceph/ebofs/Ebofs.cc +++ b/ceph/ebofs/Ebofs.cc @@ -2033,7 +2033,7 @@ void Ebofs::do_csetattrs(map > > &cmods } } -int Ebofs::setattr(object_t oid, const char *name, void *value, size_t size, Context *onsafe) +int Ebofs::setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe) { ebofs_lock.Lock(); dout(8) << "setattr " << hex << oid << dec << " '" << name << "' len " << size << endl; @@ -2073,7 +2073,8 @@ int Ebofs::getattr(object_t oid, const char *name, void *value, size_t size) if (on->attr.count(n) == 0) { r = -1; } else { - memcpy(value, on->attr[n].data, MIN( on->attr[n].len, (int)size )); + r = MIN( on->attr[n].len, (int)size ); + memcpy(value, on->attr[n].data, r ); } put_onode(on); ebofs_lock.Unlock(); @@ -2279,7 +2280,7 @@ int Ebofs::collection_list(coll_t cid, list& ls) } -int Ebofs::collection_setattr(coll_t cid, const char *name, void *value, size_t size) +int Ebofs::collection_setattr(coll_t cid, const char *name, const void *value, size_t size) { ebofs_lock.Lock(); dout(10) << "collection_setattr " << hex << cid << dec << " '" << name << "' len " << size << endl; @@ -2312,12 +2313,17 @@ int Ebofs::collection_getattr(coll_t cid, const char *name, void *value, size_t } string n(name); - if (cn->attr.count(n) == 0) return -1; - memcpy(value, cn->attr[n].data, MIN( cn->attr[n].len, (int)size )); - + int r; + if (cn->attr.count(n) == 0) { + r = -1; + } else { + r = MIN( cn->attr[n].len, (int)size ); + memcpy(value, cn->attr[n].data, r); + } + put_cnode(cn); ebofs_lock.Unlock(); - return 0; + return r; } int Ebofs::collection_rmattr(coll_t cid, const char *name) diff --git a/ceph/ebofs/Ebofs.h b/ceph/ebofs/Ebofs.h index 683cdceb39da9..cacca9b5de32d 100644 --- a/ceph/ebofs/Ebofs.h +++ b/ceph/ebofs/Ebofs.h @@ -267,7 +267,7 @@ class Ebofs : public ObjectStore { bool write_will_block(); // object attr - int setattr(object_t oid, const char *name, void *value, size_t size, + int setattr(object_t oid, const char *name, const void *value, size_t size, Context *onsafe=0); int getattr(object_t oid, const char *name, void *value, size_t size); int rmattr(object_t oid, const char *name, @@ -286,7 +286,7 @@ class Ebofs : public ObjectStore { int collection_remove(coll_t c, object_t o); int collection_list(coll_t c, list& o); - int collection_setattr(object_t oid, const char *name, void *value, size_t size); + int collection_setattr(object_t oid, const char *name, const void *value, size_t size); int collection_getattr(object_t oid, const char *name, void *value, size_t size); int collection_rmattr(coll_t cid, const char *name); int collection_listattr(object_t oid, vector& attrs); diff --git a/ceph/include/types.h b/ceph/include/types.h index 52ef1c4bf503e..7bb3a06a9756e 100644 --- a/ceph/include/types.h +++ b/ceph/include/types.h @@ -216,12 +216,32 @@ struct inode_t { }; + +// lame 128-bit value class. +class lame128_t { +public: + __uint64_t hi, lo; + lame128_t(__uint64_t h=0, __uint64_t l=0) : hi(h), lo(l) {} +}; + +inline ostream& operator<<(ostream& out, lame128_t& oid) { + return out << oid.hi << "." << oid.lo; +} + + // osd types typedef __uint64_t ps_t; // placement seed typedef __uint64_t pg_t; // placement group -typedef __uint64_t object_t; // object id typedef __uint64_t coll_t; // collection id +#ifdef OBJECT128 +typedef lame128_t object_t; +#else +typedef __uint64_t object_t; // object id +#endif + + + #define PG_NONE 0xffffffffffffffffLL diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index 066b74b852f80..0cb67d923c8d7 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -34,7 +34,6 @@ using namespace __gnu_cxx; #include "common/Logger.h" #include "common/Mutex.h" -typedef __uint64_t object_t; diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index 5b0b74c70aafc..3d8541d2bd9f0 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -84,8 +84,8 @@ class MOSDOp : public Message { public: bool hack_blah; - const long get_tid() { return st.tid; } - const msg_addr_t get_asker() { return st.asker; } + const long get_tid() { return st.tid; } + const msg_addr_t& get_asker() { return st.asker; } const object_t get_oid() { return st.oid; } const pg_t get_pg() { return st.pg; } diff --git a/ceph/osd/Fake.h b/ceph/osd/Fake.h index a616538e77a0e..3d20fe7239276 100644 --- a/ceph/osd/Fake.h +++ b/ceph/osd/Fake.h @@ -109,23 +109,24 @@ class FakeStoreAttrs { class FakeAttrSet { public: - map attrs; + map attrs; int getattr(const char *name, void *value, size_t size) { - if (attrs.count(name)) { - size_t l = attrs[name].length(); - if (l > size) l = size; + string n = name; + if (attrs.count(n)) { + size_t l = MIN( attrs[n].length(), size ); bufferlist bl; - bl.append(attrs[name]); + bl.append(attrs[n]); bl.copy(0, l, (char*)value); return l; } return -1; } - int setattr(const char *name, void *value, size_t size) { + int setattr(const char *name, const void *value, size_t size) { + string n = name; bufferptr bp(new buffer((char*)value,size)); - attrs[name] = bp; + attrs[n] = bp; return 0; } @@ -135,7 +136,8 @@ class FakeStoreAttrs { } int rmattr(const char *name) { - attrs.erase(name); + string n = name; + attrs.erase(n); return 0; } @@ -148,7 +150,7 @@ class FakeStoreAttrs { public: int setattr(object_t oid, const char *name, - void *value, size_t size, + const void *value, size_t size, Context *onsafe=0) { faker_lock.Lock(); int r = fakeoattrs[oid].setattr(name, value, size); diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index c716131d424d8..68ede1738010b 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -948,11 +948,13 @@ PG *OSD::create_pg(pg_t pgid) assert(!pg_exists(pgid)); PG *pg = new PG(this, pgid); + pg_map[pgid] = pg; + + store->create_collection(pgid); //pg->info.created = osdmap->get_epoch(); //pg->store(store); - pg_map[pgid] = pg; return pg; } @@ -1730,19 +1732,20 @@ void OSD::wait_for_no_ops() bool OSD::block_if_wrlocked(MOSDOp* op) { object_t oid = op->get_oid(); + msg_addr_t source; - int r; + int len; - r = store->getattr(oid, "wrlock", &source, sizeof(msg_addr_t)); - /* - if ((r >= 0) && (MSG_ADDR_NUM(source) != MSG_ADDR_NUM(op->get_source()))) - { - //the object is locked for writing by another client -- add the op to the waiting queue - waiting_for_wr_unlock[oid].push_back(op); + len = store->getattr(oid, "wrlock", &source, sizeof(msg_addr_t)); + cout << "getattr returns " << len << " on " << hex << oid << dec << endl; + + if (len == sizeof(source) && + source != op->get_asker()) { + //the object is locked for writing by someone else -- add the op to the waiting queue + waiting_for_wr_unlock[oid].push_back(op); + return true; + } - return true; - } - */ return false; //the object wasn't locked, so the operation can be handled right away } @@ -2040,29 +2043,19 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit) { object_t oid = op->get_oid(); pg_t pgid = op->get_pg(); - int optype = op->get_op(); int r; - if ((optype == OSD_OP_WRITE) || - (optype == OSD_OP_REP_WRITE) || - (optype == OSD_OP_DELETE) || - (optype == OSD_OP_REP_DELETE) || - (optype == OSD_OP_TRUNCATE) || - (optype == OSD_OP_REP_TRUNCATE) || - (optype == OSD_OP_WRLOCK) || - (optype == OSD_OP_REP_WRLOCK)) - { - //if the target object is locked for writing by another client, put 'op' to the waiting queue - if (block_if_wrlocked(op)) { - return; //op will be handled later, after the object becomes unlocked - } - } + // if the target object is locked for writing by another client, put 'op' to the waiting queue + // for _any_ op type -- eg only the locker can unlock! + if (block_if_wrlocked(op)) + return; // op will be handled later, after the object becomes unlocked + // everybody will want to update the version. map > setattrs; setattrs["version"] = pair(&version, sizeof(version)); - // and pg also gets version update. + // and pg also gets matching version update. map > > cmods; cmods[pgid] = setattrs; @@ -2072,7 +2065,7 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit) case OSD_OP_WRLOCK: case OSD_OP_REP_WRLOCK: // lock object - r = store->setattr(oid, "wrlock", &op->get_source(), sizeof(msg_addr_t), oncommit); + r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit); break; case OSD_OP_WRUNLOCK: @@ -2082,9 +2075,9 @@ void OSD::op_apply(MOSDOp *op, version_t version, Context* oncommit) //unblock all operations that were waiting for this object to become unlocked if (waiting_for_wr_unlock.count(oid)) { - take_waiters(waiting_for_wr_unlock[oid]); - waiting_for_wr_unlock.erase(oid); - } + take_waiters(waiting_for_wr_unlock[oid]); + waiting_for_wr_unlock.erase(oid); + } break; case OSD_OP_WRITE: diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index d4242f36bc938..b95eb43b38eee 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -109,7 +109,7 @@ public: finished.splice(finished.end(), ls); } - //object locking + // object locking hash_map > waiting_for_wr_unlock; /** list of operations for each object waiting for 'wrunlock' */ bool block_if_wrlocked(MOSDOp* op); @@ -155,7 +155,6 @@ public: void activate_map(list& ls); - // -- replication -- // PG @@ -167,7 +166,7 @@ public: void close_pg(pg_t pg); // close in-memory state void remove_pg(pg_t pg); // remove state from store - __uint64_t last_tid; + tid_t last_tid; hash_map > waiting_for_pg; diff --git a/ceph/osd/ObjectStore.h b/ceph/osd/ObjectStore.h index 7ee8b8dd25c60..bff84b1e8bbca 100644 --- a/ceph/osd/ObjectStore.h +++ b/ceph/osd/ObjectStore.h @@ -129,7 +129,7 @@ class ObjectStore { } virtual int setattr(object_t oid, const char *name, - void *value, size_t size, + const void *value, size_t size, Context *onsafe=0) {return 0;} //= 0; virtual int getattr(object_t oid, const char *name, void *value, size_t size) {return 0;} //= 0; @@ -150,7 +150,7 @@ class ObjectStore { virtual int collection_list(coll_t c, list& o) {return 0;}//= 0; virtual int collection_setattr(coll_t cid, const char *name, - void *value, size_t size) {return 0;} //= 0; + const void *value, size_t size) {return 0;} //= 0; virtual int collection_getattr(coll_t cid, const char *name, void *value, size_t size) {return 0;} //= 0; virtual int collection_listattr(coll_t cid, char *attrs, size_t size) {return 0;} //= 0; diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc index e8ad74e150f08..5343b021ae36c 100644 --- a/ceph/osdc/ObjectCacher.cc +++ b/ceph/osdc/ObjectCacher.cc @@ -2,6 +2,10 @@ #include "ObjectCacher.h" #include "Objecter.h" +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_client) cout << "ocacher." << pthread_self() << " " + + /*** ObjectCacher::BufferHead ***/ @@ -13,7 +17,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off) // split off right ObjectCacher::BufferHead *right = new BufferHead(); - right->set_version(bh->get_version()); + right->last_write_tid = bh->last_write_tid; right->set_state(bh->get_state()); off_t newleftlen = off - bh->start(); @@ -43,7 +47,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off) p--; while (p != bh->waitfor_read.begin()) { if (p->first < right->start()) break; - dout(0) << "split moving waiters at block " << p->first << " to right bh" << endl; + dout(0) << "split moving waiters at byte " << p->first << " to right bh" << endl; right->waitfor_read[p->first].swap( p->second ); o = p; p--; @@ -68,8 +72,9 @@ void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right) // data left->bl.claim_append(right->bl); - // version - left->set_version( MAX( left->get_version(), right->get_version() ) ); + // version + // note: this is sorta busted, but shouldn't be used, cuz we're pbly about to write.. right? + left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid ); // waiters for (map >::iterator p = right->waitfor_read.begin(); @@ -178,12 +183,10 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd, * map a range of extents on an object's buffer cache. * - combine any bh's we're writing into one * - break up bufferheads that don't fall completely within the range - * - increase the bh version number (to be larger than any it subsumes) */ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr) { BufferHead *final = 0; - version_t max_version = 0; for (list::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); @@ -191,7 +194,8 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr if (ex_it->oid != oid) continue; - dout(10) << "map_write " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl; + dout(10) << "map_write oex " << hex << ex_it->oid << dec + << " " << ex_it->start << "~" << ex_it->length << endl; map::iterator p = data.lower_bound(ex_it->start); // p->first >= start @@ -226,10 +230,6 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr dout(10) << "p is " << *p->second << endl; - // note highest version we see - if (max_version < p->second->get_version()) - max_version = p->second->get_version(); - if (p->first <= cur) { BufferHead *bh = p->second; dout(10) << "map_write bh " << *bh << " intersected" << endl; @@ -290,10 +290,9 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr // set versoin assert(final); - final->set_version(max_version+1); dout(10) << "map_write final is " << *final << endl; - return 0; + return final; } /*** ObjectCacher ***/ @@ -305,22 +304,16 @@ void ObjectCacher::bh_read(Object *ob, BufferHead *bh) dout(7) << "bh_read on " << *bh << endl; // finisher - C_ReadFinish *fin = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length()); + C_ReadFinish *onfinish = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length()); - // read req - Objecter::OSDRead *rd = new Objecter::OSDRead(&fin->bl); - - // object extent - ObjectExtent ex(ob->get_oid(), bh->start(), bh->length()); - ex.buffer_extents[0] = bh->length(); - rd->extents.push_back(ex); - // go - objecter->readx(rd, fin); + objecter->read(ob->get_oid(), bh->start(), bh->length(), &onfinish->bl, + onfinish); } void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl) { + lock.Lock(); dout(7) << "bh_read_finish " << hex << oid << dec << " " << start << "~" << length @@ -328,50 +321,166 @@ void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, buff if (objects.count(oid) == 0) { dout(7) << "bh_read_finish no object cache" << endl; - return; + } else { + Object *ob = objects[oid]; + + // apply to bh's! + off_t opos = start; + map::iterator p = ob->data.lower_bound(opos); + + while (p != ob->data.end() && + opos < start+length) { + BufferHead *bh = p->second; + + if (bh->start() > opos) { + dout(1) << "weirdness: gap when applying read results, " + << opos << "~" << bh->start() - opos + << endl; + opos = bh->start(); + p++; + continue; + } + + if (!bh->is_rx()) { + dout(10) << "bh_read_finish skipping non-rx " << *bh << endl; + continue; + } + + assert(bh->start() == opos); // we don't merge rx bh's... yet! + assert(bh->length() < start+length-opos); + + bh->bl.substr_of(bl, + start+length-opos, + bh->length()); + mark_clean(bh); + dout(10) << "bh_read_finish read " << *bh << endl; + + // finishers? + // called with lock held. + list ls; + for (map >::iterator p = bh->waitfor_read.begin(); + p != bh->waitfor_read.end(); + p++) + ls.splice(ls.end(), p->second); + bh->waitfor_read.clear(); + finish_contexts(ls); + } } - Object *ob = objects[oid]; + lock.Unlock(); +} + - // apply to bh's! - off_t opos = start; - map::iterator p = ob->data.lower_bound(opos); +void ObjectCacher::bh_write(Object *ob, BufferHead *bh) +{ + dout(7) << "bh_write " << *bh << endl; - while (p != ob->data.end() && - opos < start+length) { - BufferHead *bh = p->second; - - if (bh->start() > opos) { - dout(1) << "weirdness: gap when applying read results, " - << opos << "~" << bh->start() - opos - << endl; - opos = bh->start(); - p++; - continue; - } + // finishers + C_WriteAck *onack = new C_WriteAck(this, ob->get_oid(), bh->start(), bh->length()); + C_WriteCommit *oncommit = new C_WriteCommit(this, ob->get_oid(), bh->start(), bh->length()); - if (!bh->is_rx()) { - dout(10) << "bh_read_finish skipping non-rx " << *bh << endl; - continue; - } + // go + tid_t tid = objecter->write(ob->get_oid(), bh->start(), bh->length(), bh->bl, + onack, oncommit); - assert(bh->start() == opos); // we don't merge rx bh's... yet! - assert(bh->length() < start+length-opos); + // set bh last_write_tid + onack->tid = tid; + oncommit->tid = tid; + bh->last_write_tid = tid; +} - bh->bl.substr_of(bl, - start+length-opos, - bh->length()); - bh->set_version(1); - mark_clean(bh); - dout(10) << "bh_read_finish read " << *bh << endl; +void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t tid) +{ + lock.Lock(); + + dout(7) << "bh_write_ack " + << hex << oid << dec + << " tid " << tid + << " " << start << "~" << length + << endl; + if (objects.count(oid) == 0) { + dout(7) << "bh_write_ack no object cache" << endl; + assert(0); + } else { + Object *ob = objects[oid]; + + // apply to bh's! + off_t opos = start; + map::iterator p = ob->data.lower_bound(opos); + + while (p != ob->data.end() && + opos < start+length) { + BufferHead *bh = p->second; + + if (bh->start() < start && + bh->end() > start+length) { + dout(20) << "bh_write_ack skipping " << *bh << endl; + continue; + } + + // make sure bh is tx + if (!bh->is_tx()) { + dout(10) << "bh_write_ack skipping non-tx " << *bh << endl; + continue; + } + + // make sure bh tid matches + if (bh->last_write_tid != tid) { + assert(bh->last_write_tid > tid); + dout(10) << "bh_write_ack newer tid on " << *bh << endl; + continue; + } + + // ok! mark bh clean. + mark_clean(bh); + } + + // update object last_ack. + assert(ob->last_ack_tid < tid); + ob->last_ack_tid = tid; + + // waiters? + if (ob->waitfor_ack.count(tid)) { + list ls; + ls.splice(ls.begin(), ob->waitfor_ack[tid]); + ob->waitfor_ack.erase(tid); + finish_contexts(ls); + } } + lock.Unlock(); } - -void ObjectCacher::bh_write(Object *ob, BufferHead *bh) +void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid_t tid) { - assert(0); + lock.Lock(); + + // update object last_commit + dout(7) << "bh_write_commit " + << hex << oid << dec + << " tid " << tid + << " " << start << "~" << length + << endl; + if (objects.count(oid) == 0) { + dout(7) << "bh_write_commit no object cache" << endl; + assert(0); + } else { + Object *ob = objects[oid]; + + // update last_commit. + ob->last_commit_tid = tid; + + // waiters? + if (ob->waitfor_commit.count(tid)) { + list ls; + ls.splice(ls.begin(), ob->waitfor_commit[tid]); + ob->waitfor_commit.erase(tid); + finish_contexts(ls); + } + } + + lock.Unlock(); } + /* public */ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) @@ -522,11 +631,12 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) f_it != ex_it->buffer_extents.end(); f_it++) { size_t bhoff = bh->start() - opos; - assert(f_it->second < bh->length() - bhoff); + assert(f_it->second <= bh->length() - bhoff); - bufferlist frag; + bufferlist frag; frag.substr_of(wr->bl, f_it->first, f_it->second); + bh->bl.claim_append(frag); opos += f_it->second; } diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index a7a2117b6eee7..62b942f0dc52d 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -32,21 +32,20 @@ class ObjectCacher { struct { off_t start, length; // bh extent in object } ex; - - version_t version; // version of cached object (if non-zero) - + public: bufferlist bl; + tid_t last_write_tid; // version of bh (if non-zero) - map > waitfor_read; + map< off_t, list > waitfor_read; public: // cons BufferHead() : state(STATE_MISSING), ref(0), - version(0) {} - + last_write_tid(0) {} + // extent off_t start() { return ex.start; } void set_start(off_t s) { ex.start = s; } @@ -54,11 +53,7 @@ class ObjectCacher { void set_length(off_t l) { ex.length = l; } off_t end() { return ex.start + ex.length; } off_t last() { return end() - 1; } - - // version - version_t get_version() { return version; } - void set_version(version_t v) { version = v; } - + // states void set_state(int s) { if (s == STATE_RX || s == STATE_TX) get(); @@ -99,6 +94,12 @@ class ObjectCacher { public: map data; + tid_t last_ack_tid; // last update acked. + tid_t last_commit_tid; // last update commited. + + map< tid_t, list > waitfor_ack; + map< tid_t, list > waitfor_commit; + // lock static const int LOCK_NONE = 0; static const int LOCK_WRLOCK = 1; @@ -109,6 +110,7 @@ class ObjectCacher { Object(ObjectCacher *_oc, object_t o, inodeno_t i) : oc(_oc), oid(o), ino(i), + last_ack_tid(0), last_commit_tid(0), lock_state(LOCK_NONE) {} @@ -147,8 +149,6 @@ class ObjectCacher { bool is_empty() { return data.empty(); } // mid-level - int scan_versions(off_t start, off_t len, - version_t& low, version_t& high); BufferHead *split(BufferHead *bh, off_t off); void merge(BufferHead *left, BufferHead *right); @@ -157,14 +157,15 @@ class ObjectCacher { map& missing, map& rx); BufferHead *map_write(Objecter::OSDWrite *wr); - + }; - + // ******* ObjectCacher ********* // ObjectCacher fields private: Objecter *objecter; - + Mutex& lock; + hash_map objects; hash_map > objects_by_ino; @@ -253,7 +254,8 @@ class ObjectCacher { public: void bh_read_finish(object_t oid, off_t offset, size_t length, bufferlist &bl); - void bh_write_finish(object_t oid, off_t offset, size_t length, version_t v); + void bh_write_ack(object_t oid, off_t offset, size_t length, tid_t t); + void bh_write_commit(object_t oid, off_t offset, size_t length, tid_t t); class C_ReadFinish : public Context { ObjectCacher *oc; @@ -268,11 +270,36 @@ class ObjectCacher { } }; + class C_WriteAck : public Context { + ObjectCacher *oc; + object_t oid; + off_t start; + size_t length; + public: + tid_t tid; + C_WriteAck(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {} + void finish(int r) { + oc->bh_write_ack(oid, start, length, tid); + } + }; + class C_WriteCommit : public Context { + ObjectCacher *oc; + object_t oid; + off_t start; + size_t length; + public: + tid_t tid; + C_WriteCommit(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {} + void finish(int r) { + oc->bh_write_commit(oid, start, length, tid); + } + }; + public: - ObjectCacher(Objecter *o) : - objecter(o), + ObjectCacher(Objecter *o, Mutex& l) : + objecter(o), lock(l), stat_waiter(0), stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) {} @@ -304,6 +331,7 @@ class ObjectCacher { void flush_set(inodeno_t ino, Context *onfinish=0); void flush_all(Context *onfinish=0); + // help.. need to figure out how to handle this wrt BufferHeads, etc.. void commit_set(inodeno_t ino, Context *oncommit=0); void commit_all(Context *oncommit=0); }; @@ -313,7 +341,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) { out << "bh[" << bh.start() << "~" << bh.end() - << " v " << bh.get_version(); + << " v " << bh.last_write_tid; if (bh.is_tx()) out << " tx"; if (bh.is_rx()) out << " rx"; if (bh.is_dirty()) out << " dirty"; diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index af8e7131f13d6..c9e3d868c3fe8 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -88,13 +88,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // read ----------------------------------- -int Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, +tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, Context *onfinish) { OSDRead *rd = new OSDRead(bl); rd->extents.push_back(ObjectExtent(oid, off, len)); rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); - return readx(rd, onfinish); + readx(rd, onfinish); + return last_tid; } int Objecter::readx(OSDRead *rd, Context *onfinish) @@ -273,13 +274,14 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) // write ------------------------------------ -int Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, +tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, Context *onack, Context *oncommit) { OSDWrite *wr = new OSDWrite(bl); wr->extents.push_back(ObjectExtent(oid, off, len)); wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); - return writex(wr, onack, oncommit); + writex(wr, onack, oncommit); + return last_tid; } int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit) @@ -403,13 +405,14 @@ void Objecter::handle_osd_write_reply(MOSDOpReply *m) // zero --------------------------------------------------- -int Objecter::zero(object_t oid, off_t off, size_t len, +tid_t Objecter::zero(object_t oid, off_t off, size_t len, Context *onack, Context *oncommit) { OSDZero *z = new OSDZero; z->extents.push_back(ObjectExtent(oid, off, len)); z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); - return zerox(z, onack, oncommit); + zerox(z, onack, oncommit); + return last_tid; } int Objecter::zerox(OSDZero *z, Context *onack, Context *oncommit) diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h index b524599da5b3e..e56c1d79caa2e 100644 --- a/ceph/osdc/Objecter.h +++ b/ceph/osdc/Objecter.h @@ -123,12 +123,12 @@ class Objecter { int writex(OSDWrite *write, Context *onack, Context *oncommit); int zerox(OSDZero *zero, Context *onack, Context *oncommit); - int read(object_t oid, off_t off, size_t len, bufferlist *bl, - Context *onfinish); - int write(object_t oid, off_t off, size_t len, bufferlist &bl, - Context *onack, Context *oncommit); - int zero(object_t oid, off_t off, size_t len, - Context *onack, Context *oncommit); + tid_t read(object_t oid, off_t off, size_t len, bufferlist *bl, + Context *onfinish); + tid_t write(object_t oid, off_t off, size_t len, bufferlist &bl, + Context *onack, Context *oncommit); + tid_t zero(object_t oid, off_t off, size_t len, + Context *onack, Context *oncommit); }; #endif -- 2.39.5