]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
forgot the batch file
authoranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 24 Mar 2007 21:51:14 +0000 (21:51 +0000)
committeranwleung <anwleung@29311d96-e01e-0410-9327-a35deaab8ce9>
Sat, 24 Mar 2007 21:51:14 +0000 (21:51 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1301 29311d96-e01e-0410-9327-a35deaab8ce9

branches/aleung/security1/ceph/mds/UserBatch.h [new file with mode: 0644]

diff --git a/branches/aleung/security1/ceph/mds/UserBatch.h b/branches/aleung/security1/ceph/mds/UserBatch.h
new file mode 100644 (file)
index 0000000..8d50c30
--- /dev/null
@@ -0,0 +1,240 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __USERBATCH_H
+#define __USERBATCH_H
+
+#include "include/types.h"
+#include "common/Clock.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+
+#include "messages/MClientRequest.h"
+
+class UserBatch {
+
+  // is the thread initiated
+  bool thread_init;
+  // is the thread running
+  bool batch_stop;
+  // are we batching requests
+  bool batching;
+
+  Mutex batch_lock;
+  Cond batch_cond;
+
+public:
+
+  Server *server;
+  MDS *mds;
+
+  utime_t one_req_ago;
+  utime_t two_req_ago;
+  uid_t user;
+  gid_t user_group;
+
+  set<MClientRequest*> batched_requests;
+  bool batch_id_set;
+  cap_id_t batch_id;
+
+  bool created;
+
+  class BatchThread : public Thread {
+    UserBatch *batch;
+  public:
+    BatchThread() {}
+    BatchThread  (UserBatch *ub) : batch (ub) {}
+    void *entry() {
+      batch->batch_entry();
+      return 0;
+    }
+  } batch_thread;
+
+  UserBatch() {
+    thread_init = false;
+    batching = false;
+    batch_stop = false;
+    created = false;
+  }
+
+  UserBatch(Server *serve, MDS* metads, uid_t uid) : server(serve),
+                                                    mds(metads), user(uid) {
+    thread_init = false;
+    batching = false;
+    batch_stop = false;
+    created = true;
+    batch_thread = BatchThread(this);
+    batch_thread.create();
+  }
+  ~UserBatch () {
+    batch_lock.Lock();
+    batch_stop = true;
+    batch_cond.Signal();
+    batch_lock.Unlock();
+    batch_thread.join();
+  }
+
+  //void start_batching_thread() {
+  //  batch_thread.BatchThread(this);
+  //  batch_thread.create();
+  //}
+
+  void init(Server *serve, MDS* metads, uid_t uid) {
+    server = serve;
+    mds = metads;
+    user = uid;
+    thread_init = false;
+    batching = false;
+    batch_stop = false;
+    batch_thread = BatchThread(this);
+    batch_thread.create();
+  }
+  
+  bool is_thread_init() { return thread_init; }
+  bool is_batching() { return batching; }
+  bool should_batch(utime_t new_request_time) {
+    //if (new_request_time - two_req_ago < utime_t(0,10000)) // 10ms between
+    if (new_request_time > utime_t()) // always batch
+      return true;
+    return false;
+  }
+  void update_batch_time(utime_t new_request_time) {
+    two_req_ago = one_req_ago;
+    one_req_ago = new_request_time;
+  }
+
+  void add_to_batch(MClientRequest *req, CInode *newi) {
+    cout << "Batching the request for uid:"
+        << req->get_caller_uid() << " on client:"
+        << req->get_client() << " for file:"
+        << req->get_ino() << " test2:" << newi->ino()
+        << " with client inst:"
+        << req->get_client_inst() << endl;
+    
+    batch_lock.Lock();
+    
+    // wait until the thread has initialized
+    while (! thread_init)
+      batch_cond.Wait(batch_lock);
+    
+    // was batching thread already on?
+    if (batching) {
+      // set the correct inode number
+      cout << "Inserting " << newi->ino() << " into batch" << endl;
+      req->set_ino(newi->ino());
+      batched_requests.insert(req);
+    }
+    else {
+      // set the user were batching for
+      user = req->get_caller_uid();
+      user_group = req->get_caller_gid();
+
+      // set the future capid
+      batch_id.cid = mds->cap_id_count;
+      batch_id.mds_id = mds->get_nodeid();
+      mds->cap_id_count++;
+      
+      batching = true;
+      batch_id_set = true;
+      
+      // set the correct inode number
+      cout << "Inserting " << newi->ino() << " into batch" << endl;
+      req->set_ino(newi->ino());
+      
+      batched_requests.insert(req);
+      
+      // start the buffering now
+      batch_cond.Signal();
+    }
+    
+    batch_lock.Unlock();
+    return;
+  }
+
+  void batch_entry()
+  {
+    cout << "batch thread start------>" << endl;
+    batch_lock.Lock();
+    
+    // init myself and signal anyone waiting for me to init
+    thread_init = true;
+    batch_cond.Signal();
+    
+    while(!batch_stop) {
+      
+      // ifwe're not buffering, then,
+      // were gonna get signaled when we start buffering
+      // plus i need to release the lock for anyone
+      // waiting for me to init
+      while (!batching)
+       batch_cond.Wait(batch_lock);
+      
+      // the sleep releases the lock and allows the dispatch
+      // to insert requests into the buffer
+      // sleep first, then serve cap
+      batch_cond.WaitInterval(batch_lock, utime_t(5,0));
+      
+      // now i've slept, make cap for users
+      list<inodeno_t> inode_list;
+      CapGroup inode_hash;
+      for (set<MClientRequest *>::iterator si = batched_requests.begin();
+          si != batched_requests.end();
+          si++) {
+       cout << "Pushing inode " << (*si)->get_ino() << " back" << endl;
+       inode_list.push_back((*si)->get_ino());
+       inode_hash.add_inode((*si)->get_ino());
+      }
+      inode_hash.sign_list(mds->getPrvKey());
+
+      // cache the cap group for the files
+      cout << "Caching group " << inode_hash.get_root_hash()
+          << " at the MDS"<< endl;
+      mds->unix_groups_byhash[inode_hash.get_root_hash()]= inode_hash;
+      
+      ExtCap *ext_cap = new ExtCap(FILE_MODE_RW,
+                                  user,
+                                  user_group,
+                                  inode_hash.get_root_hash());
+      ext_cap->set_type(USER_BATCH);
+      ext_cap->set_id(batch_id);
+      ext_cap->sign_extcap(mds->getPrvKey());
+      
+      // put the cap every inodes cache
+      CInode *inode_cache;
+      for (list<inodeno_t>::iterator ili = inode_list.begin();
+          ili != inode_list.end();
+          ili++) {
+       inode_cache = mds->mdcache->inode_map[(*ili)];
+       inode_cache->add_user_extcap(user, ext_cap);
+      }
+      
+      // let requests loose
+      for (set<MClientRequest *>::iterator ri = batched_requests.begin();
+          ri != batched_requests.end();
+          ri++) {
+       server->handle_client_open(*ri, mds->mdcache->inode_map[(*ri)->get_ino()]);
+      }
+      
+      batched_requests.clear();
+      
+      //turn batching off
+      batching = false;
+    }
+    
+    batch_lock.Unlock();
+    cout << "<------batcher thread finish" << endl;
+  }
+  
+};
+
+#endif