]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
merged trunk changes r1223:1251 into branches/aleung/security1/ceph
authoranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 01:03:05 +0000 (01:03 +0000)
committeranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 01:03:05 +0000 (01:03 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1253 29311d96-e01e-0410-9327-a35deaab8ce9

branches/aleung/security1/ceph/client/Client.cc
branches/aleung/security1/ceph/client/FileCache.cc
branches/aleung/security1/ceph/client/FileCache.h
branches/aleung/security1/ceph/client/SyntheticClient.cc
branches/aleung/security1/ceph/csyn.cc
branches/aleung/security1/ceph/mds/Locker.cc
branches/aleung/security1/ceph/mds/Server.cc

index e6a08035b5ca726b57fb8949589a8ecbde6d3bdb..4ad458afa9bba65e45747b35a11f059ca5697260 100644 (file)
@@ -1043,6 +1043,8 @@ void Client::handle_file_caps(MClientFileCaps *m)
   if (in->file_wr_mtime > in->inode.mtime)
     m->get_inode().mtime = in->inode.mtime = in->file_wr_mtime;
 
+
+
   if (g_conf.client_oc) {
     // caching on, use FileCache.
     Context *onimplement = 0;
@@ -2247,13 +2249,13 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents,
     assert(diri);
     assert(diri->inode.mode & INODE_MODE_DIR);
 
-    // add . and ..? 
-    string dot("."); 
-    contents[dot] = diri->inode; 
-    if (diri != root) { 
-      string dotdot(".."); 
-      contents[dotdot] = diri->dn->dir->parent_inode->inode; 
-    } 
+    // add . and ..?
+    string dot(".");
+    contents[dot] = diri->inode;
+    if (diri != root) {
+      string dotdot("..");
+      contents[dotdot] = diri->dn->dir->parent_inode->inode;
+    }
 
     if (!reply->get_dir_in().empty()) {
       // only open dir if we're actually adding stuff to it!
@@ -2265,11 +2267,11 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents,
       for (list<InodeStat*>::const_iterator pin = reply->get_dir_in().begin();
            pin != reply->get_dir_in().end(); 
            ++pin, ++pdn) {
-       
-       if (*pdn == ".")  
-         continue; 
-       
-        // count entries
+
+       if (*pdn == ".") 
+         continue;
+
+       // count entries
         res++;
 
         // put in cache
@@ -2283,15 +2285,11 @@ int Client::getdir(const char *relpath, map<string,inode_t>& contents,
         // contents to caller too!
         contents[*pdn] = in->inode;
       }
-      if (dir->is_empty()) 
-       close_dir(dir); 
+
+      if (dir->is_empty())
+       close_dir(dir);
     }
     
-    // add .. too?
-    //if (diri != root && diri->dn && diri->dn->dir) {
-    //Inode *parent = diri->dn->dir->parent_inode;
-    //contents[".."] = parent->inode;
-    //}    
 
     // FIXME: remove items in cache that weren't in my readdir?
     // ***
@@ -2838,7 +2836,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset,
   tout << size << endl;
   tout << offset << endl;
 
-  //assert(offset >= 0);
   assert(fh_map.count(fh));
   Fh *f = fh_map[fh];
   Inode *in = f->inode;
@@ -2854,21 +2851,6 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset,
   ExtCap *read_ext_cap = capcache->get_cache_cap(in->ino(), uid);
   assert(read_ext_cap);
   
-  // do we have read file cap?
-  while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
-    dout(7) << " don't have read cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_read.push_back(&cond);
-    cond.Wait(client_lock);
-  }  
-  // lazy cap?
-  while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
-     dout(7) << " don't have lazy cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_lazy.push_back(&cond);
-    cond.Wait(client_lock);
-  }
   // determine whether read range overlaps with file
   // ...ONLY if we're doing async io
   if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
@@ -2902,6 +2884,23 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset,
     rvalue = r = in->fc.read(offset, size, blist, client_lock, read_ext_cap);  // may block.
   } else {
     // object cache OFF -- legacy inconsistent way.
+
+    // do we have read file cap?
+    while (!lazy && (in->file_caps() & CAP_FILE_RD) == 0) {
+      dout(7) << " don't have read cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_read.push_back(&cond);
+      cond.Wait(client_lock);
+    }  
+    // lazy cap?
+    while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+      dout(7) << " don't have lazy cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_lazy.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+    
+    // do sync read
     Cond cond;
     bool done = false;
     C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
@@ -2988,7 +2987,6 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset,
   tout << size << endl;
   tout << offset << endl;
 
-  //assert(offset >= 0);
   assert(fh_map.count(fh));
   Fh *f = fh_map[fh];
   Inode *in = f->inode;
@@ -3000,26 +2998,8 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset,
 
   dout(10) << "cur file size is " << in->inode.size << "    wr size " << in->file_wr_size << endl;
 
-  //ExtCap *write_ext_cap = in->get_ext_cap(uid);
-  ExtCap *write_ext_cap = capcache->get_cache_cap(in->ino(), uid);
-  assert(write_ext_cap);
-
-  // do we have write file cap?
-  while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
-    dout(7) << " don't have write cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_write.push_back(&cond);
-    cond.Wait(client_lock);
-  }
-  while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
-    dout(7) << " don't have lazy cap, waiting" << endl;
-    Cond cond;
-    in->waitfor_lazy.push_back(&cond);
-    cond.Wait(client_lock);
-  }
-
-  // adjust fd pos
-  f->pos = offset+size;
+    ExtCap *write_ext_cap = capcache->get_cache_cap(in->ino(), uid);
+    assert(write_ext_cap);
 
   // time it.
   utime_t start = g_clock.now();
@@ -3034,11 +3014,28 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset,
 
     // write (this may block!)
     in->fc.write(offset, size, blist, client_lock, write_ext_cap);
+    
+    // adjust fd pos
+    f->pos = offset+size;
 
   } else {
     // legacy, inconsistent synchronous write.
     dout(7) << "synchronous write" << endl;
 
+    // do we have write file cap?
+    while (!lazy && (in->file_caps() & CAP_FILE_WR) == 0) {
+      dout(7) << " don't have write cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_write.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+    while (lazy && (in->file_caps() & CAP_FILE_LAZYIO) == 0) {
+      dout(7) << " don't have lazy cap, waiting" << endl;
+      Cond cond;
+      in->waitfor_lazy.push_back(&cond);
+      cond.Wait(client_lock);
+    }
+
     // prepare write
     Cond cond;
     bool done = false;
@@ -3054,6 +3051,9 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset,
                 //, 1+((int)g_clock.now()) / 10 //f->pos // hack hack test osd revision snapshots
                 ); 
     
+    // adjust fd pos
+    f->pos = offset+size;
+
     while (!done) {
       cond.Wait(client_lock);
       dout(20) << " sync write bump " << onfinish << endl;
index b4155125e829b5c6e928cfa10b701ea5c58880d1..b8e4dd0b2f6c68723824a7e2bff9f20bd7c8eea0 100644 (file)
@@ -1,3 +1,15 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
 
 #include "config.h"
 #include "include/types.h"
@@ -10,8 +22,8 @@
 #include "crypto/ExtCap.h"
 
 #undef dout
-#define dout(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
-#define derr(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myaddr() << ".filecache "
+#define dout(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
+#define derr(x)  if (x <= g_conf.debug_client) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".filecache "
 
 
 // flush/release/clean
@@ -56,27 +68,51 @@ void FileCache::tear_down()
 {
   off_t unclean = release_clean();
   if (unclean) {
-       dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
-       oc->purge_set(inode.ino);
+    dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+    oc->purge_set(inode.ino);
   }
 }
 
 // caps
 
+class C_FC_CheckCaps : public Context {
+  FileCache *fc;
+public:
+  C_FC_CheckCaps(FileCache *f) : fc(f) {}
+  void finish(int r) {
+       fc->check_caps();
+  }
+};
+
 void FileCache::set_caps(int caps, Context *onimplement) 
 {
   if (onimplement) {
+    dout(10) << "set_caps setting onimplement context for " << cap_string(caps) << endl;
     assert(latest_caps & ~caps);  // we should be losing caps.
     caps_callbacks[caps].push_back(onimplement);
   }
   
   latest_caps = caps;
   check_caps();  
+
+  // kick waiters?  (did we gain caps?)
+  if (can_read() && !waitfor_read.empty()) 
+    for (set<Cond*>::iterator p = waitfor_read.begin();
+        p != waitfor_read.end();
+        ++p)
+      (*p)->Signal();
+  if (can_write() && !waitfor_write.empty()) 
+    for (set<Cond*>::iterator p = waitfor_write.begin();
+        p != waitfor_write.end();
+        ++p)
+      (*p)->Signal();
+  
 }
 
 
 void FileCache::check_caps()
 {
+  // calc used
   int used = 0;
   if (num_reading) used |= CAP_FILE_RD;
   if (oc->set_is_cached(inode.ino)) used |= CAP_FILE_RDCACHE;
@@ -84,6 +120,18 @@ void FileCache::check_caps()
   if (oc->set_is_dirty_or_committing(inode.ino)) used |= CAP_FILE_WRBUFFER;
   dout(10) << "check_caps used " << cap_string(used) << endl;
 
+  // try to implement caps?
+  // BUG? latest_caps, not least caps i've seen?
+  if ((latest_caps & CAP_FILE_RDCACHE) == 0 &&
+      (used & CAP_FILE_RDCACHE))
+    release_clean();
+  if ((latest_caps & CAP_FILE_WRBUFFER) == 0 &&
+      (used & CAP_FILE_WRBUFFER))
+    flush_dirty(new C_FC_CheckCaps(this));
+  //if (latest_caps == 0 &&
+  //  used != 0)
+  //empty(new C_FC_CheckCaps(this));
+  
   // check callbacks
   map<int, list<Context*> >::iterator p = caps_callbacks.begin();
   while (p != caps_callbacks.end()) {
@@ -111,6 +159,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
 {
   int r = 0;
 
+  // can i read?
+  while ((latest_caps & CAP_FILE_RD) == 0) {
+    dout(10) << "read doesn't have RD cap, blocking" << endl;
+    Cond c;
+    waitfor_read.insert(&c);
+    c.Wait(client_lock);
+    waitfor_read.erase(&c);
+  }
+
   // inc reading counter
   num_reading++;
   
@@ -147,6 +204,15 @@ int FileCache::read(off_t offset, size_t size, bufferlist& blist, Mutex& client_
 
 void FileCache::write(off_t offset, size_t size, bufferlist& blist, Mutex& client_lock, ExtCap *write_ext_cap)
 {
+  // can i write
+  while ((latest_caps & CAP_FILE_WR) == 0) {
+    dout(10) << "write doesn't have WR cap, blocking" << endl;
+    Cond c;
+    waitfor_write.insert(&c);
+    c.Wait(client_lock);
+    waitfor_write.erase(&c);
+  }
+
   // inc writing counter
   num_writing++;
 
index acf6319fb2632f8a095462e7ab255546e5435aee..7209a72d4ac182ed67b4fc78eb37cdfb19498aab 100644 (file)
@@ -1,3 +1,16 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
 #ifndef __FILECACHE_H
 #define __FILECACHE_H
 
@@ -26,9 +39,9 @@ class FileCache {
   //int num_unsafe;
 
   // waiters
-  list<Cond*> waitfor_read;
-  list<Cond*> waitfor_write;
-  //list<Context*> waitfor_safe;
+  set<Cond*> waitfor_read;
+  set<Cond*> waitfor_write;
+
   bool waitfor_release;
 
  public:
@@ -39,7 +52,7 @@ class FileCache {
     num_reading(0), num_writing(0),// num_unsafe(0),
     waitfor_release(false) {}
   ~FileCache() {
-       tear_down();
+    tear_down();
   }
 
   // waiters/waiting
@@ -47,9 +60,7 @@ class FileCache {
   bool can_write() { return latest_caps & CAP_FILE_WR; }
   bool all_safe();// { return num_unsafe == 0; }
 
-  void add_read_waiter(Cond *c) { waitfor_read.push_back(c); }
-  void add_write_waiter(Cond *c) { waitfor_write.push_back(c); }
-  void add_safe_waiter(Context *c);// { waitfor_safe.push_back(c); }
+  void add_safe_waiter(Context *c);
 
   // ...
   void flush_dirty(Context *onflush=0);
index 7408f8f3a9cc2e3d9df1ec76612e15b5fb0bcd93..947cf5798fb123ab009b02ad588900d7381ddee0 100644 (file)
@@ -707,6 +707,8 @@ int SyntheticClient::clean_dir(string& basedir)
   for (map<string, inode_t>::iterator it = contents.begin();
        it != contents.end();
        it++) {
+    if (it->first == ".") continue;
+    if (it->first == "..") continue;
     string file = basedir + "/" + it->first;
 
     if (time_to_stop()) break;
index b5e4892cb9a6e602c3f33f1a87952b8713cb5c71..0f95ee56b26020c8f8823ec156eaa7c99b0a1062 100644 (file)
@@ -53,31 +53,44 @@ int main(int argc, char **argv, char *envp[]) {
   // start up network
   rank.start_rank();
 
-  // start client
-  Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
-  client->init();
+  list<Client*> clients;
+  list<SyntheticClient*> synclients;
+
+  cout << "mounting and starting " << g_conf.num_client << " syn client(s)" << endl;
+  for (int i=0; i<g_conf.num_client; i++) {
+    // start client
+    Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
+    client->init();
     
-  // start syntheticclient
-  SyntheticClient *syn = new SyntheticClient(client);
+    // start syntheticclient
+    SyntheticClient *syn = new SyntheticClient(client);
 
-  // start up fuse
-  // use my argc, argv (make sure you pass a mount point!)
-  cout << "mounting" << endl;
-  client->mount();
-  
-  cout << "starting syn client" << endl;
-  syn->start_thread();
+    client->mount();
+    
+    syn->start_thread();
+
+    clients.push_back(client);
+    synclients.push_back(syn);
+  }
+
+  cout << "waiting for client(s) to finish" << endl;
+  while (!clients.empty()) {
+    Client *client = clients.front();
+    SyntheticClient *syn = synclients.front();
+    clients.pop_front();
+    synclients.pop_front();
+    
+    // wait
+    syn->join_thread();
 
-  // wait
-  syn->join_thread();
+    // unmount
+    client->unmount();
+    client->shutdown();
 
-  // unmount
-  client->unmount();
-  cout << "unmounted" << endl;
-  client->shutdown();
-  
-  delete client;
-  
+    delete syn;
+    delete client;
+  }
+    
   // wait for messenger to finish
   rank.wait();
   
index 894f40fc5ca920beab533212f9d95cb42f1b68f6..d235b7d1868a743d6369f7609569c855d7b1472c 100644 (file)
@@ -2021,12 +2021,13 @@ void Locker::dentry_xlock_finish(CDentry *dn, bool quiet)
   // unpin dir
   dn->dir->auth_unpin();
 
-  // kick waiters 
-  list<Context*> finished; 
-  dn->dir->take_waiting(CDIR_WAIT_DNREAD, finished); 
-  mds->queue_finished(finished); 
+  // kick waiters
+  list<Context*> finished;
+  dn->dir->take_waiting(CDIR_WAIT_DNREAD, finished);
+  mds->queue_finished(finished);
 }
 
+
 /*
  * onfinish->finish() will be called with 
  * 0 on successful xlock,
index af5a8bee3954e57a3e77195b21854a89158df884..800698756e829209fb3ccf99d545578800c791b0 100644 (file)
@@ -2399,8 +2399,9 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri)
   CDentry *dn = 0;
   
   // make dentry and inode, xlock dentry.
-  //int r = prepare_mknod(req, diri, &in, &dn);
-  int r = prepare_mknod(req, diri, &in, &dn, true); 
+  bool excl = req->get_iarg() & O_EXCL;
+  int r = prepare_mknod(req, diri, &in, &dn, !excl);
+
   if (!r) 
     return; // wait on something
   assert(in);