From 45e23298dd4534deb376f0a546510ae07fdb6806 Mon Sep 17 00:00:00 2001 From: sage Date: Wed, 9 Aug 2006 04:23:06 +0000 Subject: [PATCH] client oc fixup git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@792 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/Client.cc | 7 +++- ceph/client/Client.h | 4 +- ceph/client/FileCache.cc | 26 ++++++++++--- ceph/client/FileCache.h | 10 ++--- ceph/common/Clock.h | 2 +- ceph/config.cc | 6 +++ ceph/config.h | 1 + ceph/include/lru.h | 18 +++++---- ceph/osdc/ObjectCacher.cc | 81 ++++++++++++++++++++++++++++----------- ceph/osdc/ObjectCacher.h | 2 + ceph/osdc/Objecter.cc | 2 +- 11 files changed, 113 insertions(+), 46 deletions(-) diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 2089c800c8feb..8cb83aad971b2 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -538,6 +538,7 @@ void Client::dispatch(Message *m) } else { dout(10) << "unmounting: trim pass, size still " << lru.lru_get_size() << "+" << inode_map.size() << endl; + dump_cache(); } } @@ -1541,8 +1542,10 @@ int Client::open(const char *relpath, int mode) // caps included? int mds = MSG_ADDR_NUM(reply->get_source()); - if (f->inode->caps.empty()) // first caps? + if (f->inode->caps.empty()) {// first caps? + dout(7) << " first caps on " << hex << f->inode->inode.ino << dec << endl; f->inode->get(); + } int new_caps = reply->get_file_caps(); new_caps &= CAP_FILE_WR|CAP_FILE_RD; // HACK: test synchronous read/write @@ -1865,7 +1868,7 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) // create a buffer that refers to *buf, but doesn't try to free it when it's done. bufferlist blist; blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) ); - + // issue write Cond cond; bool done = false; diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 904d5680ce282..4a08e6d976b58 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -148,11 +148,11 @@ class Inode { void get() { ref++; - //cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl; + cout << "inode.get on " << hex << inode.ino << dec << " now " << ref << endl; } void put() { ref--; assert(ref >= 0); - //cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl; + cout << "inode.put on " << hex << inode.ino << dec << " now " << ref << endl; } Inode(inode_t _inode, ObjectCacher *_oc) : diff --git a/ceph/client/FileCache.cc b/ceph/client/FileCache.cc index 2092c11a982ee..8aefd35f18824 100644 --- a/ceph/client/FileCache.cc +++ b/ceph/client/FileCache.cc @@ -10,7 +10,10 @@ void FileCache::flush_dirty(Context *onflush) { - oc->flush_set(inode.ino, onflush); + if (oc->flush_set(inode.ino, onflush)) { + onflush->finish(0); + delete onflush; + } } off_t FileCache::release_clean() @@ -31,15 +34,12 @@ bool FileCache::is_dirty() void FileCache::empty(Context *onempty) { off_t unclean = release_clean(); - bool clean = oc->flush_set(inode.ino); + bool clean = oc->flush_set(inode.ino, onempty); assert(!unclean == clean); - + if (clean) { onempty->finish(0); delete onempty; - } else { - clean = oc->flush_set(inode.ino, onempty); - assert(!clean); } } @@ -147,3 +147,17 @@ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& clien if (num_writing == 0 && !caps_callbacks.empty()) check_caps(); } + +bool FileCache::all_safe() +{ + return !oc->set_is_dirty_or_committing(inode.ino); +} + +void FileCache::add_safe_waiter(Context *c) +{ + bool safe = oc->commit_set(inode.ino, c); + if (safe) { + c->finish(0); + delete c; + } +} diff --git a/ceph/client/FileCache.h b/ceph/client/FileCache.h index 1b0eb3b948e02..2e2cfc4e31c4e 100644 --- a/ceph/client/FileCache.h +++ b/ceph/client/FileCache.h @@ -19,12 +19,12 @@ class FileCache { int num_reading; int num_writing; - int num_unsafe; + //int num_unsafe; // waiters list waitfor_read; list waitfor_write; - list waitfor_safe; + //list waitfor_safe; bool waitfor_release; public: @@ -32,17 +32,17 @@ class FileCache { oc(_oc), inode(_inode), latest_caps(0), - num_reading(0), num_writing(0), num_unsafe(0), + num_reading(0), num_writing(0),// num_unsafe(0), waitfor_release(false) {} // waiters/waiting bool can_read() { return latest_caps & CAP_FILE_RD; } bool can_write() { return latest_caps & CAP_FILE_WR; } - bool all_safe() { return num_unsafe == 0; } + bool all_safe();// { return num_unsafe == 0; } void add_read_waiter(Cond *c) { waitfor_read.push_back(c); } void add_write_waiter(Cond *c) { waitfor_write.push_back(c); } - void add_safe_waiter(Context *c) { waitfor_safe.push_back(c); } + void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); } // ... void flush_dirty(Context *onflush=0); diff --git a/ceph/common/Clock.h b/ceph/common/Clock.h index 492e73c99f031..4ee2fcd6fe5a2 100644 --- a/ceph/common/Clock.h +++ b/ceph/common/Clock.h @@ -132,7 +132,7 @@ class Clock { // relative time (from startup) const utime_t& now() { gettimeofday(&last.timeval(), NULL); - last -= zero; + //last -= zero; //last = abs_last - start_offset; return last; } diff --git a/ceph/config.cc b/ceph/config.cc index 3096757ce1dc7..c0e75b18a1888 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -71,6 +71,7 @@ md_config_t g_conf = { debug_mds_log: 1, debug_buffer: 0, debug_filer: 0, + debug_objecter: 0, debug_objectcacher: 0, debug_client: 0, debug_osd: 0, @@ -384,6 +385,11 @@ void parse_config_options(vector& args) g_conf.debug_filer = atoi(args[++i]); else g_debug_after_conf.debug_filer = atoi(args[++i]); + else if (strcmp(args[i], "--debug_objecter") == 0) + if (!g_conf.debug_after) + g_conf.debug_objecter = atoi(args[++i]); + else + g_debug_after_conf.debug_objecter = atoi(args[++i]); else if (strcmp(args[i], "--debug_objectcacher") == 0) if (!g_conf.debug_after) g_conf.debug_objectcacher = atoi(args[++i]); diff --git a/ceph/config.h b/ceph/config.h index b60d18ae97774..ef28dc98c3db0 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -45,6 +45,7 @@ struct md_config_t { int debug_mds_log; int debug_buffer; int debug_filer; + int debug_objecter; int debug_objectcacher; int debug_client; int debug_osd; diff --git a/ceph/include/lru.h b/ceph/include/lru.h index a8079ef13873c..38a86e4d7da9c 100644 --- a/ceph/include/lru.h +++ b/ceph/include/lru.h @@ -258,15 +258,13 @@ class LRU { // expire -- expire a single item - LRUObject *lru_expire() { + LRUObject *lru_get_next_expire() { LRUObject *p; - + // look through tail of bot while (lru_bot.get_length()) { p = lru_bot.get_tail(); - - if (!p->lru_pinned) - return lru_remove(p); // yay. + if (!p->lru_pinned) return p; // move to pintail lru_bot.remove(p); @@ -276,8 +274,7 @@ class LRU { // ok, try head then while (lru_top.get_length()) { p = lru_top.get_tail(); - if (!p->lru_pinned) - return lru_remove( p ); + if (!p->lru_pinned) return p; // move to pintail lru_top.remove(p); @@ -287,6 +284,13 @@ class LRU { // no luck! return NULL; } + + LRUObject *lru_expire() { + LRUObject *p = lru_get_next_expire(); + if (p) + return lru_remove(p); + return NULL; + } void lru_status() { diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc index 39dbe598789f6..4686ed27d923e 100644 --- a/ceph/osdc/ObjectCacher.cc +++ b/ceph/osdc/ObjectCacher.cc @@ -417,14 +417,14 @@ void ObjectCacher::lock_ack(list& oids, tid_t tid) } Object *ob = objects[oid]; + + list ls; assert(tid <= ob->last_write_tid); if (ob->last_write_tid == tid) { dout(10) << "lock_ack " << *ob << " tid " << tid << endl; - list ls; - switch (ob->lock_state) { case Object::LOCK_RDUNLOCKING: case Object::LOCK_WRUNLOCKING: @@ -448,14 +448,21 @@ void ObjectCacher::lock_ack(list& oids, tid_t tid) ob->last_ack_tid = tid; - finish_contexts(ls); - if (ob->can_close()) close_object(ob); } else { dout(10) << "lock_ack " << *ob << " tid " << tid << " obsolete" << endl; } + + // waiters? + if (ob->waitfor_ack.count(tid)) { + ls.splice(ls.end(), ob->waitfor_ack[tid]); + ob->waitfor_ack.erase(tid); + } + + finish_contexts(ls); + } } @@ -553,6 +560,22 @@ void ObjectCacher::bh_write_commit(object_t oid, off_t start, size_t length, tid } +void ObjectCacher::flush() +{ + utime_t cutoff = g_clock.now(); + //cutoff.sec_ref() -= g_conf.client_oc_max_dirty_age; + + dout(10) << "flush" << endl; + + while (1) { + BufferHead *bh = (BufferHead*) lru_dirty.lru_get_next_expire(); + if (!bh) break; + if (bh->last_write > cutoff) break; + + bh_write(bh->ob, bh); + } +} + void ObjectCacher::trim(off_t max) { if (max < 0) @@ -717,6 +740,8 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) { + utime_t now = g_clock.now(); + for (list::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ex_it++) { @@ -748,8 +773,12 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) // it's dirty. mark_dirty(bh); + touch_bh(bh); + bh->last_write = now; } + delete wr; + trim(); return 0; } @@ -801,7 +830,10 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& Object *o = get_object(i->first, ino); rdlock(o); } - + + // readx will hose rd + list extents = rd->extents; + // do the read, into our cache Cond cond; bool done = false; @@ -811,8 +843,8 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& while (!done) cond.Wait(lock); // release the locks - for (list::iterator ex_it = rd->extents.begin(); - ex_it != rd->extents.end(); + for (list::iterator ex_it = extents.begin(); + ex_it != extents.end(); ex_it++) { assert(objects.count(ex_it->oid)); Object *o = objects[ex_it->oid]; @@ -873,20 +905,23 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute wrlock(o); } + // writex will hose wr + list extents = wr->extents; + // do the write, into our cache writex(wr, ino); // flush // ...and release the locks? - for (list::iterator ex_it = wr->extents.begin(); - ex_it != wr->extents.end(); + for (list::iterator ex_it = extents.begin(); + ex_it != extents.end(); ex_it++) { assert(objects.count(ex_it->oid)); Object *o = objects[ex_it->oid]; wrunlock(o); } - + return 0; } @@ -1101,12 +1136,11 @@ bool ObjectCacher::flush(Object *ob) } // flush. non-blocking, takes callback. -// returns true if already flushed, and deletes the callback. +// returns true if already flushed bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) { if (objects_by_ino.count(ino) == 0) { dout(10) << "flush_set on " << hex << ino << dec << " dne" << endl; - delete onfinish; return true; } @@ -1115,6 +1149,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) C_Gather *gather = 0; // we'll need to wait for all objects to flush! set& s = objects_by_ino[ino]; + bool safe = true; for (set::iterator i = s.begin(); i != s.end(); i++) { @@ -1122,20 +1157,21 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish) if (!flush(ob)) { // we'll need to gather... - if (!gather) + if (!gather && onfinish) gather = new C_Gather(onfinish); + safe = false; dout(10) << "flush_set " << hex << ino << dec << " will wait for ack tid " << ob->last_write_tid << " on " << *ob << endl; - ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub()); + if (gather) + ob->waitfor_ack[ob->last_write_tid].push_back(gather->new_sub()); } } - if (!gather) { + if (safe) { dout(10) << "flush_set " << hex << ino << dec << " has no dirty|tx bhs" << endl; - delete onfinish; return true; } return false; @@ -1150,7 +1186,6 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) if (objects_by_ino.count(ino) == 0) { dout(10) << "commit_set on " << hex << ino << dec << " dne" << endl; - delete onfinish; return true; } @@ -1159,6 +1194,7 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) C_Gather *gather = 0; // we'll need to wait for all objects to commit set& s = objects_by_ino[ino]; + bool safe = true; for (set::iterator i = s.begin(); i != s.end(); i++) { @@ -1167,18 +1203,19 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) // make sure it's flushing. flush_set(ino); - if (ob->last_write_tid < ob->last_commit_tid) { + if (ob->last_write_tid > ob->last_commit_tid) { dout(10) << "commit_set " << hex << ino << dec << " " << *ob << " will finish on commit tid " << ob->last_write_tid << endl; - if (!gather) gather = new C_Gather(onfinish); - ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() ); + if (!gather && onfinish) gather = new C_Gather(onfinish); + safe = false; + if (gather) + ob->waitfor_commit[ob->last_write_tid].push_back( gather->new_sub() ); } } - if (!gather) { + if (safe) { dout(10) << "commit_set " << hex << ino << dec << " all committed" << endl; - delete onfinish; return true; } return false; diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index f637709b500fc..f2a23cb029055 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -41,6 +41,7 @@ class ObjectCacher { Object *ob; bufferlist bl; tid_t last_write_tid; // version of bh (if non-zero) + utime_t last_write; map< off_t, list > waitfor_read; @@ -314,6 +315,7 @@ class ObjectCacher { void bh_write(Object *ob, BufferHead *bh); void trim(off_t max=-1); + void flush(); bool flush(Object *o); off_t release(Object *o); diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index 8d75ec2ceb845..fa7ab29c88d2a 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -14,7 +14,7 @@ #include "config.h" #undef dout -#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << messenger->get_myaddr() << ".objecter " +#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter " // messages ------------------------------ -- 2.39.5