From 2bcc95a7e36cb41163d8b758a0d38e682a2a857b Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 16 Mar 2007 00:19:38 +0000 Subject: [PATCH] * some changes to client cache: readers/writers block properly, wake up when data is flushed git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1250 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/client/Client.cc | 73 +++++++++++++++++---------------- trunk/ceph/client/FileCache.cc | 74 ++++++++++++++++++++++++++++++++-- trunk/ceph/client/FileCache.h | 25 ++++++++---- 3 files changed, 127 insertions(+), 45 deletions(-) diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index 4e4a6a1b6b737..9f6914d33d136 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -875,6 +875,8 @@ void Client::handle_file_caps(MClientFileCaps *m) if (in->file_wr_mtime > in->inode.mtime) m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime; + + if (g_conf.client_oc) { // caching on, use FileCache. Context *onimplement = 0; @@ -2274,7 +2276,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) tout << size << endl; tout << offset << endl; - assert(offset >= 0); assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; @@ -2284,21 +2285,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) bool lazy = f->mode == FILE_MODE_LAZY; - // do we have read file cap? - while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { - dout(7) << " don't have read cap, waiting" << endl; - Cond cond; - in->waitfor_read.push_back(&cond); - cond.Wait(client_lock); - } - // lazy cap? - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { - dout(7) << " don't have lazy cap, waiting" << endl; - Cond cond; - in->waitfor_lazy.push_back(&cond); - cond.Wait(client_lock); - } - // determine whether read range overlaps with file // ...ONLY if we're doing async io if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) { @@ -2332,6 +2318,23 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) rvalue = r = in->fc.read(offset, size, blist, client_lock); // may block. } else { // object cache OFF -- legacy inconsistent way. + + // do we have read file cap? + while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) { + dout(7) << " don't have read cap, waiting" << endl; + Cond cond; + in->waitfor_read.push_back(&cond); + cond.Wait(client_lock); + } + // lazy cap? + while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + dout(7) << " don't have lazy cap, waiting" << endl; + Cond cond; + in->waitfor_lazy.push_back(&cond); + cond.Wait(client_lock); + } + + // do sync read Cond cond; bool done = false; C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue); @@ -2398,7 +2401,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) tout << size << endl; tout << offset << endl; - assert(offset >= 0); assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; @@ -2410,23 +2412,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl; - // do we have write file cap? - while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { - dout(7) << " don't have write cap, waiting" << endl; - Cond cond; - in->waitfor_write.push_back(&cond); - cond.Wait(client_lock); - } - while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { - dout(7) << " don't have lazy cap, waiting" << endl; - Cond cond; - in->waitfor_lazy.push_back(&cond); - cond.Wait(client_lock); - } - - // adjust fd pos - f->pos = offset+size; - // time it. utime_t start = g_clock.now(); @@ -2440,11 +2425,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) // write (this may block!) in->fc.write(offset, size, blist, client_lock); + + // adjust fd pos + f->pos = offset+size; } else { // legacy, inconsistent synchronous write. dout(7) << "synchronous write" << endl; + // do we have write file cap? + while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) { + dout(7) << " don't have write cap, waiting" << endl; + Cond cond; + in->waitfor_write.push_back(&cond); + cond.Wait(client_lock); + } + while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) { + dout(7) << " don't have lazy cap, waiting" << endl; + Cond cond; + in->waitfor_lazy.push_back(&cond); + cond.Wait(client_lock); + } + // prepare write Cond cond; bool done = false; @@ -2460,6 +2462,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots ); + // adjust fd pos + f->pos = offset+size; + while (!done) { cond.Wait(client_lock); dout(20) << " sync write bump " << onfinish << endl; diff --git a/trunk/ceph/client/FileCache.cc b/trunk/ceph/client/FileCache.cc index 2a1dd1576ae59..f45bac2c57d0e 100644 --- a/trunk/ceph/client/FileCache.cc +++ b/trunk/ceph/client/FileCache.cc @@ -1,3 +1,15 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #include "config.h" #include "include/types.h" @@ -8,8 +20,8 @@ #include "msg/Messenger.h" #undef dout -#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache " -#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache " +#define dout(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache " +#define derr(x) if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache " // flush/release/clean @@ -54,27 +66,51 @@ void FileCache::tear_down() { off_t unclean = release_clean(); if (unclean) { - dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl; - oc->purge_set(inode.ino); + dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl; + oc->purge_set(inode.ino); } } // caps +class C_FC_CheckCaps : public Context { + FileCache *fc; +public: + C_FC_CheckCaps(FileCache *f) : fc(f) {} + void finish(int r) { + fc->check_caps(); + } +}; + void FileCache::set_caps(int caps, Context *onimplement) { if (onimplement) { + dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl; assert(latest_caps & ~caps); // we should be losing caps. caps_callbacks[caps].push_back(onimplement); } latest_caps = caps; check_caps(); + + // kick waiters? (did we gain caps?) + if (can_read() && !waitfor_read.empty()) + for (set::iterator p = waitfor_read.begin(); + p != waitfor_read.end(); + ++p) + (*p)->Signal(); + if (can_write() && !waitfor_write.empty()) + for (set::iterator p = waitfor_write.begin(); + p != waitfor_write.end(); + ++p) + (*p)->Signal(); + } void FileCache::check_caps() { + // calc used int used = 0; if (num_reading) used |= CAP_FILE_RD; if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE; @@ -82,6 +118,18 @@ void FileCache::check_caps() if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER; dout(10) << "check_caps used " << cap_string(used) << endl; + // try to implement caps? + // BUG? latest_caps, not least caps i've seen? + if ((latest_caps & CAP_FILE_RDCACHE) == 0 && + (used & CAP_FILE_RDCACHE)) + release_clean(); + if ((latest_caps & CAP_FILE_WRBUFFER) == 0 && + (used & CAP_FILE_WRBUFFER)) + flush_dirty(new C_FC_CheckCaps(this)); + if (latest_caps == 0 && + used != 0) + empty(new C_FC_CheckCaps(this)); + // check callbacks map >::iterator p = caps_callbacks.begin(); while (p != caps_callbacks.end()) { @@ -109,6 +157,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ { int r = 0; + // can i read? + while ((latest_caps & CAP_FILE_RD) == 0) { + dout(10) << "read doesn't have RD cap, blocking" << endl; + Cond c; + waitfor_read.insert(&c); + c.Wait(client_lock); + waitfor_read.erase(&c); + } + // inc reading counter num_reading++; @@ -145,6 +202,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_ void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock) { + // can i write + while ((latest_caps & CAP_FILE_WR) == 0) { + dout(10) << "write doesn't have WR cap, blocking" << endl; + Cond c; + waitfor_write.insert(&c); + c.Wait(client_lock); + waitfor_write.erase(&c); + } + // inc writing counter num_writing++; diff --git a/trunk/ceph/client/FileCache.h b/trunk/ceph/client/FileCache.h index 6bef22f4e0c6a..d710d38c0731a 100644 --- a/trunk/ceph/client/FileCache.h +++ b/trunk/ceph/client/FileCache.h @@ -1,3 +1,16 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + #ifndef __FILECACHE_H #define __FILECACHE_H @@ -22,9 +35,9 @@ class FileCache { //int num_unsafe; // waiters - list waitfor_read; - list waitfor_write; - //list waitfor_safe; + set waitfor_read; + set waitfor_write; + bool waitfor_release; public: @@ -35,7 +48,7 @@ class FileCache { num_reading(0), num_writing(0),// num_unsafe(0), waitfor_release(false) {} ~FileCache() { - tear_down(); + tear_down(); } // waiters/waiting @@ -43,9 +56,7 @@ class FileCache { bool can_write() { return latest_caps & CAP_FILE_WR; } bool all_safe();// { return num_unsafe == 0; } - void add_read_waiter(Cond *c) { waitfor_read.push_back(c); } - void add_write_waiter(Cond *c) { waitfor_write.push_back(c); } - void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); } + void add_safe_waiter(Context *c); // ... void flush_dirty(Context *onflush=0); -- 2.39.5