}
// we have the group, hand it back
else {
- MOSDUpdateReply *reply = new MOSDUpdateReply(my_hash,
- groups[my_hash].get_list(),
- groups[my_hash].get_sig());
+ MOSDUpdateReply *reply;
+ // it's a file group
+ if (g_conf.mds_group == 3)
+ reply = new MOSDUpdateReply(my_hash, groups[my_hash].get_inode_list(),
+ groups[my_hash].get_sig());
+ // it's a user group
+ else
+ reply = new MOSDUpdateReply(my_hash, groups[my_hash].get_list(),
+ groups[my_hash].get_sig());
messenger->send_message(reply, m->get_source_inst());
}
// default test option (only for debug)
- //reply = new MOSDUpdateReply(my_hash);
+ //MOSDUpdateReply *reply = new MOSDUpdateReply(my_hash);
//messenger->send_message(reply, m->get_source_inst());
}
hash_t my_hash = m->get_user_hash();
// cache the list
- groups[my_hash].set_list(m->get_user_list());
+ if (g_conf.mds_group == 3)
+ groups[my_hash].set_inode_list(m->get_file_list());
+ else
+ groups[my_hash].set_list(m->get_user_list());
+
groups[my_hash].set_root_hash(my_hash);
groups[my_hash].set_sig(m->get_sig());
for (set<int>::iterator oi = update_waiter_osd[my_hash].begin();
oi != update_waiter_osd[my_hash].end();
++oi) {
- MOSDUpdateReply *reply = new MOSDUpdateReply(my_hash, groups[my_hash].get_list(), m->get_sig());
+ MOSDUpdateReply *reply;
+ if (g_conf.mds_group == 3)
+ reply = new MOSDUpdateReply(my_hash, groups[my_hash].get_inode_list(),
+ m->get_sig());
+ else
+ reply = new MOSDUpdateReply(my_hash, groups[my_hash].get_list(),
+ m->get_sig());
messenger->send_message(reply, osdmap->get_inst(*oi));
}
private:
//gid_t group_id;
hash_t root_hash;
+
MerkleTree mtree;
list<uid_t> users;
+
+ MerkleTree file_tree;
+ list<inodeno_t> inodes;
+
byte signature[ESIGNSIGSIZE];
public:
mtree.add_user(user);
root_hash = mtree.get_root_hash();
}
+ CapGroup (hash_t rhash, list<inodeno_t>& inodelist) : // file group built from a given root hash and a copy of inodelist
+ root_hash(rhash), inodes(inodelist) { } // file_tree is not populated here; root_hash is stored as passed in
+ CapGroup (inodeno_t ino) { // file group seeded with a single inode
+ inodes.push_back(ino);
+ file_tree.add_inode(ino);
+ root_hash = file_tree.get_root_hash(); // cache the tree's root hash after the insert
+ }
//CapGroup (gid_t id, list<uid_t>& ulist) : group_id(id), users(ulist) {
// add users to MerkleTree
// mtree = MerkleTree(users);
//FIXME need to re-compute hash
}
+ void add_inode(inodeno_t ino) { // append ino to the file group; no duplicate check is performed
+ inodes.push_back(ino);
+ // fold the new inode into file_tree and refresh the cached root hash
+ file_tree.add_inode(ino);
+ root_hash = file_tree.get_root_hash();
+ }
+ void remove_user(inodeno_t ino) { // NOTE(review): despite the name, this removes an inode from the file list
+ inodes.remove(ino); // list::remove drops every element equal to ino
+ //FIXME need to re-compute hash: file_tree and root_hash still include the removed inode
+ }
+
bool contains(uid_t user) {
for (list<uid_t>::iterator ui = users.begin();
ui != users.end();
return false;
}
+ bool contains_inode(inodeno_t ino) { // true iff ino is present in this group's inode list
+ for (list<inodeno_t>::iterator ii = inodes.begin(); // linear scan over the list
+ ii != inodes.end();
+ ii++) {
+ if (*ii == ino)
+ return true;
+ }
+ return false; // reached the end without a match
+ }
+
void set_list(list<uid_t>& nlist) { users = nlist; }
list<uid_t>& get_list() { return users; }
+
+ void set_inode_list(list<inodeno_t>& ilist) { inodes = ilist; } // replaces the inode list with a copy; file_tree and root_hash are not touched here
+ list<inodeno_t>& get_inode_list() { return inodes; } // mutable reference to the internal inode list
};
#endif
#define NO_GROUP 0
#define UNIX_GROUP 1
#define BATCH 2
+#define USER_BATCH 3
struct cap_id_t {
int cid;
data.ino = n;
}
+ // for single file, many users
ExtCap(int m, uid_t u, gid_t g, hash_t h, inodeno_t n)
{
data.id.cid = 0;
data.ino = n;
}
+ // for file group, single user
+ // Builds a capability for user u / group g with access mode m that is bound
+ // to the file group identified by hash h rather than to a single inode.
+ // The validity window starts now and ends 3600 later (presumably seconds,
+ // as defined by the clock type's += operator -- TODO confirm).
+ ExtCap(int m, uid_t u, gid_t g, hash_t h)
+ {
+ data.id.cid = 0;
+ data.id.mds_id = 0;
+ data.t_s = g_clock.now();
+ data.t_e = data.t_s;
+ data.t_e += 3600;
+ data.mode = m;
+ data.uid = u;
+ data.gid = g;
+ data.file_group = h;
+ // no single inode applies to a group capability; initialize the field
+ // anyway so reading it never yields indeterminate memory
+ data.ino = 0;
+ }
+
// capability for too many user, many named files
// capability for too many user, too many files
// zero the array
sha1((byte*)uidArray, (byte*)&root_hash, sizeof(uidArray));
}
+
+ // constructor from an initial list of files
+ MerkleTree (list< inodeno_t >& input) {
+ memset(&root_hash, 0x00, sizeof(root_hash)); // start from an all-zero root hash
+ inodeno_t inoArray[input.size()]; // NOTE(review): variable-length array is a compiler extension, not standard C++; zero-length when input is empty
+ int counter = 0;
+
+ // FIXME just do a linear hash first for root hash
+ // copy list into buffer
+ for (list<inodeno_t>::iterator li = input.begin();
+ li != input.end();
+ li++) {
+ inoArray[counter] = *li;
+ counter++;
+ }
+ // hash the copied inode array; the digest is written into root_hash
+ sha1((byte*)inoArray, (byte*)&root_hash, sizeof(inoArray));
+ }
// constructor from an initial set of users
MerkleTree (set< uid_t >& input) {
// hash em both
sha1((byte*)&conjunction, (byte*)&root_hash, sizeof(conjunction));
}
+
+ void add_inode(inodeno_t ino) { // chain ino into the root hash: root_hash = sha1(root_hash || sha1(ino))
+ // hash the inode number
+ hash_t ino_hash;
+ sha1((byte*)&ino, (byte*)&ino_hash, sizeof(ino));
+ // join the current root_hash with the inode hash
+ hash_t conjunction[2];
+ conjunction[0] = root_hash;
+ conjunction[1] = ino_hash;
+ // hash the pair; the result becomes the new root_hash
+ sha1((byte*)&conjunction, (byte*)&root_hash, sizeof(conjunction));
+ }
hash_t& get_root_hash() { return root_hash; }
};
bool did_shutdown_exports;
bool did_shutdown_log_cap;
friend class C_MDC_ShutdownCommit;
+ friend class UserBatch;
// recovery
protected:
#include "messages/MOSDMap.h"
#include "messages/MOSDGetMap.h"
+#include "UserBatch.h"
+
LogType mds_logtype, mds_cache_logtype;
class MMDSBeacon;
+class UserBatch;
+
class MDS : public Dispatcher {
public:
map<gid_t, hash_t> unix_groups_map;
// hash to group map
map<hash_t, CapGroup> unix_groups_byhash;
+
// recent capabilities to renew
set<cap_id_t> recent_caps;
Renewal token;
+
// count of capability ids used
int cap_id_count;
+ // batched file groups by user
+ map<uid_t, UserBatch*> user_batch;
+
void queue_waitfor_active(Context *c) { waitfor_active.push_back(c); }
bool is_dne() { return state == MDSMap::STATE_DNE; }
#include "Renamer.h"
#include "MDStore.h"
+#include "UserBatch.h"
+
#include "msg/Messenger.h"
#include "messages/MClientMount.h"
hash_t my_hash = m->get_user_hash();
dout(3) << "handle_client_update for " << my_hash << endl;
- MClientUpdateReply *reply = new MClientUpdateReply(my_hash, mds->unix_groups_byhash[my_hash].get_list());
- reply->set_sig(mds->unix_groups_byhash[my_hash].get_sig());
+ MClientUpdateReply *reply;
+ // it's a file group
+ if (g_conf.mds_group == 3) {
+ reply = new MClientUpdateReply(my_hash, mds->unix_groups_byhash[my_hash].get_inode_list());
+ reply->set_sig(mds->unix_groups_byhash[my_hash].get_sig());
+ }
+ // it's a user group
+ else {
+ reply = new MClientUpdateReply(my_hash,
+ mds->unix_groups_byhash[my_hash].get_list(),
+ mds->unix_groups_byhash[my_hash].get_sig());
+ }
messenger->send_message(reply, m->get_source_inst());
}
// make sure dir is pinnable
- // create inode
- cout << "creating inode....should i maybe be creating the thread here?" << endl;
*pin = mdcache->create_inode();
(*pin)->inode.uid = req->get_caller_uid();
(*pin)->inode.gid = req->get_caller_gid();
(*pin)->inode.ctime = (*pin)->inode.mtime = (*pin)->inode.atime = g_clock.gettime(); // now
- cout << "Done setting up inode" << endl;
+
// note: inode.version will get set by finisher's mark_dirty.
// create dentry
mds->balancer->hit_inode(newi, META_POP_IWR);
// ok, do the open.
- mds->server->handle_client_open(req, newi);
+ if (g_conf.mds_group == 3) {
+ uid_t user_id = req->get_caller_uid();
+ utime_t open_req_time = g_clock.now();
+
+ if (mds->user_batch[user_id]->should_batch(open_req_time)) {
+
+ mds->user_batch[user_id]->update_batch_time(open_req_time);
+
+ cout << "Passing inode " << newi->ino() << " to add_to_batch" << endl;
+ mds->user_batch[user_id]->add_to_batch(req, newi);
+
+ return;
+ }
+ else {
+ mds->user_batch[user_id]->update_batch_time(open_req_time);
+ mds->server->handle_client_open(req, newi);
+ }
+ }
+ else
+ mds->server->handle_client_open(req, newi);
}
};
in->inode.mode = 0644; // FIXME req should have a umask
in->inode.mode |= INODE_MODE_FILE;
+ if (g_conf.mds_group == 3) {
+ uid_t my_user = req->get_caller_uid();
+ // create and start user batching thread
+ if (! mds->user_batch[my_user]) {
+ mds->user_batch[my_user] = new UserBatch(this, mds, my_user);
+ mds->user_batch[my_user]->update_batch_time(g_clock.now());
+ }
+ }
+
// prepare finisher
C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, req, dn, in);
EUpdate *le = new EUpdate("openc");
handle_client_open(req, in);
}
}
+ else if (g_conf.mds_group == 3) {
+ // per-user open batching: either queue this open with the caller's
+ // batch or service it immediately, as decided by should_batch()
+ uid_t user_id = req->get_caller_uid();
+ utime_t open_req_time = g_clock.now();
+
+ // map::operator[] default-inserts a null pointer for a user that has
+ // no batch yet, so create the batch on demand instead of
+ // dereferencing null
+ UserBatch *batch = mds->user_batch[user_id];
+ if (!batch) {
+ batch = new UserBatch(this, mds, user_id);
+ mds->user_batch[user_id] = batch;
+ batch->update_batch_time(open_req_time);
+ }
+
+ if (batch->should_batch(open_req_time)) {
+
+ batch->update_batch_time(open_req_time);
+
+ // the batch takes over the request; nothing more to do here
+ batch->add_to_batch(req, in);
+
+ return;
+ }
+ else {
+ batch->update_batch_time(open_req_time);
+ handle_client_open(req, in);
+ }
+
+ }
else
handle_client_open(req, in);
}
#include "msg/Message.h"
#include "osd/osd_types.h"
-#include "crypto/CryptoLib.h"
-using namespace CryptoLib;
-#include "crypto/MerkleTree.h"
+//#include "crypto/CryptoLib.h"
+//using namespace CryptoLib;
+//#include "crypto/MerkleTree.h"
class MClientUpdateReply : public Message {
private:
hash_t user_hash;
byte signature[ESIGNSIGSIZE];
list<uid_t> updated_users;
+ list<inodeno_t> updated_files;
public:
MClientUpdateReply() : Message(MSG_CLIENT_UPDATE_REPLY) { }
- MClientUpdateReply(hash_t uhash, list<uid_t> ulist) :
+ MClientUpdateReply(hash_t uhash, list<uid_t>& ulist, byte *sig) :
Message(MSG_CLIENT_UPDATE_REPLY),
- user_hash(uhash), updated_users(ulist) { }
+ user_hash(uhash), updated_users(ulist) {
+ memcpy(signature, sig, ESIGNSIGSIZE);
+ }
+ MClientUpdateReply(hash_t uhash, list<inodeno_t>& flist) : // file-list variant; signature is NOT set here, call set_sig() afterwards
+ Message(MSG_CLIENT_UPDATE_REPLY),
+ user_hash(uhash), updated_files(flist) { } // the group hash is stored in user_hash; flist is copied
hash_t get_user_hash() { return user_hash; }
list<uid_t>& get_user_list() { return updated_users; }
+ list<inodeno_t>& get_file_list() { return updated_files; } // mutable reference to the carried inode list
void set_sig(byte *sig) { memcpy(signature, sig, ESIGNSIGSIZE); }
byte *get_sig() { return signature; }
virtual void encode_payload() {
payload.append((char*)&user_hash, sizeof(user_hash));
payload.append((char*)signature, sizeof(signature));
+
_encode(updated_users, payload);
+ _encode(updated_files, payload);
}
virtual void decode_payload() {
int off = 0;
off += sizeof(user_hash);
payload.copy(off, sizeof(signature), (char*)signature);
off += sizeof(signature);
+
_decode(updated_users, payload, off);
+ _decode(updated_files, payload, off);
}
virtual char *get_type_name() { return "client_update_reply"; }
void print(ostream& out) {
#include "osd/osd_types.h"
#include "crypto/MerkleTree.h"
+#define USER_HASH 0
+#define FILE_HASH 1
+
class MOSDUpdate : public Message {
private:
struct {
gid_t group;
hash_t uhash;
+
entity_inst_t client;
entity_inst_t asker;
} update_st;
public:
+
gid_t get_group() { return update_st.group; }
hash_t get_hash() { return update_st.uhash; }
+
entity_inst_t get_client_inst() { return update_st.client; }
entity_inst_t get_asker() { return update_st.asker; }
class MOSDUpdateReply : public Message {
private:
- //gid_t group;
hash_t user_hash;
byte signature[ESIGNSIGSIZE];
list<uid_t> updated_users;
+ list<inodeno_t> updated_files;
public:
MOSDUpdateReply () : Message(MSG_OSD_UPDATE_REPLY) { }
- //MOSDUpdateReply(gid_t gid) : Message(MSG_OSD_UPDATE_REPLY),
- // group(gid) { }
MOSDUpdateReply(hash_t uhash) : Message(MSG_OSD_UPDATE_REPLY),
user_hash(uhash) { }
- //MOSDUpdateReply (gid_t gid, list<uid_t> users) :
- // Message(MSG_OSD_UPDATE_REPLY), group(gid), updated_users(users) { }
MOSDUpdateReply(hash_t uhash, list<uid_t>& users) :
Message(MSG_OSD_UPDATE_REPLY), user_hash(uhash), updated_users(users) { }
MOSDUpdateReply(hash_t uhash, list<uid_t>& users, byte *sig) :
Message(MSG_OSD_UPDATE_REPLY), user_hash(uhash), updated_users(users) {
memcpy(signature, sig, ESIGNSIGSIZE);
}
+ MOSDUpdateReply(hash_t fhash, list<inodeno_t>& files, byte *sig) : // file-group variant of the reply
+ Message(MSG_OSD_UPDATE_REPLY), user_hash(fhash), updated_files(files) { // the file-group hash is stored in user_hash; files is copied
+ memcpy(signature, sig, ESIGNSIGSIZE); // sig must point to at least ESIGNSIGSIZE readable bytes
+ }
- //gid_t get_group() { return group; }
hash_t get_user_hash() { return user_hash; }
list<uid_t>& get_list() { return updated_users; }
+ list<inodeno_t>& get_file_list() { return updated_files; } // mutable reference to the carried inode list
void set_sig(byte *sig) { memcpy(signature, sig, ESIGNSIGSIZE); }
byte *get_sig() { return signature; }
}
virtual void encode_payload() {
- //payload.append((char*)&group, sizeof(group));
+
payload.append((char*)signature, sizeof(signature));
payload.append((char*)&user_hash, sizeof(user_hash));
_encode(updated_users, payload);
+ _encode(updated_files, payload);
}
virtual void decode_payload() {
int off = 0;
- //payload.copy(off, sizeof(group), (char*)&group);
- //off += sizeof(group);
payload.copy(off, sizeof(signature), (char*)signature);
off += sizeof(signature);
payload.copy(off, sizeof(user_hash), (char*)&user_hash);
off += sizeof(user_hash);
_decode(updated_users, payload, off);
+ _decode(updated_files, payload, off);
}
virtual char *get_type_name() { return "oop_update_reply"; }
void print(ostream& out) {
if (!(user_groups[my_hash].contains(op->get_user()))) {
// do update to get new unix groups
dout(1) << "User " << op->get_user() << " not in group "
- << my_hash << endl;
+ << my_hash << endl;
return false;
}
return false;
}
// check object matches
- if (op->get_oid().ino != op_capability->get_ino()) {
+ if (op_capability->get_type() == USER_BATCH) {
+ hash_t my_hash = op_capability->get_file_hash();
+ if (! user_groups[my_hash].contains_inode(op->get_oid().ino)) {
+ dout(1) << "File in request " << op->get_oid().ino
+ << " not in group " << my_hash << " file in cap is "
+ << op_capability->get_ino() << endl;
+ return false;
+ }
+ }
+ else if (op->get_oid().ino != op_capability->get_ino()) {
dout(1) << "File in cap did not match request" << endl;
return false;
}
// if no one has already requested the ticket
if (update_waiter_op.count(my_hash) == 0) {
- dout(10) << "update_group requesting update for hash " << my_hash << endl;
+ dout(1) << "update_group requesting update for hash " << my_hash << endl;
// send it
messenger->send_message(update, client);
} else {
// don't request, someone else already did. just wait!
- dout(10) << "update_group waiting for update for hash " << my_hash << endl;
+ dout(1) << "update_group waiting for update for hash " << my_hash << endl;
}
// wait for reply
// store the new list into group
hash_t my_hash = m->get_user_hash();
- dout(10) << "hande_osd_update_reply for " << my_hash << endl;
+ dout(10) << "handle_osd_update_reply for " << my_hash << endl;
// verify
if (m->verify_list(monmap->get_key()))
dout(1) << "List verification failed" << endl;
// add the new list to our cache
- user_groups[my_hash].set_list(m->get_list());
+ if (g_conf.mds_group == 3) {
+ user_groups[my_hash].set_inode_list(m->get_file_list());
+
+ cout << "Received a group update for " << my_hash << endl;
+ for (list<inodeno_t>::iterator ii = m->get_file_list().begin();
+ ii != m->get_file_list().end();
+ ii++) {
+ cout << my_hash << " contains " << (*ii) << endl;
+ }
+ }
+ else
+ user_groups[my_hash].set_list(m->get_list());
// wait up the waiter(s)
take_waiters(update_waiter_op[my_hash]);
update_group(op->get_client_inst(), my_hash, op);
return;
}
+ }
+ else if (op_capability->get_type() == USER_BATCH) {
+ hash_t my_hash = op_capability->get_file_hash();
+ // do we have group cached? if not, update group
+ // we will lose execution control here! re-gain on reply
+ if (user_groups.count(my_hash) == 0) {
+ outstanding_updates[op->get_reqid()] = write_time_start;
+ update_group(op->get_client_inst(), my_hash, op);
+ return;
+ }
}
// check accesses are right