]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
buffer flash crowd stuff ground work is working
authoranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 23:26:00 +0000 (23:26 +0000)
committeranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 16 Mar 2007 23:26:00 +0000 (23:26 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1257 29311d96-e01e-0410-9327-a35deaab8ce9

branches/aleung/security1/ceph/config.cc
branches/aleung/security1/ceph/crypto/ExtCap.h
branches/aleung/security1/ceph/mds/CInode.cc
branches/aleung/security1/ceph/mds/CInode.h
branches/aleung/security1/ceph/mds/Locker.cc
branches/aleung/security1/ceph/mds/Locker.h
branches/aleung/security1/ceph/mds/MDS.cc
branches/aleung/security1/ceph/mds/MDS.h
branches/aleung/security1/ceph/mds/Server.cc

index 65b98991c0e532653f9f75baec6d7595726f4117..4302b3f5c16c8119887b6bd917b0bc3e4dc5056d 100644 (file)
@@ -306,7 +306,7 @@ md_config_t g_conf = {
 
   //security (all principals)
   secure_io:              1, /* 0=off, 1=on */
-  mds_group:              0, /* 0=none, 1=unix, 2=batch, 3=def, 4=predict */
+  mds_group:              0, /* 0=none, 1=unix, 2=batch, 3=define, 4=predict */
   mds_collection:         0, /* 0=none, 1=unix, 3=def */
   unix_group_file:        0, /* 0=no file, non-zero = filename ptr */
   fix_client_id:          0, /* 0=off, 1=on */
index 71f922312e3a4fef06ee10aa5704c4307db06cb1..c379c9076254560479f2a45d1f33f175dedcecf8 100644 (file)
@@ -184,6 +184,10 @@ public:
     data.id.cid = new_id;
     data.id.mds_id = new_mds_id;
   }
+  void set_id(cap_id_t capid) {
+    data.id.cid = capid.cid;
+    data.id.mds_id = capid.mds_id;
+  }
   void set_type(__int8_t new_type) { data.type = new_type;}
 
   void set_user_hash(hash_t nhash) { data.user_group = nhash; }
index d31343bea89c8b5adef2236cbcd14ad20004ca08..a1e462b0a6adff5dc54109867b599f357f9a1528 100644 (file)
@@ -95,6 +95,11 @@ CInode::CInode(MDCache *c, bool auth) {
   user_cap_set = false;
   group_cap_set = false;
   world_cap_set = false;
+
+  batching = false;
+  buffer_stop = false;
+  buffer_thread = BufferThread(this);
+  buffer_thread.create();
   
   auth_pins = 0;
   nested_auth_pins = 0;
@@ -107,7 +112,66 @@ CInode::CInode(MDCache *c, bool auth) {
 
 CInode::~CInode() {
   if (dir) { delete dir; dir = 0; }
+  buffer_lock.Lock();
+  buffer_stop = true;
+  buffer_cond.Signal();
+  buffer_lock.Unlock();
+  buffer_thread.join();  
 }
+void CInode::buffer_entry()
+{
+  cout << "buffer start" << endl;
+  buffer_lock.Lock();
+  while(!buffer_stop) {
+
+    // were gonna get signaled when we start buffering
+    cout << "Buffer waiting" << endl;
+    buffer_cond.Wait(buffer_lock);
+    cout << "Buffer signaled" << endl;
+    
+    // the sleep releases the lock and allows the dispatch
+    // to insert requests into the buffer
+    // sleep first, then serve cap
+    cout << "buffer sleeping to buffer" << endl;
+    buffer_cond.WaitInterval(buffer_lock, utime_t(5,0));
+    
+    /*
+    // now i've slept, make cap for users
+    set<uid_t> user_set;
+    for (set<MClientRequest *>::iterator si = buffered_reqs.begin();
+        si != buffered_reqs.end();
+        si++) {
+      user_set.insert((*si)->get_uid());
+    }
+    MerkleTree users_hash(user_set);
+    ExtCap *ext_cap = new ExtCap(FILE_MODE_RW,
+                                inode.uid,
+                                inode.gid,
+                                users_hash.get_root_hash(),
+                                inode.ino);
+    ext_cap->set_id(batch_id);
+
+    // put the cap in everyones cache
+    for (set<uid_t>::iterator usi = user_set.begin();
+        usi != user_set.end();
+        usi++) {
+      ext_caps[(*usi)] = (*ext_cap);
+    }
+    */
+
+    // let requests loose
+    for (set<MClientRequest *>::iterator ri = buffered_reqs.begin();
+        ri != buffered_reqs.end();
+        ri++) {
+      cout << "ABOUT TO PASS OFF THE REQUEST" << endl;
+      //open_fun_ptr(*ri, this);
+      server->handle_client_open(*ri, this);
+    }   
+  }
+  buffer_lock.Unlock();
+  cout << "buffer finish" << endl;
+}
+
 
 
 // pins
index 18113d2fbd52ba6cbec7fa05fe3846e1def0b0db..bfa5f86135bd4c3cad546621b608b4fc4ba38c98 100644 (file)
@@ -25,7 +25,9 @@
 #include "CDentry.h"
 #include "Lock.h"
 #include "Capability.h"
+#include "Server.h"
 
+#include "messages/MClientRequest.h"
 
 #include <cassert>
 #include <list>
@@ -172,7 +174,33 @@ class CInode : public MDSCacheObject {
 
   // waiters
   multimap<int, Context*>  waiting;
+  
 
+  // batching information
+  bool batching;
+  utime_t two_req_ago;
+  utime_t one_req_ago;
+  set<MClientRequest *> buffered_reqs;
+  bool batch_id_set;
+  cap_id_t batch_id;
+
+  Mutex buffer_lock;
+  Cond buffer_cond;
+  bool buffer_stop;
+  void buffer_entry();
+  // FIXME total hack to have escape point back to server
+  Server *server;
+  //void (Server::*open_fun_ptr) (MClientRequest*, CInode *);
+  class BufferThread : public Thread {
+    CInode *inode;
+  public:
+    BufferThread () {}
+    BufferThread (CInode *in): inode(in) {}
+    void *entry() {
+      inode->buffer_entry();
+      return 0;
+    }
+  } buffer_thread;
 
   // -- distributed state --
 public:
@@ -392,7 +420,7 @@ protected:
   void add_user_extcap(uid_t user, ExtCap* extcap) {
     //if (ext_caps.empty())
     //  get(CINODE_PIN_CAPS);
-    assert(ext_caps.count(user) == 0);
+    //assert(ext_caps.count(user) == 0);
     ext_caps[user] = (*extcap);
   }
   void remove_user_extcap(uid_t user) {
index d235b7d1868a743d6369f7609569c855d7b1472c..f07681f99e44c9ef4dca85f8c787c40558836dff 100644 (file)
@@ -236,17 +236,17 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
   // issue most generic cap (RW)
   my_want |= FILE_MODE_RW;
 
-  utime_t clock_time_start = g_clock.now();
+  //utime_t clock_time_start = g_clock.now();
   utime_t test_time = g_clock.now();
-  utime_t clock_time_end = g_clock.now();
-  cout << "Clock time " << clock_time_end - clock_time_start << endl;
+  //utime_t clock_time_end = g_clock.now();
+  //cout << "Clock time " << clock_time_end - clock_time_start << endl;
 
   // check cache
   ExtCap *ext_cap;
-  utime_t setup_time_end = g_clock.now();
-  cout << "Setup time " << setup_time_end - setup_time_start << endl;
+  //utime_t setup_time_end = g_clock.now();
+  //cout << "Setup time " << setup_time_end - setup_time_start << endl;
 
-  utime_t checkcache_time_start = g_clock.now();
+  //utime_t checkcache_time_start = g_clock.now();
   // unix groups
   if (g_conf.mds_group == 1) {
     if (my_user == in->get_uid())
@@ -259,8 +259,8 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
   // no grouping
   else 
     ext_cap = in->get_user_extcap(my_user);
-  utime_t checkcache_time_end = g_clock.now();
-  cout << "Check cache time " << checkcache_time_end - checkcache_time_start << endl;
+  //utime_t checkcache_time_end = g_clock.now();
+  //cout << "Check cache time " << checkcache_time_end - checkcache_time_start << endl;
 
   if (!ext_cap) {
     // make new cap
@@ -320,15 +320,15 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
     }
     // default no grouping
     else {
-      utime_t make_time_start = g_clock.now();
+      //utime_t make_time_start = g_clock.now();
       ext_cap = new ExtCap(my_want, my_user, in->ino());
-      utime_t make_time_end = g_clock.now();
-      cout << "Capability make time " << make_time_end - make_time_start << endl;
+      //utime_t make_time_end = g_clock.now();
+      //cout << "Capability make time " << make_time_end - make_time_start << endl;
     }
     
     // set capability id
-    ext_cap->set_id(cap_id_count, mds->get_nodeid());
-    cap_id_count++;
+    ext_cap->set_id(mds->cap_id_count, mds->get_nodeid());
+    mds->cap_id_count++;
 
     dout(3) << "Made new " << my_want << " capability for uid: "
        << ext_cap->get_uid() << " for inode: " << ext_cap->get_ino()<< endl;
@@ -339,7 +339,7 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
     cout << "Signature time " << sign_time_end - sign_time_start << endl;
 
     // caches this capability in the inode
-    utime_t cache_time_start = g_clock.now();
+    //utime_t cache_time_start = g_clock.now();
     if (g_conf.mds_group == 1) {
       if (my_user == in->get_uid())
        in->set_unix_user_cap(ext_cap);
@@ -351,8 +351,8 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
     else
       in->add_user_extcap(my_user,ext_cap);
 
-    utime_t cache_time_end = g_clock.now();
-    cout << "Caching time " << cache_time_end - cache_time_start << endl;
+    //utime_t cache_time_end = g_clock.now();
+    //cout << "Caching time " << cache_time_end - cache_time_start << endl;
 
   }
   // we want to index based on mode, so we can cache more caps
@@ -366,10 +366,10 @@ ExtCap* Locker::issue_new_extcaps(CInode *in, int mode, MClientRequest *req) {
     }
   }
   // add capability as recently used
-  utime_t recentcap_time_start = g_clock.now();
+  //utime_t recentcap_time_start = g_clock.now();
   mds->recent_caps.insert(ext_cap->get_id());
-  utime_t recentcap_time_end = g_clock.now();
-  cout << "Recent cap cache time " << recentcap_time_end - recentcap_time_start << endl;
+  //utime_t recentcap_time_end = g_clock.now();
+  //cout << "Recent cap cache time " << recentcap_time_end - recentcap_time_start << endl;
 
   return ext_cap;
 }
index 68c83e9b79f68638eb3e2e63e2145811e540fe91..964d0b9cd71a64816590b1312450556189f059cb 100644 (file)
@@ -49,10 +49,11 @@ private:
   MDS *mds;
   MDCache *mdcache;
   // count of capability id's used
-  int cap_id_count;
+  //int cap_id_count;
  
  public:
-  Locker(MDS *m, MDCache *c) : mds(m), mdcache(c), cap_id_count(0) {}  
+  //Locker(MDS *m, MDCache *c) : mds(m), mdcache(c), cap_id_count(0) {}  
+  Locker(MDS *m, MDCache *c) : mds(m), mdcache(c) {}
 
   void dispatch(Message *m);
 
index 6fbcece87620e9aa20e6e8fc47ea9740ec387273..400a242b7a656bb43c5f69bd4928606a5e388af9 100644 (file)
@@ -95,7 +95,7 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) : timer(mds_lock) {
   myPrivKey = esignPrivKey("crypto/esig1536.dat");
   myPubKey = esignPubKey(myPrivKey);
 
-  // create unix_groups?
+  // create unix_groups from file?
   if (g_conf.unix_group_file) {
     ifstream from(g_conf.unix_group_file);
 
@@ -146,6 +146,9 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) : timer(mds_lock) {
       assert(0);
     }
   }
+
+  // cap identifiers
+  cap_id_count = 0;
  
   // beacon
   beacon_last_seq = 0;
index a37dc75a49c408eaa84f01be46a7db72da64cd62..d5de241bfcd0d4f29b5c2681863671d1885188ea 100644 (file)
@@ -165,6 +165,8 @@ public:
   // recent capabilities to renew
   set<cap_id_t> recent_caps;
   Renewal token;
+  // count of capability ids used
+  int cap_id_count;
 
   void queue_waitfor_active(Context *c) { waitfor_active.push_back(c); }
 
index 800698756e829209fb3ccf99d545578800c791b0..acc2453f7ea9464a18f5c01ddc9c38b28fc5dcf4 100644 (file)
@@ -491,8 +491,35 @@ void Server::dispatch_request(Message *m, CInode *ref)
   case MDS_OP_OPEN:
     if (req->get_iarg() & O_CREAT) 
       handle_client_openc(req, ref);
-    else 
-      handle_client_open(req, ref);
+    else {
+      if (g_conf.mds_group == 2) {
+       utime_t open_req_time = g_clock.now();
+       // if this request is within 10ms of the last 2, flash crowd!
+       cout << "Buffering time check" << open_req_time - ref->two_req_ago
+            << endl;
+       if (open_req_time - ref->two_req_ago < utime_t(0, 10000)) {
+         cout << "Buffering the request" << endl;
+         ref->buffer_lock.Lock();
+         ref->buffered_reqs.insert(req);
+         if (ref->buffer_stop) {
+           ref->batch_id.cid = mds->cap_id_count;
+           ref->batch_id.mds_id = mds->get_nodeid();
+           ref->batch_id_set = true;
+           ref->buffer_stop = false;
+         }
+         ref->buffer_lock.Unlock();
+         return;
+       }
+       else {
+         cout << "Not buffering the request" << endl;
+         ref->two_req_ago = ref->one_req_ago;
+         ref->one_req_ago = open_req_time;
+         handle_client_open(req, ref);
+       }
+      }
+      else
+       handle_client_open(req, ref);
+    }
     break;
   case MDS_OP_TRUNCATE:
     handle_client_truncate(req, ref);
@@ -2431,7 +2458,61 @@ void Server::handle_client_openc(MClientRequest *req, CInode *diri)
   } else {
     // exists!
     // FIXME: do i need to repin path based existant inode? hmm.
-    handle_client_open(req, in);
+    if (g_conf.mds_group == 2) {
+      utime_t open_req_time = g_clock.now();
+      // if this request is within 10ms of the last 2, flash crowd!
+      cout << "Buffering time check" << open_req_time - in->two_req_ago
+          << " against " << utime_t(1, 0) << endl;
+      //if (open_req_time - in->two_req_ago < utime_t(1, 0)) {
+      if (open_req_time > utime_t()) {
+       cout << "Buffering the request" << endl;
+       in->two_req_ago = in->one_req_ago;
+       in->one_req_ago = open_req_time;
+
+       // if buffer waiting thread is off, turn it on
+       if (!in->batching) {
+         // grab lock and insert
+         in->buffer_lock.Lock();
+         in->buffered_reqs.insert(req);
+
+         // prepare capid for future capability
+         in->batch_id.cid = mds->cap_id_count;
+         in->batch_id.mds_id = mds->get_nodeid();
+         mds->cap_id_count++;
+         // turn on batching flags
+         in->batching = true;
+         in->batch_id_set = true;
+         in->buffer_stop = false;
+
+         // set function pointer to open handler
+         //in->open_fun_ptr = (void (*)(MClientRequest*, CInode*))&handle_client_open;
+         in->server = this;
+         
+         //singal the thread
+         cout << "Going to singal" << endl;
+         in->buffer_cond.Signal();
+         cout << "Done signaling" << endl;
+         
+         // release the lock
+         in->buffer_lock.Unlock();
+       }
+       else {
+         // grab lock and insert
+         in->buffer_lock.Lock();
+         in->buffered_reqs.insert(req);
+         in->buffer_lock.Unlock();
+       }
+       return;
+      }
+      else {
+       cout << "Not buffering the request" << endl;
+       in->two_req_ago = in->one_req_ago;
+       in->one_req_ago = open_req_time;
+       handle_client_open(req, in);
+      }
+    }
+    else
+      handle_client_open(req, in);
   }
 }