From 7dd1a5f2071356a8bdb137d81a0353df9786240f Mon Sep 17 00:00:00 2001 From: sage Date: Mon, 15 May 2006 20:45:52 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@767 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/Client.cc | 42 +++++++++------ ceph/osdc/Filer.h | 57 +------------------- ceph/osdc/ObjectCacher.cc | 110 +++++++++++++++++++++++++++++++++----- ceph/osdc/ObjectCacher.h | 77 +++++++++++++++++++++++--- ceph/osdc/Objecter.h | 3 +- 5 files changed, 194 insertions(+), 95 deletions(-) diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index c943b468988c9..7399c419d995b 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -32,6 +32,7 @@ #include "osd/Filer.h" #include "osd/Objecter.h" +#include "osd/ObjectCacher.h" #include "common/Cond.h" #include "common/Mutex.h" @@ -78,7 +79,7 @@ Client::Client(Messenger *m) osdmap = new OSDMap(); // initially blank.. see mount() objecter = new Objecter(messenger, osdmap); objectcacher = new ObjectCacher(objecter, client_lock); - filer = new Filer(objecter, objectcacher); + filer = new Filer(objecter); } @@ -1704,24 +1705,31 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) // adjust fd pos f->pos = offset+size; - if (!g_conf.client_oc) { - // buffer cache OFF - Cond cond; - bufferlist blist; // data will go here - - bool done = false; - C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue); - filer->read(in->inode, size, offset, &blist, onfinish); - while (!done) - cond.Wait(client_lock); + Cond cond; + bufferlist blist; // data will go here + bool done = false; + C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue); - // copy data into caller's buf - blist.copy(0, blist.length(), buf); + int r = 0; + if (g_conf.client_oc) { + // object cache ON + r = objectcacher->file_read(in->inode, size, offset, &blist, onfinish); + } else { + // object cache OFF -- legacy inconsistent way. + r = filer->read(in->inode, size, offset, &blist, onfinish); } - else { - // buffer cache ON + if (r == 0) { + // wait! + while (!done) + cond.Wait(client_lock); + } else { + // had it cached, apparently! + assert(r > 0); + delete onfinish; } + // copy data into caller's buf + blist.copy(0, blist.length(), buf); // done! client_lock.Unlock(); @@ -1803,10 +1811,10 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) if (in->file_caps() & CAP_FILE_WRBUFFER) { // caps buffered write? // async, caching, non-blocking. - filer->caching_write(in->inode, size, offset, blist); + objectcacher->file_write(in->inode, size, offset, blist); } else { // atomic, synchronous, blocking. - filer->atomic_sync_write(in->inode, size, offset, blist, client_lock); + objectcacher->file_atomic_sync_write(in->inode, size, offset, blist, client_lock); } } else { // legacy, inconsistent synchronous write. diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index eded1d140b463..d4f8702115f37 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -40,22 +40,19 @@ using namespace __gnu_cxx; #include "OSDMap.h" #include "Objecter.h" -#include "ObjectCacher.h" class Context; class Messenger; class OSDMap; -class ObjectCacher; /**** Filer interface ***/ class Filer { Objecter *objecter; - ObjectCacher *oc; public: - Filer(Objecter *o, ObjectCacher *c=0) : objecter(o), oc(c) {} + Filer(Objecter *o) : objecter(o) {} ~Filer() {} bool is_active() { @@ -99,58 +96,6 @@ class Filer { } - /*** async+caching (non-blocking) file interface ***/ - int caching_read(inode_t& inode, - size_t len, - off_t offset, - bufferlist *bl, - Context *onfinish) { - Objecter::OSDRead *rd = new Objecter::OSDRead(bl); - file_to_extents(inode, len, offset, rd->extents); - return oc->readx(rd, inode.ino, onfinish); - } - - int caching_write(inode_t& inode, - size_t len, - off_t offset, - bufferlist& bl) { - Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); - file_to_extents(inode, len, offset, wr->extents); - return oc->writex(wr, inode.ino); - } - - - - /*** sync+blocking file interface ***/ - - int atomic_sync_read(inode_t& inode, - size_t len, off_t offset, - bufferlist *bl, - Mutex &lock) { - Objecter::OSDRead *rd = new Objecter::OSDRead(bl); - file_to_extents(inode, len, offset, rd->extents); - - assert(oc); - int r = oc->atomic_sync_readx(rd, inode.ino, lock); - return r; - } - - int atomic_sync_write(inode_t& inode, - size_t len, off_t offset, - bufferlist& bl, - Mutex &lock) { - Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); - file_to_extents(inode, len, offset, wr->extents); - - assert(oc); - int r = oc->atomic_sync_writex(wr, inode.ino, lock); - return r; - } - - - - - /***** mapping *****/ diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc index 5343b021ae36c..e7da2e20eda43 100644 --- a/ceph/osdc/ObjectCacher.cc +++ b/ceph/osdc/ObjectCacher.cc @@ -105,7 +105,8 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd, if (ex_it->oid != oid) continue; - dout(10) << "map_read " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl; + dout(10) << "map_read " << hex << ex_it->oid << dec + << " " << ex_it->start << "~" << ex_it->length << endl; map::iterator p = data.lower_bound(ex_it->start); // p->first >= start @@ -313,7 +314,7 @@ void ObjectCacher::bh_read(Object *ob, BufferHead *bh) void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl) { - lock.Lock(); + //lock.Lock(); dout(7) << "bh_read_finish " << hex << oid << dec << " " << start << "~" << length @@ -366,7 +367,7 @@ void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, buff finish_contexts(ls); } } - lock.Unlock(); + //lock.Unlock(); } @@ -385,12 +386,13 @@ void ObjectCacher::bh_write(Object *ob, BufferHead *bh) // set bh last_write_tid onack->tid = tid; oncommit->tid = tid; + ob->last_write_tid = tid; bh->last_write_tid = tid; } void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t tid) { - lock.Lock(); + //lock.Lock(); dout(7) << "bh_write_ack " << hex << oid << dec @@ -446,12 +448,12 @@ void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t finish_contexts(ls); } } - lock.Unlock(); + //lock.Unlock(); } void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid_t tid) { - lock.Lock(); + //lock.Lock(); // update object last_commit dout(7) << "bh_write_commit " @@ -477,7 +479,7 @@ void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid } } - lock.Unlock(); + // lock.Unlock(); } @@ -492,7 +494,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) for (list::iterator ex_it = rd->extents.begin(); ex_it != rd->extents.end(); ex_it++) { - dout(10) << "readx ex " << *ex_it << endl; + dout(10) << "readx " << *ex_it << endl; // get Object cache Object *o; @@ -538,6 +540,12 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) } } } else { + // make a plain list + for (map::iterator bh_it = hits.begin(); + bh_it != hits.end(); + bh_it++) + hit_ls.push_back(bh_it->second); + // create reverse map of buffer offset -> object for the eventual result. // this is over a single ObjectExtent, so we know that // - the bh's are contiguous @@ -550,6 +558,12 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) while (1) { BufferHead *bh = bh_it->second; assert(opos == bh->start() + bhoff); + + dout(10) << "readx rmap opos " << opos + << ": " << *bh << " +" << bhoff + << " frag " << f_it->first << "~" << f_it->second << " +" << foff + << endl; + size_t len = MIN(f_it->second - foff, bh->length() - bhoff); stripe_map[f_it->first].substr_of(bh->bl, @@ -561,13 +575,13 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) if (opos == bh->end()) { bh_it++; bhoff = 0; - if (bh_it == hits.end()) break; } if (foff == f_it->second) { f_it++; foff = 0; - if (f_it == ex_it->buffer_extents.end()) break; } + if (bh_it == hits.end()) break; + if (f_it == ex_it->buffer_extents.end()) break; } assert(f_it == ex_it->buffer_extents.end()); assert(bh_it == hits.end()); @@ -581,8 +595,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) bhit++) touch_bh(*bhit); - if (!success) - return -1; + if (!success) return 0; // wait! // no misses... success! do the read. assert(!hit_ls.empty()); @@ -599,7 +612,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) rd->bl->claim_append(i->second); } - return 0; + return pos; } @@ -673,3 +686,74 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute return 0; } + + +// flush. non-blocking, takes callback. +// returns true if already flushed, and deletes the callback. +bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) +{ + if (objects.count(ino) == 0) { + dout(10) << "flush_set on " << hex << ino << dec << " dne" << endl; + delete onfinish; + return true; + } + + Object *ob = objects[ino]; + dout(10) << "flush_set " << *ob << endl; + + bool any_tx = false; + for (map::iterator p = ob->data.begin(); + p != ob->data.end(); + p++) { + BufferHead *bh = p->second; + if (bh->is_tx()) { + any_tx = true; + continue; + } + if (!bh->is_dirty()) continue; + + bh_write(ob, bh); + } + + if (!any_tx) { + dout(10) << "flush_set " << *ob << " has no dirty|tx bhs" << endl; + delete onfinish; + return true; + } + + dout(10) << "flush_set " << *ob << " will finish on ack tid " << ob->last_write_tid << endl; + ob->waitfor_ack[ob->last_write_tid].push_back(onfinish); + return false; +} + + +// commit. non-blocking, takes callback. +// return true if already flushed. +bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) +{ + assert(onfinish); // doesn't make any sense otherwise. + + if (objects.count(ino) == 0) { + dout(10) << "commit_set on " << hex << ino << dec << " dne" << endl; + delete onfinish; + return true; + } + + Object *ob = objects[ino]; + dout(10) << "commit_set " << *ob << endl; + + // make sure it's flushing. + flush_set(ino); + + if (ob->last_write_tid < ob->last_commit_tid) { + dout(10) << "commit_set " << *ob << " will finish on commit tid " << ob->last_write_tid << endl; + ob->waitfor_commit[ob->last_write_tid].push_back(onfinish); + return false; + } else { + dout(10) << "commit_set " << *ob << " all committed" << endl; + delete onfinish; + return true; + } +} + + diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index 62b942f0dc52d..8ebfa1b724e4d 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -8,6 +8,7 @@ #include "common/Cond.h" #include "Objecter.h" +#include "Filer.h" class Objecter; class Objecter::OSDRead; @@ -94,6 +95,7 @@ class ObjectCacher { public: map data; + tid_t last_write_tid; // version of bh (if non-zero) tid_t last_ack_tid; // last update acked. tid_t last_commit_tid; // last update commited. @@ -110,7 +112,7 @@ class ObjectCacher { Object(ObjectCacher *_oc, object_t o, inodeno_t i) : oc(_oc), oid(o), ino(i), - last_ack_tid(0), last_commit_tid(0), + last_write_tid(0), last_ack_tid(0), last_commit_tid(0), lock_state(LOCK_NONE) {} @@ -164,6 +166,7 @@ class ObjectCacher { // ObjectCacher fields private: Objecter *objecter; + Filer filer; Mutex& lock; hash_map objects; @@ -299,10 +302,10 @@ class ObjectCacher { public: ObjectCacher(Objecter *o, Mutex& l) : - objecter(o), lock(l), + objecter(o), filer(o), lock(l), stat_waiter(0), - stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) - {} + stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0) { + } class C_RetryRead : public Context { @@ -313,7 +316,11 @@ class ObjectCacher { public: C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {} void finish(int) { - oc->readx(rd, ino, onfinish); + int r = oc->readx(rd, ino, onfinish); + if (r > 0) { + onfinish->finish(0); + delete onfinish; + } } }; @@ -328,12 +335,56 @@ class ObjectCacher { int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock); int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock); - void flush_set(inodeno_t ino, Context *onfinish=0); + bool flush_set(inodeno_t ino, Context *onfinish=0); void flush_all(Context *onfinish=0); - // help.. need to figure out how to handle this wrt BufferHeads, etc.. - void commit_set(inodeno_t ino, Context *oncommit=0); + bool commit_set(inodeno_t ino, Context *oncommit); void commit_all(Context *oncommit=0); + + // file functions + + /*** async+caching (non-blocking) file interface ***/ + int file_read(inode_t& inode, + size_t len, + off_t offset, + bufferlist *bl, + Context *onfinish) { + Objecter::OSDRead *rd = new Objecter::OSDRead(bl); + filer.file_to_extents(inode, len, offset, rd->extents); + return readx(rd, inode.ino, onfinish); + } + + int file_write(inode_t& inode, + size_t len, + off_t offset, + bufferlist& bl) { + Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); + filer.file_to_extents(inode, len, offset, wr->extents); + return writex(wr, inode.ino); + } + + + + /*** sync+blocking file interface ***/ + + int file_atomic_sync_read(inode_t& inode, + size_t len, off_t offset, + bufferlist *bl, + Mutex &lock) { + Objecter::OSDRead *rd = new Objecter::OSDRead(bl); + filer.file_to_extents(inode, len, offset, rd->extents); + return atomic_sync_readx(rd, inode.ino, lock); + } + + int file_atomic_sync_write(inode_t& inode, + size_t len, off_t offset, + bufferlist& bl, + Mutex &lock) { + Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); + filer.file_to_extents(inode, len, offset, wr->extents); + return atomic_sync_writex(wr, inode.ino, lock); + } + }; @@ -351,4 +402,14 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) return out; } +inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob) +{ + out << "object[" + << hex << ob.get_oid() << " ino " << ob.get_ino() << dec + << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid + << " lock " << ob.lock_state + << "]"; + return out; +} + #endif diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h index e56c1d79caa2e..d42962810929b 100644 --- a/ceph/osdc/Objecter.h +++ b/ceph/osdc/Objecter.h @@ -34,7 +34,8 @@ inline ostream& operator<<(ostream& out, ObjectExtent &ex) { return out << "extent(" << hex << ex.oid << " in " << ex.pgid << dec - << " " << ex.start << "~" << ex.length; + << " " << ex.start << "~" << ex.length + << ")"; } class Objecter { -- 2.39.5