]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* some changes to client cache: readers/writers block properly, wake up when data...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 00:19:38 +0000 (00:19 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 00:19:38 +0000 (00:19 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1250 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/client/Client.cc
trunk/ceph/client/FileCache.cc
trunk/ceph/client/FileCache.h

index 4e4a6a1b6b737a35a22b4d6126cae93b58184393..9f6914d33d13673e059f0ceddb278bbc7f3eb861 100644 (file)
@@ -875,6 +875,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
   if (in->file_wr_mtime > in->inode.mtime)
     m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime;
 
+
+
   if (g_conf.client_oc) {
     // caching on, use FileCache.
     Context *onimplement = 0;
@@ -2274,7 +2276,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
   tout << size << endl;
   tout << offset << endl;
 
-  assert(offset >= 0);
   assert(fh_map.count(fh));
   Fh *f = fh_map[fh];
   Inode *in = f->inode;
@@ -2284,21 +2285,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
 
   bool lazy = f->mode == FILE_MODE_LAZY;
   
-  // do we have read file cap?
-  while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
-    dout(7) << " don't have read cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_read.push_back(&cond);
-    cond.Wait(client_lock);
-  }  
-  // lazy cap?
-  while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
-     dout(7) << " don't have lazy cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_lazy.push_back(&cond);
-    cond.Wait(client_lock);
-  }
   // determine whether read range overlaps with file
   // ...ONLY if we're doing async io
   if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
@@ -2332,6 +2318,23 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
     rvalue = r = in->fc.read(offset, size, blist, client_lock);  // may block.
   } else {
     // object cache OFF -- legacy inconsistent way.
+
+    // do we have read file cap?
+    while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
+      dout(7) << " don't have read cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_read.push_back(&cond);
+      cond.Wait(client_lock);
+    }  
+    // lazy cap?
+    while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+      dout(7) << " don't have lazy cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_lazy.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+    
+    // do sync read
     Cond cond;
     bool done = false;
     C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
@@ -2398,7 +2401,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
   tout << size << endl;
   tout << offset << endl;
 
-  assert(offset >= 0);
   assert(fh_map.count(fh));
   Fh *f = fh_map[fh];
   Inode *in = f->inode;
@@ -2410,23 +2412,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
 
   dout(10) << "cur file size is " << in->inode.size << "    wr size " << in->file_wr_size << endl;
 
-  // do we have write file cap?
-  while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
-    dout(7) << " don't have write cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_write.push_back(&cond);
-    cond.Wait(client_lock);
-  }
-  while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
-    dout(7) << " don't have lazy cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_lazy.push_back(&cond);
-    cond.Wait(client_lock);
-  }
-
-  // adjust fd pos
-  f->pos = offset+size;
-
   // time it.
   utime_t start = g_clock.now();
     
@@ -2440,11 +2425,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
 
     // write (this may block!)
     in->fc.write(offset, size, blist, client_lock);
+    
+    // adjust fd pos
+    f->pos = offset+size;
 
   } else {
     // legacy, inconsistent synchronous write.
     dout(7) << "synchronous write" << endl;
 
+    // do we have write file cap?
+    while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
+      dout(7) << " don't have write cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_write.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+    while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+      dout(7) << " don't have lazy cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_lazy.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+
     // prepare write
     Cond cond;
     bool done = false;
@@ -2460,6 +2462,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
                 //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
                 ); 
     
+    // adjust fd pos
+    f->pos = offset+size;
+
     while (!done) {
       cond.Wait(client_lock);
       dout(20) << " sync write bump " << onfinish << endl;
index 2a1dd1576ae5917994471ffa9756740154956977..f45bac2c57d0e310478f5a68ada89ddbc760a7c0 100644 (file)
@@ -1,3 +1,15 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "config.h"
 #include "include/types.h"
@@ -8,8 +20,8 @@
 #include "msg/Messenger.h"
 
 #undef dout
-#define dout(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
-#define derr(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
+#define dout(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
+#define derr(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
 
 
 // flush/release/clean
@@ -54,27 +66,51 @@ void FileCache::tear_down()
 {
   off_t unclean = release_clean();
   if (unclean) {
-       dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
-       oc->purge_set(inode.ino);
+    dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+    oc->purge_set(inode.ino);
   }
 }
 
 // caps
 
+class C_FC_CheckCaps : public Context {
+  FileCache *fc;
+public:
+  C_FC_CheckCaps(FileCache *f) : fc(f) {}
+  void finish(int r) {
+       fc->check_caps();
+  }
+};
+
 void FileCache::set_caps(int caps, Context *onimplement) 
 {
   if (onimplement) {
+    dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl;
     assert(latest_caps & ~caps);  // we should be losing caps.
     caps_callbacks[caps].push_back(onimplement);
   }
   
   latest_caps = caps;
   check_caps();  
+
+  // kick waiters?  (did we gain caps?)
+  if (can_read() && !waitfor_read.empty()) 
+    for (set<Cond*>::iterator p = waitfor_read.begin();
+        p != waitfor_read.end();
+        ++p)
+      (*p)->Signal();
+  if (can_write() && !waitfor_write.empty()) 
+    for (set<Cond*>::iterator p = waitfor_write.begin();
+        p != waitfor_write.end();
+        ++p)
+      (*p)->Signal();
+  
 }
 
 
 void FileCache::check_caps()
 {
+  // calc used
   int used = 0;
   if (num_reading) used |= CAP_FILE_RD;
   if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
@@ -82,6 +118,18 @@ void FileCache::check_caps()
   if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
   dout(10) << "check_caps used " << cap_string(used) << endl;
 
+  // try to implement caps?
+  // BUG? latest_caps, not least caps i've seen?
+  if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
+      (used & CAP_FILE_RDCACHE))
+    release_clean();
+  if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
+      (used & CAP_FILE_WRBUFFER))
+    flush_dirty(new C_FC_CheckCaps(this));
+  if (latest_caps == 0 &&
+      used != 0)
+    empty(new C_FC_CheckCaps(this));
+  
   // check callbacks
   map<int, list<Context*> >::iterator p = caps_callbacks.begin();
   while (p != caps_callbacks.end()) {
@@ -109,6 +157,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
 {
   int r = 0;
 
+  // can i read?
+  while ((latest_caps & CAP_FILE_RD) == 0) {
+    dout(10) << "read doesn't have RD cap, blocking" << endl;
+    Cond c;
+    waitfor_read.insert(&c);
+    c.Wait(client_lock);
+    waitfor_read.erase(&c);
+  }
+
   // inc reading counter
   num_reading++;
   
@@ -145,6 +202,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
 
 void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock)
 {
+  // can i write
+  while ((latest_caps & CAP_FILE_WR) == 0) {
+    dout(10) << "write doesn't have WR cap, blocking" << endl;
+    Cond c;
+    waitfor_write.insert(&c);
+    c.Wait(client_lock);
+    waitfor_write.erase(&c);
+  }
+
   // inc writing counter
   num_writing++;
 
index 6bef22f4e0c6a4e62848dce62e6b3ac9c67444c1..d710d38c0731a186471555c6a5edda6b343d01f6 100644 (file)
@@ -1,3 +1,16 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
 #ifndef __FILECACHE_H
 #define __FILECACHE_H
 
@@ -22,9 +35,9 @@ class FileCache {
   //int num_unsafe;
 
   // waiters
-  list<Cond*> waitfor_read;
-  list<Cond*> waitfor_write;
-  //list<Context*> waitfor_safe;
+  set<Cond*> waitfor_read;
+  set<Cond*> waitfor_write;
+
   bool waitfor_release;
 
  public:
@@ -35,7 +48,7 @@ class FileCache {
     num_reading(0), num_writing(0),// num_unsafe(0),
     waitfor_release(false) {}
   ~FileCache() {
-       tear_down();
+    tear_down();
   }
 
   // waiters/waiting
@@ -43,9 +56,7 @@ class FileCache {
   bool can_write() { return latest_caps & CAP_FILE_WR; }
   bool all_safe();// { return num_unsafe == 0; }
 
-  void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
-  void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
-  void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
+  void add_safe_waiter(Context *c);
 
   // ...
   void flush_dirty(Context *onflush=0);