//security (all principals)
secure_io: 1, /* 0=off, 1=on */
- mds_group: 0, /* 0=none, 1=unix, 2=batch, 3=def, 4=predict */
+ mds_group: 0, /* 0=none, 1=unix, 2=batch, 3=define, 4=predict */
mds_collection: 0, /* 0=none, 1=unix, 3=def */
unix_group_file: 0, /* 0=no file, non-zero = filename ptr */
fix_client_id: 0, /* 0=off, 1=on */
data.id.cid = new_id;
data.id.mds_id = new_mds_id;
}
+ void set_id(cap_id_t capid) {
+ data.id.cid = capid.cid;
+ data.id.mds_id = capid.mds_id;
+ }
void set_type(__int8_t new_type) { data.type = new_type;}
void set_user_hash(hash_t nhash) { data.user_group = nhash; }
user_cap_set = false;
group_cap_set = false;
world_cap_set = false;
+
+ batching = false;
+ buffer_stop = false;
+ buffer_thread = BufferThread(this);
+ buffer_thread.create();
auth_pins = 0;
nested_auth_pins = 0;
CInode::~CInode() {
if (dir) { delete dir; dir = 0; }
+ buffer_lock.Lock();
+ buffer_stop = true;
+ buffer_cond.Signal();
+ buffer_lock.Unlock();
+ buffer_thread.join();
}
+void CInode::buffer_entry()
+{
+ cout << "buffer start" << endl;
+ buffer_lock.Lock();
+ while(!buffer_stop) {
+
+ // were gonna get signaled when we start buffering
+ cout << "Buffer waiting" << endl;
+ buffer_cond.Wait(buffer_lock);
+ cout << "Buffer signaled" << endl;
+
+ // the sleep releases the lock and allows the dispatch
+ // to insert requests into the buffer
+ // sleep first, then serve cap
+ cout << "buffer sleeping to buffer" << endl;
+ buffer_cond.WaitInterval(buffer_lock, utime_t(5,0));
+
+ /*
+ // now i've slept, make cap for users
+ set<uid_t> user_set;
+ for (set<MClientRequest *>::iterator si = buffered_reqs.begin();
+ si != buffered_reqs.end();
+ si++) {
+ user_set.insert((*si)->get_uid());
+ }
+ MerkleTree users_hash(user_set);
+ ExtCap *ext_cap = new ExtCap(FILE_MODE_RW,
+ inode.uid,
+ inode.gid,
+ users_hash.get_root_hash(),
+ inode.ino);
+ ext_cap->set_id(batch_id);
+
+ // put the cap in everyones cache
+ for (set<uid_t>::iterator usi = user_set.begin();
+ usi != user_set.end();
+ usi++) {
+ ext_caps[(*usi)] = (*ext_cap);
+ }
+ */
+
+ // let requests loose
+ for (set<MClientRequest *>::iterator ri = buffered_reqs.begin();
+ ri != buffered_reqs.end();
+ ri++) {
+ cout << "ABOUT TO PASS OFF THE REQUEST" << endl;
+ //open_fun_ptr(*ri, this);
+ server->handle_client_open(*ri, this);
+ }
+ }
+ buffer_lock.Unlock();
+ cout << "buffer finish" << endl;
+}
+
// pins
#include "CDentry.h"
#include "Lock.h"
#include "Capability.h"
+#include "Server.h"
+#include "messages/MClientRequest.h"
#include <cassert>
#include <list>
// waiters
multimap<int, Context*> waiting;
+
+ // batching information
+ bool batching;
+ utime_t two_req_ago;
+ utime_t one_req_ago;
+ set<MClientRequest *> buffered_reqs;
+ bool batch_id_set;
+ cap_id_t batch_id;
+
+ Mutex buffer_lock;
+ Cond buffer_cond;
+ bool buffer_stop;
+ void buffer_entry();
+ // FIXME total hack to have escape point back to server
+ Server *server;
+ //void (Server::*open_fun_ptr) (MClientRequest*, CInode *);
+ class BufferThread : public Thread {
+ CInode *inode;
+ public:
+ BufferThread () {}
+ BufferThread (CInode *in): inode(in) {}
+ void *entry() {
+ inode->buffer_entry();
+ return 0;
+ }
+ } buffer_thread;
// -- distributed state --
public:
void add_user_extcap(uid_t user, ExtCap* extcap) {
//if (ext_caps.empty())
// get(CINODE_PIN_CAPS);
- assert(ext_caps.count(user) == 0);
+ //assert(ext_caps.count(user) == 0);
ext_caps[user] = (*extcap);
}
void remove_user_extcap(uid_t user) {
// issue most generic cap (RW)
my_want |= FILE_MODE_RW;
- utime_t clock_time_start = g_clock.now();
+ //utime_t clock_time_start = g_clock.now();
utime_t test_time = g_clock.now();
- utime_t clock_time_end = g_clock.now();
- cout << "Clock time " << clock_time_end - clock_time_start << endl;
+ //utime_t clock_time_end = g_clock.now();
+ //cout << "Clock time " << clock_time_end - clock_time_start << endl;
// check cache
ExtCap *ext_cap;
- utime_t setup_time_end = g_clock.now();
- cout << "Setup time " << setup_time_end - setup_time_start << endl;
+ //utime_t setup_time_end = g_clock.now();
+ //cout << "Setup time " << setup_time_end - setup_time_start << endl;
- utime_t checkcache_time_start = g_clock.now();
+ //utime_t checkcache_time_start = g_clock.now();
// unix groups
if (g_conf.mds_group == 1) {
if (my_user == in->get_uid())
// no grouping
else
ext_cap = in->get_user_extcap(my_user);
- utime_t checkcache_time_end = g_clock.now();
- cout << "Check cache time " << checkcache_time_end - checkcache_time_start << endl;
+ //utime_t checkcache_time_end = g_clock.now();
+ //cout << "Check cache time " << checkcache_time_end - checkcache_time_start << endl;
if (!ext_cap) {
// make new cap
}
// default no grouping
else {
- utime_t make_time_start = g_clock.now();
+ //utime_t make_time_start = g_clock.now();
ext_cap = new ExtCap(my_want, my_user, in->ino());
- utime_t make_time_end = g_clock.now();
- cout << "Capability make time " << make_time_end - make_time_start << endl;
+ //utime_t make_time_end = g_clock.now();
+ //cout << "Capability make time " << make_time_end - make_time_start << endl;
}
// set capability id
- ext_cap->set_id(cap_id_count, mds->get_nodeid());
- cap_id_count++;
+ ext_cap->set_id(mds->cap_id_count, mds->get_nodeid());
+ mds->cap_id_count++;
dout(3) << "Made new " << my_want << " capability for uid: "
<< ext_cap->get_uid() << " for inode: " << ext_cap->get_ino()<< endl;
cout << "Signature time " << sign_time_end - sign_time_start << endl;
// caches this capability in the inode
- utime_t cache_time_start = g_clock.now();
+ //utime_t cache_time_start = g_clock.now();
if (g_conf.mds_group == 1) {
if (my_user == in->get_uid())
in->set_unix_user_cap(ext_cap);
else
in->add_user_extcap(my_user,ext_cap);
- utime_t cache_time_end = g_clock.now();
- cout << "Caching time " << cache_time_end - cache_time_start << endl;
+ //utime_t cache_time_end = g_clock.now();
+ //cout << "Caching time " << cache_time_end - cache_time_start << endl;
}
// we want to index based on mode, so we can cache more caps
}
}
// add capability as recently used
- utime_t recentcap_time_start = g_clock.now();
+ //utime_t recentcap_time_start = g_clock.now();
mds->recent_caps.insert(ext_cap->get_id());
- utime_t recentcap_time_end = g_clock.now();
- cout << "Recent cap cache time " << recentcap_time_end - recentcap_time_start << endl;
+ //utime_t recentcap_time_end = g_clock.now();
+ //cout << "Recent cap cache time " << recentcap_time_end - recentcap_time_start << endl;
return ext_cap;
}
MDS *mds;
MDCache *mdcache;
// count of capability id's used
- int cap_id_count;
+ //int cap_id_count;
public:
- Locker(MDS *m, MDCache *c) : mds(m), mdcache(c), cap_id_count(0) {}
+ //Locker(MDS *m, MDCache *c) : mds(m), mdcache(c), cap_id_count(0) {}
+ Locker(MDS *m, MDCache *c) : mds(m), mdcache(c) {}
void dispatch(Message *m);
myPrivKey = esignPrivKey("crypto/esig1536.dat");
myPubKey = esignPubKey(myPrivKey);
- // create unix_groups?
+ // create unix_groups from file?
if (g_conf.unix_group_file) {
ifstream from(g_conf.unix_group_file);
assert(0);
}
}
+
+ // cap identifiers
+ cap_id_count = 0;
// beacon
beacon_last_seq = 0;
// recent capabilities to renew
set<cap_id_t> recent_caps;
Renewal token;
+ // count of capability ids used
+ int cap_id_count;
void queue_waitfor_active(Context *c) { waitfor_active.push_back(c); }
case MDS_OP_OPEN:
if (req->get_iarg() & O_CREAT)
handle_client_openc(req, ref);
- else
- handle_client_open(req, ref);
+ else {
+ if (g_conf.mds_group == 2) {
+ utime_t open_req_time = g_clock.now();
+ // if this request is within 10ms of the last 2, flash crowd!
+ cout << "Buffering time check" << open_req_time - ref->two_req_ago
+ << endl;
+ if (open_req_time - ref->two_req_ago < utime_t(0, 10000)) {
+ cout << "Buffering the request" << endl;
+ ref->buffer_lock.Lock();
+ ref->buffered_reqs.insert(req);
+ if (ref->buffer_stop) {
+ ref->batch_id.cid = mds->cap_id_count;
+ ref->batch_id.mds_id = mds->get_nodeid();
+ ref->batch_id_set = true;
+ ref->buffer_stop = false;
+ }
+ ref->buffer_lock.Unlock();
+ return;
+ }
+ else {
+ cout << "Not buffering the request" << endl;
+ ref->two_req_ago = ref->one_req_ago;
+ ref->one_req_ago = open_req_time;
+ handle_client_open(req, ref);
+ }
+ }
+ else
+ handle_client_open(req, ref);
+ }
break;
case MDS_OP_TRUNCATE:
handle_client_truncate(req, ref);
} else {
// exists!
// FIXME: do i need to repin path based existant inode? hmm.
- handle_client_open(req, in);
+ if (g_conf.mds_group == 2) {
+ utime_t open_req_time = g_clock.now();
+ // if this request is within 10ms of the last 2, flash crowd!
+ cout << "Buffering time check" << open_req_time - in->two_req_ago
+ << " against " << utime_t(1, 0) << endl;
+ //if (open_req_time - in->two_req_ago < utime_t(1, 0)) {
+ if (open_req_time > utime_t()) {
+ cout << "Buffering the request" << endl;
+ in->two_req_ago = in->one_req_ago;
+ in->one_req_ago = open_req_time;
+
+ // if buffer waiting thread is off, turn it on
+ if (!in->batching) {
+ // grab lock and insert
+ in->buffer_lock.Lock();
+ in->buffered_reqs.insert(req);
+
+ // prepare capid for future capability
+ in->batch_id.cid = mds->cap_id_count;
+ in->batch_id.mds_id = mds->get_nodeid();
+ mds->cap_id_count++;
+ // turn on batching flags
+ in->batching = true;
+ in->batch_id_set = true;
+ in->buffer_stop = false;
+
+ // set function pointer to open handler
+ //in->open_fun_ptr = (void (*)(MClientRequest*, CInode*))&handle_client_open;
+ in->server = this;
+
+ //singal the thread
+ cout << "Going to singal" << endl;
+ in->buffer_cond.Signal();
+ cout << "Done signaling" << endl;
+
+ // release the lock
+ in->buffer_lock.Unlock();
+ }
+ else {
+ // grab lock and insert
+ in->buffer_lock.Lock();
+ in->buffered_reqs.insert(req);
+ in->buffer_lock.Unlock();
+ }
+ return;
+ }
+ else {
+ cout << "Not buffering the request" << endl;
+ in->two_req_ago = in->one_req_ago;
+ in->one_req_ago = open_req_time;
+ handle_client_open(req, in);
+ }
+ }
+ else
+ handle_client_open(req, in);
}
}