}
-void MDS::reply_request(MClientRequest *req, int r)
-{
- // send error
- messenger->send_message(new MClientReply(req, r),
+/*******
+ * some generic stuff for finishing off requests
+ */
+
+/** C_MDS_CommitRequest
+ */
+
+class C_MDS_CommitRequest : public Context {
+ MDS *mds;
+ MClientRequest *req;
+ MClientReply *reply;
+ CInode *tracei; // inode to include a trace for
+ LogEvent *event;
+public:
+ C_MDS_CommitRequest(MDS *mds,
+ MClientRequest *req, MClientReply *reply, CInode *tracei,
+ LogEvent *event = 0) {
+ this->mds = mds;
+ this->req = req;
+ this->tracei = tracei;
+ this->reply = reply;
+ this->event = event;
+ }
+ void finish(int r) {
+ if (r == 0) {
+ // success. log and reply.
+ mds->commit_request(req, reply, tracei, event);
+ } else {
+ // failure. set failure code and reply.
+ reply->set_result(r);
+ mds->reply_request(req, reply, tracei);
+ }
+ }
+};
+
+/*
+ * send generic response (just and error code)
+ */
+void MDS::reply_request(MClientRequest *req, int r, CInode *tracei)
+{
+ reply_request(req, new MClientReply(req, r), tracei);
+}
+
+/*
+ * send given reply
+ * include a trace to tracei
+ */
+void MDS::reply_request(MClientRequest *req, MClientReply *reply, CInode *tracei) {
+ // include trace
+ if (tracei)
+ reply->set_trace_dist( tracei, whoami );
+
+ // send reply
+ messenger->send_message(reply,
MSG_ADDR_CLIENT(req->get_client()), 0,
MDS_PORT_SERVER);
-
+
// discard request
mdcache->request_finish(req);
}
+/*
+ * commit event(s) to the metadata log, then reply.
+ * or, be sloppy and do it concurrently (see g_conf)
+ */
+void MDS::commit_request(MClientRequest *req,
+ MClientReply *reply,
+ CInode *tracei,
+ LogEvent *event,
+ LogEvent *event2)
+{
+ if (g_conf.mds_log_before_reply) {
+ // SAFE mode!
+
+ if (event) {
+ // log, then reply
+ // pass event2 as event1 (so we chain together!)
+ /*
+ WARNING: by chaining back to CommitRequest we may get
+ something not quite right if the log commit fails. what
+ happens (to the whole system!) then? ** FIXME **
+ */
+ mdlog->submit_entry(event,
+ new C_MDS_CommitRequest(this, req, reply, tracei, event2));
+ }
+ else {
+ // just reply, no log entry (anymore).
+ reply_request(req, reply, tracei);
+ }
+ } else {
+ // SLOPPY mode!
+
+ // log
+ if (event) mdlog->submit_entry(event, NULL);
+ if (event2) mdlog->submit_entry(event2, NULL);
+
+ // reply
+ reply_request(req, reply, tracei);
+ }
+}
+
+
+/***
+ * process a client request
+ */
void MDS::handle_client_request(MClientRequest *req)
{
dout(10) << "reply to " << *req << " stat " << ref->inode.mtime << " pop " << ref->get_popularity() << endl;
MClientReply *reply = new MClientReply(req);
- reply->set_trace_dist( ref, whoami );
- // FIXME: put inode info in reply...
+ // inode info is in the trace
mdcache->inode_soft_read_finish(ref);
stat_req.hit();
stat_ops++;
- messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0,
- MDS_PORT_SERVER);
-
- mdcache->request_finish(req);
-}
-
-
-
-// INODE UPDATES
-
-// SOFT
-
-class C_MDS_InodeSoftUpdateFinish : public Context {
-public:
- CInode *in;
- MClientRequest *req;
- MDS *mds;
- MClientReply *reply;
- C_MDS_InodeSoftUpdateFinish(MDS *mds, MClientRequest *req, CInode *cur, MClientReply *reply) {
- this->mds = mds;
- this->in = cur;
- this->req = req;
- this->reply = reply;
- }
- virtual void finish(int result) {
- mds->handle_client_inode_soft_update_2(req, reply, in);
- }
-};
-
-void MDS::handle_client_inode_soft_update_2(MClientRequest *req,
- MClientReply *reply,
- CInode *cur)
-{
// reply
- dout(10) << "reply to " << *req << " inode soft update " << *cur << endl;
-
- messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0,
- MDS_PORT_SERVER);
-
- logger->inc("otouch");
- stat_write.hit();
- stat_req.hit();
- stat_ops++;
-
- mdcache->inode_soft_write_finish(cur);
-
- mdcache->request_finish(req);
+ reply_request(req, reply, ref);
}
+// INODE UPDATES
// utime
cur->inode.atime = mtime;
if (cur->is_auth())
cur->mark_dirty();
+
+ mdcache->inode_soft_write_finish(cur);
// init reply
- MClientReply *reply = new MClientReply(req);
- reply->set_trace_dist( cur, whoami );
+ MClientReply *reply = new MClientReply(req, 0);
reply->set_result(0);
- // wait for log to finish
- dout(10) << "log for " << *req << " utime " << cur->inode.mtime << endl;
- mdlog->submit_entry(new EInodeUpdate(cur),
- new C_MDS_InodeSoftUpdateFinish(this, req, cur, reply));
- return;
+ // commit
+ commit_request(req, reply, cur,
+ new EInodeUpdate(cur));
}
// HARD
-class C_MDS_InodeHardUpdateFinish : public Context {
-public:
- CInode *in;
- MClientRequest *req;
- MDS *mds;
- MClientReply *reply;
- C_MDS_InodeHardUpdateFinish(MDS *mds, MClientRequest *req, CInode *cur, MClientReply *reply) {
- this->mds = mds;
- this->in = cur;
- this->req = req;
- this->reply = reply;
- }
- virtual void finish(int result) {
- mds->handle_client_inode_hard_update_2(req, reply, in);
- }
-};
-
-void MDS::handle_client_inode_hard_update_2(MClientRequest *req,
- MClientReply *reply,
- CInode *cur)
-{
- // reply
- dout(10) << "reply to " << *req << " inode hard update " << *cur << endl;
-
- messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0,
- MDS_PORT_SERVER);
-
- logger->inc("otouch");
- stat_write.hit();
- stat_req.hit();
- stat_ops++;
-
- // done
- mdcache->inode_hard_write_finish(cur);
-
- mdcache->request_finish(req);
-}
-
-
// chmod
void MDS::handle_client_chmod(MClientRequest *req,
cur->inode.mode |= (mode & 04777);
cur->mark_dirty();
+ mdcache->inode_hard_write_finish(cur);
+
// start reply
- MClientReply *reply = new MClientReply(req);
- reply->set_trace_dist( cur, whoami );
- reply->set_result(0);
+ MClientReply *reply = new MClientReply(req, 0);
- // wait for log to finish
- dout(10) << "log for " << *req << " chmod" << endl;
- mdlog->submit_entry(new EInodeUpdate(cur),
- new C_MDS_InodeHardUpdateFinish(this, req, cur, reply));
- return;
+ // commit
+ commit_request(req, reply, cur,
+ new EInodeUpdate(cur));
}
// chown
cur->inode.gid = gid;
cur->mark_dirty();
+ mdcache->inode_hard_write_finish(cur);
+
// start reply
- MClientReply *reply = new MClientReply(req);
- reply->set_trace_dist( cur, whoami );
- reply->set_result(0);
+ MClientReply *reply = new MClientReply(req, 0);
- // wait for log to finish
- dout(10) << "log for " << *req << " chown" << endl;
- mdlog->submit_entry(new EInodeUpdate(cur),
- new C_MDS_InodeHardUpdateFinish(this, req, cur, reply));
- return;
+ // commit
+ commit_request(req, reply, cur,
+ new EInodeUpdate(cur));
}
return;
mdcache->inode_hard_read_finish(cur);
+
+ if (!cur->dir->is_complete()) {
+ // fetch
+ dout(10) << " incomplete dir contents for readdir on " << *cur->dir << ", fetching" << endl;
+ mdstore->fetch_dir(cur->dir, new C_MDS_RetryRequest(this, req, cur));
+ return;
+ }
- if (cur->dir->is_complete()) {
- // yay, reply
- MClientReply *reply = new MClientReply(req);
-
- // FIXME: need to sync all inodes in this dir. blech!
+ // yay, reply
+ MClientReply *reply = new MClientReply(req);
+
+ // build dir contents
+ CDir_map_t::iterator it;
+ int numfiles = 0;
+ for (it = cur->dir->begin(); it != cur->dir->end(); it++) {
+ CDentry *dn = it->second;
- // build dir contents
- CDir_map_t::iterator it;
- int numfiles = 0;
- for (it = cur->dir->begin(); it != cur->dir->end(); it++) {
- //string name = it->first;
- CDentry *dn = it->second;
- CInode *in = dn->inode;
- if (!in) continue; // null
- c_inode_info *i = new c_inode_info;
- i->inode = in->inode;
- in->get_dist_spec(i->dist, whoami);
- i->ref_dn = it->first;
- reply->add_dir_item(i);
- numfiles++;
+ // is dentry readable?
+ if (dn->is_xlocked()) {
+ // ***** FIXME *****
+ dout(10) << "warning, returning xlocked dentry, we are technically WRONG" << endl;
}
- dout(10) << "reply to " << *req << " readdir " << numfiles << " files" << endl;
- reply->set_trace_dist( cur, whoami );
- reply->set_result(0);
-
- logger->inc("ordir");
- stat_read.hit();
- stat_req.hit();
- stat_ops++;
-
- //mdcache->path_unpin(trace, req);
+ CInode *in = dn->inode;
+ if (!in) continue; // null dentry?
- messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0,
- MDS_PORT_SERVER);
- mdcache->request_finish(req);
- } else {
- // fetch
- dout(10) << " incomplete dir contents for readdir on " << *cur->dir << ", fetching" << endl;
- mdstore->fetch_dir(cur->dir, new C_MDS_RetryRequest(this, req, cur));
+ // add this item
+ // note: c_inode_info makes note of whether inode data is readable.
+ c_inode_info *i = new c_inode_info(in, whoami, it->first);
+ reply->add_dir_item(i);
+ numfiles++;
}
+
+ dout(10) << "reply to " << *req << " readdir " << numfiles << " files" << endl;
+ reply->set_result(0);
+
+ logger->inc("ordir");
+ stat_read.hit();
+ stat_req.hit();
+ stat_ops++;
+
+ // reply
+ reply_request(req, reply, cur);
}
CInode *newi = mknod(req, ref);
if (!newi) return;
- // log it
- dout(10) << "log for " << *req << " mknod " << newi->ino() << endl;
- mdlog->submit_entry(new EInodeUpdate(newi), // FIXME should be differnet log entry
- NULL);
-
- // reply
- reply_request(req, 0);
- return;
+ // commit
+ commit_request(req, new MClientReply(req, 0), ref,
+ new EInodeUpdate(newi)); // FIXME this is the wrong message
}
// mknod(): used by handle_client_mkdir, handle_client_mknod, which are mostly identical.
// UNLINK
-class C_MDS_Unlink : public Context {
-public:
- MDS *mds;
- CDentry *dn;
- MClientRequest *req;
- C_MDS_Unlink(MDS *mds, CDentry *dn, MClientRequest *req) {
- this->mds = mds;
- this->dn = dn;
- this->req = req;
- }
- virtual void finish(int r) {
- // reply
- MClientReply *reply = new MClientReply(req);
- mds->messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0, MDS_PORT_SERVER);
-
- // done.
- mds->mdcache->request_finish(req);
- }
-};
-
void MDS::handle_client_unlink(MClientRequest *req,
CInode *diri)
{
return;
// it's locked, unlink!
+ MClientReply *reply = new MClientReply(req,0);
mdcache->dentry_unlink(dn,
- new C_MDS_Unlink(this,dn,req));
+ new C_MDS_CommitRequest(this, req, reply, diri,
+ new EInodeUpdate(diri))); // FIXME WRONG EVENT
return;
}
-// RENAME
+
+
+// RENAME
+
class C_MDS_RenameTraverseDst : public Context {
MDS *mds;
MClientRequest *req;
dosrc = !dosrc;
}
- // we're golden (everything is xlocked by us, we rule, etc.)
+ // we're golden.
+ // everything is xlocked by us, we rule, etc.
+ MClientReply *reply = new MClientReply(req, 0);
mdcache->file_rename( srcdn, destdn,
- new C_MDS_RenameFinish(this, req, srcdn->inode),
+ new C_MDS_CommitRequest(this, req, reply, srcdn->inode,
+ new EInodeUpdate(srcdn->inode)), // FIXME WRONG EVENT
everybody );
}
CInode *newi = mknod(req, diri);
if (!newi) return;
+ // set the dir mode
+ newi->inode.mode = req->get_iarg();
+
// make my new inode a dir.
newi->inode.mode |= INODE_MODE_DIR;
newdir->mark_complete();
newdir->mark_dirty();
- // log it
- dout(10) << "log for " << *req << " mkdir " << newi->ino() << endl;
- mdlog->submit_entry(new EInodeUpdate(newi), // FIXME should be differnet log entry
- NULL);
+ // commit
+ commit_request(req, new MClientReply(req, 0), diri,
+ new EInodeUpdate(newi)); // FIXME should be differnet log entry
// schedule a commit for good measure
// NOTE: not strictly necessary.. it's in the log!
- // but, if test crashes we'll be less likely to corrupt osddata/* (in leiu of a real recovery mechanism)
+ // but, if fakemds crashes we'll be less likely to corrupt osddata/* (in leiu of a real recovery mechanism)
mdstore->commit_dir(newdir, NULL);
-
- // reply
- reply_request(req, 0);
return;
}
// set target
newi->symlink = req->get_sarg();
- // log it
- dout(10) << "log for " << *req << " symlink " << newi->ino() << endl;
- mdlog->submit_entry(new EInodeUpdate(newi), // FIXME should be differnet log entry
- NULL);
-
- // reply
- reply_request(req, 0);
- return;
+ // commit
+ commit_request(req, new MClientReply(req, 0), diri,
+ new EInodeUpdate(newi)); // FIXME should be differnet log entry
}
// reply
MClientReply *reply = new MClientReply(req, f->fh); // fh # is return code
- reply->set_trace_dist( cur, whoami );
-
- messenger->send_message(reply,
- MSG_ADDR_CLIENT(req->get_client()), 0,
- MDS_PORT_SERVER);
-
- // discard request
- mdcache->request_finish(req);
+ reply_request(req, reply, cur);
}
// update size, mtime
// XXX
- /*
+ /* FIXME ****
// mark dirty
- cur->mark_dirty();
-
- // log it
- dout(10) << "log for " << *req << " touch " << cur->inode.mtime << endl;
- mdlog->submit_entry(new EInodeUpdate(cur),
- new C_MDS_TouchFinish(this, req, cur, reply));
+ cur->mark_dirty();
*/
// close it.
idalloc->reclaim_id(ID_FH, f->fh);
// ok we're done
- if(f->mode != CFILE_MODE_R) {
+ if (f->mode != CFILE_MODE_R) {
if (!cur->is_auth() &&
!cur->is_open_write()) {
// we were a replica writer!
// XXX what about atime?
-
- // reply
- MClientReply *reply = new MClientReply(req);
- reply->set_trace_dist( cur, whoami );
- //reply->set_iarg( req->get_iarg() );
- messenger->send_message(reply,
- req->get_source(), 0, MDS_PORT_SERVER);
-
- // done
- mdcache->request_finish(req);
+ // commit
+ commit_request(req, new MClientReply(req, 0), cur,
+ new EInodeUpdate(cur)); // FIXME wrong message?
}
*
*/
-typedef struct {
+class c_inode_info {
+ public:
inode_t inode;
- set<int> dist;
string ref_dn; // referring dentry (blank if root)
string symlink; // symlink content (if symlink)
- bool is_sync;
-} c_inode_info;
+
+ bool inode_soft_valid; // true if inode info is valid (ie was readable on mds at the time)
+ bool inode_hard_valid; // true if inode info is valid (ie was readable on mds at the time)
+
+ set<int> dist; // where am i replicated?
+
+ public:
+ c_inode_info() {}
+ c_inode_info(CInode *in, int whoami, string ref_dn) {
+ // inode
+ this->inode = in->inode;
+ this->inode_soft_valid = in->softlock.can_read(in->is_auth());
+ this->inode_hard_valid = in->hardlock.can_read(in->is_auth());
+
+ // symlink content?
+ if (in->is_symlink()) this->symlink = in->symlink;
+
+ // referring dentry?
+ this->ref_dn = ref_dn;
+
+ // replicated where?
+ in->get_dist_spec(this->dist, whoami);
+ }
+
+ void _rope(crope &s) {
+ s.append((char*)&inode, sizeof(inode));
+ s.append((char*)&inode_soft_valid, sizeof(inode_soft_valid));
+ s.append((char*)&inode_hard_valid, sizeof(inode_hard_valid));
+
+ s.append(ref_dn.c_str());
+ s.append((char)0);
+ s.append(symlink.c_str());
+ s.append((char)0);
+
+ // distn
+ int n = dist.size();
+ s.append((char*)&n, sizeof(int));
+ for (set<int>::iterator it = dist.begin();
+ it != dist.end();
+ it++) {
+ int j = *it;
+ s.append((char*)&j,sizeof(int));
+ }
+ }
+
+ void _unrope(crope &s, int& off) {
+ s.copy(off, sizeof(inode), (char*)&inode);
+ off += sizeof(inode);
+ s.copy(off, sizeof(inode_soft_valid), (char*)&inode_soft_valid);
+ off += sizeof(inode_soft_valid);
+ s.copy(off, sizeof(inode_hard_valid), (char*)&inode_hard_valid);
+ off += sizeof(inode_hard_valid);
+
+ ref_dn = s.c_str() + off;
+ off += ref_dn.length() + 1;
+
+ symlink = s.c_str() + off;
+ off += symlink.length() + 1;
+
+ int l;
+ s.copy(off, sizeof(int), (char*)&l);
+ off += sizeof(int);
+ for (int i=0; i<l; i++) {
+ int j;
+ s.copy(off, sizeof(int), (char*)&j);
+ off += sizeof(int);
+ dist.insert(j);
+ }
+ }
+} ;
+
typedef struct {
long pcid;
}
virtual char *get_type_name() { return "creply"; }
-
- crope rope_info(c_inode_info *ci) {
- crope s;
- s.append((char*)&ci->inode, sizeof(inode_t));
- s.append((char*)&ci->is_sync, sizeof(bool));
-
- int n = ci->dist.size();
- s.append((char*)&n, sizeof(int));
- for (set<int>::iterator it = ci->dist.begin();
- it != ci->dist.end();
- it++) {
- int j = *it;
- s.append((char*)&j,sizeof(int));
- }
-
- s.append(ci->ref_dn.c_str());
- s.append((char)0);
- s.append(ci->symlink.c_str());
- s.append((char)0);
- return s;
- }
- int unrope_info(c_inode_info *ci, crope s) {
- s.copy(0, sizeof(inode_t), (char*)(&ci->inode));
- int off = sizeof(inode_t);
- s.copy(off, sizeof(bool), (char*)(&ci->is_sync));
- off += sizeof(bool);
-
- int l;
- s.copy(off, sizeof(int), (char*)&l);
- off += sizeof(int);
- for (int i=0; i<l; i++) {
- int j;
- s.copy(off, sizeof(int), (char*)&j);
- off += sizeof(int);
- ci->dist.insert(j);
- }
-
- ci->ref_dn = s.c_str() + off;
- off += ci->ref_dn.length() + 1;
-
- ci->symlink = s.c_str() + off;
- off += ci->symlink.length() + 1;
-
- return off;
- }
// serialization
virtual void decode_payload(crope& s) {
- crope::iterator sp = s.mutable_begin();
- s.copy(0, sizeof(st), (char*)&st);
- path = s.c_str() + sizeof(st);
- sp += sizeof(st) + path.length() + 1;
+ int off = 0;
+ s.copy(off, sizeof(st), (char*)&st);
+ off += sizeof(st);
+
+ path = s.c_str();
+ off += path.length() + 1;
for (int i=0; i<st.trace_depth; i++) {
c_inode_info *ci = new c_inode_info;
- sp += unrope_info(ci, s.substr(sp, s.end()));
+ ci->_unrope(s, off);
trace.push_back(ci);
}
if (st.dir_size) {
for (int i=0; i<st.dir_size; i++) {
c_inode_info *ci = new c_inode_info;
- sp += unrope_info(ci, s.substr(sp, s.end()));
+ ci->_unrope(s, off);
dir_contents.push_back(ci);
}
}
st.trace_depth = trace.size();
r.append((char*)&st, sizeof(st));
- if (path.length()) r.append(path.c_str());
+ r.append(path.c_str());
r.append((char)0);
vector<c_inode_info*>::iterator it;
for (it = trace.begin(); it != trace.end(); it++)
- r.append(rope_info(*it));
+ (*it)->_rope(r);
for (it = dir_contents.begin(); it != dir_contents.end(); it++)
- r.append(rope_info(*it));
+ (*it)->_rope(r);
}
// builders
void set_trace_dist(CInode *in, int whoami) {
while (in) {
- c_inode_info *info = new c_inode_info;
- info->inode = in->inode;
-
- // symlink content?
- if (in->is_symlink()) info->symlink = in->symlink;
-
- // referring dentry?
+ // add this inode to trace, along with referring dentry name
+ string ref_dn;
CDentry *dn = in->get_parent_dn();
- if (dn) info->ref_dn = dn->get_name();
-
- //info->is_sync = in->is_sync() || in->is_presync();
+ if (dn) ref_dn = dn->get_name();
- // replicated where?
- in->get_dist_spec(info->dist, whoami);
+ c_inode_info *info = new c_inode_info(in, whoami, ref_dn);
- // next!
trace.insert(trace.begin(), info);
in = in->get_parent_inode();
}