#include "messages/MLock.h"
#include "messages/MDentryUnlink.h"
+
#include "messages/MRenameNotify.h"
+#include "messages/MRename.h"
+#include "messages/MRenameAck.h"
+#include "messages/MRenameReq.h"
#include "messages/MClientRequest.h"
{
mds = m;
root = NULL;
- lru = new LRU();
- lru->lru_set_max(g_conf.mdcache_size);
- lru->lru_set_midpoint(g_conf.mdcache_mid);
+ lru.lru_set_max(g_conf.mdcache_size);
+ lru.lru_set_midpoint(g_conf.mdcache_mid);
}
MDCache::~MDCache()
{
- if (lru) { delete lru; lru = NULL; }
}
bool MDCache::shutdown()
{
- if (lru->lru_get_size() > 0) {
+ if (lru.lru_get_size() > 0) {
dout(7) << "WARNING: mdcache shutodwn with non-empty cache" << endl;
show_cache();
show_imports();
void MDCache::add_inode(CInode *in)
{
// add to lru, inode map
- assert(inode_map.size() == lru->lru_get_size());
- lru->lru_insert_mid(in);
+ assert(inode_map.size() == lru.lru_get_size());
+ lru.lru_insert_mid(in);
assert(inode_map.count(in->ino()) == 0); // should be no dup inos!
inode_map[ in->ino() ] = in;
- assert(inode_map.size() == lru->lru_get_size());
+ assert(inode_map.size() == lru.lru_get_size());
}
void MDCache::remove_inode(CInode *o)
dn->dir->unlink_inode(dn); // leave dentry
}
inode_map.erase(o->ino()); // remove from map
- lru->lru_remove(o); // remove from lru
+ lru.lru_remove(o); // remove from lru
}
bool MDCache::trim(__int32_t max) {
if (max < 0) {
- max = lru->lru_get_max();
+ max = lru.lru_get_max();
if (!max) return false;
}
map<int, MCacheExpire*> expiremap;
- while (lru->lru_get_size() > max) {
- CInode *in = (CInode*)lru->lru_expire();
+ while (lru.lru_get_size() > max) {
+ CInode *in = (CInode*)lru.lru_expire();
if (!in) break; //return false;
if (in->dir) {
dout(7) << "log is empty. flushing cache" << endl;
trim(0);
- dout(7) << "cache size now " << lru->lru_get_size() << endl;
+ dout(7) << "cache size now " << lru.lru_get_size() << endl;
// send all imports back to 0.
if (mds->get_nodeid() != 0) {
}
// shut down root?
- if (lru->lru_get_size() == 1) {
+ if (lru.lru_get_size() == 1) {
if (root &&
root->dir &&
root->dir->is_import() &&
}
// sanity
- assert(inode_map.size() == lru->lru_get_size());
+ assert(inode_map.size() == lru.lru_get_size());
// done?
- if (lru->lru_get_size() == 0) {
+ if (lru.lru_get_size() == 0) {
if (mds->get_nodeid() != 0) {
dout(7) << "done, sending shutdown_finish" << endl;
mds->messenger->send_message(new MGenericMessage(MSG_MDS_SHUTDOWNFINISH),
}
return true;
} else {
- dout(7) << "there's still stuff in the cache: " << lru->lru_get_size() << endl;
+ dout(7) << "there's still stuff in the cache: " << lru.lru_get_size() << endl;
show_cache();
dump();
}
case MSG_MDS_RENAMENOTIFY:
handle_rename_notify((MRenameNotify*)m);
break;
+ case MSG_MDS_RENAME:
+ handle_rename((MRename*)m);
+ break;
+ case MSG_MDS_RENAMEREQ:
+ handle_rename_req((MRenameReq*)m);
+ break;
+ case MSG_MDS_RENAMEACK:
+ handle_rename_ack((MRenameAck*)m);
+ break;
// import
// dentry
CDentry *dn = cur->dir->lookup(path[depth]);
+
+ // xlocked by me?
+ if (dn && !dn->inode && dn->xlockedby == req) { // hack?
+ trace.push_back(dn);
+ break; // done!
+ }
+
if (dn && dn->inode) {
// have it. locked?
if (!noperm && dn->is_xlockedbyother(req)) {
// directory isn't complete; reload
dout(7) << "traverse: incomplete dir contents for " << *cur << ", fetching" << endl;
- lru->lru_touch(cur); // touch readdiree
+ lru.lru_touch(cur); // touch readdiree
mds->mdstore->fetch_dir(cur->dir, ondelay);
mds->logger->inc("cmiss");
} else {
dout(7) << "traverse: discover on " << *cur << " for " << want.get_path() << " to mds" << dauth << endl;
- lru->lru_touch(cur); // touch discoveree
+ lru.lru_touch(cur); // touch discoveree
mds->messenger->send_message(new MDiscover(mds->get_nodeid(),
cur->ino(),
CDir *srcdir = srcdn->dir;
string srcname = srcdn->name;
+ bool srcauth = srcdir->dentry_authority(srcdn->name) == mds->get_nodeid();
CDir *destdir = destdn->dir;
string destname = destdn->name;
+ bool destauth = destdir->dentry_authority(destdn->name) == mds->get_nodeid();
CInode *in = srcdn->inode;
Message *req = srcdn->xlockedby;
+ // foreign rename?
+ if (srcauth && !destauth) {
+ dout(7) << "file_rename src auth, not dest auth. sending MRename" << endl;
+
+ file_rename_foreign_src(srcdn, destdn,
+ mds->get_nodeid()); // i'm the initiator
+
+ // set waiter on the inode (is this the best place?)
+ in->add_waiter(CINODE_WAIT_RENAME, c);
+ return;
+ }
+ else if (!srcauth) {
+ if (destauth) {
+ dout(7) << "file_rename dest auth, not src auth. sending MRenameReq" << endl;
+ } else {
+ dout(7) << "file_rename neither src auth nor dest auth. sending MRenameReq" << endl;
+ }
+
+ MRenameReq *m = new MRenameReq(mds->get_nodeid(), // i'm the initiator
+ srcdir->ino(), srcname, destdir->ino(), destname);
+ int srcauth = srcdir->dentry_authority(srcdn->name);
+ mds->messenger->send_message(m,
+ MSG_ADDR_MDS(srcauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+
+ // set waiter on the inode (is this the best place?)
+ in->add_waiter(CINODE_WAIT_RENAME, c);
+ return;
+ }
+
+ assert(srcauth && destauth);
+ dout(7) << "file_rename src and dest auth, renaming locally (easy!)" << endl;
+
// update our cache
rename_file(srcdn, destdn);
-
+
// mark dentries dirty
srcdn->mark_dirty();
destdn->mark_dirty();
// update imports/exports?
if (in->is_dir() && in->dir)
fix_renamed_dir(srcdir, in, destdir, false); // auth didnt change
+
// tell replicas (no need to wait for ack) to do the same, and un-xlock.
// make list
destdir->take_waiting(CDIR_WAIT_DNREAD, destname, mds->finished_queue);
}
-void MDCache::handle_rename_notify(MRenameNotify*m)
+void MDCache::file_rename_foreign_src(CDentry *srcdn, CDentry *destdn, int initiator)
+{
+ dout(7) << "file_rename_foreign_src " << *srcdn << " to " << *destdn << endl;
+
+ CDir *srcdir = srcdn->dir;
+ CDir *destdir = destdn->dir;
+ string srcname = srcdn->name;
+
+ // (we're basically exporting this inode)
+ CInode *in = srcdn->inode;
+ assert(in);
+
+ // encode and export inode state
+ crope inode_state;
+ encode_export_inode(in, inode_state);
+
+ MRename *m = new MRename(initiator,
+ srcdir->ino(), srcdn->name, destdir->ino(), destdn->name,
+ inode_state);
+ int destauth = destdir->dentry_authority(destdn->name);
+ mds->messenger->send_message(m,
+ MSG_ADDR_MDS(destauth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+
+ // update our cache
+ rename_file(srcdn, destdn);
+ srcdn->mark_dirty();
+
+ // update imports/exports?
+ if (in->is_dir() && in->dir)
+ fix_renamed_dir(srcdir, in, destdir, false); // auth didnt change
+
+ // drop xlock on src
+ dentry_xlock_finish(srcdn);
+ srcdir->take_waiting(CDIR_WAIT_ANY, srcname, mds->finished_queue);
+}
+
+void MDCache::handle_rename_req(MRenameReq *m)
+{
+ CInode *srcdiri = get_inode(m->get_srcdirino());
+ CDir *srcdir = srcdiri->dir;
+ CDentry *srcdn = srcdir->lookup(m->get_srcname());
+ assert(srcdn);
+
+ CInode *destdiri = get_inode(m->get_destdirino());
+ CDir *destdir = destdiri->dir;
+ CDentry *destdn = destdir->lookup(m->get_destname());
+ assert(destdn);
+
+ file_rename_foreign_src(srcdn, destdn, m->get_initiator());
+ delete m;
+}
+
+void MDCache::handle_rename(MRename *m)
+{
+ CInode *srcdiri = get_inode(m->get_srcdirino());
+ CDir *srcdir = srcdiri->dir;
+ CDentry *srcdn = srcdir->lookup(m->get_srcname());
+ assert(srcdn);
+
+ CInode *destdiri = get_inode(m->get_destdirino());
+ CDir *destdir = destdiri->dir;
+ CDentry *destdn = destdir->lookup(m->get_destname());
+ assert(destdn);
+ string destname = destdn->name;
+
+ dout(7) << "handle_rename " << *srcdn << " to " << *destdn << endl;
+
+ // decode + import inode (into _old_ location to start)
+ int off = 0;
+ decode_import_inode(srcdn, m->get_inode_state(), off, m->get_source());
+
+ CInode *in = srcdn->inode;
+ assert(in);
+
+ // rename it
+ rename_file(srcdn, destdn);
+ destdn->mark_dirty();
+
+ // update imports/exports?
+ if (in->is_dir() && in->dir)
+ fix_renamed_dir(srcdir, in, destdir, false); // auth didnt change
+
+ // send notifies
+ // including src. but not me, or initiator.
+ set<int> who;
+ for (int i=0; i<mds->get_cluster()->get_num_mds(); i++)
+ if (i != mds->get_nodeid() &&
+ i != m->get_initiator()) who.insert(i);
+
+ for (set<int>::iterator it = who.begin();
+ it != who.end();
+ it++) {
+ mds->messenger->send_message(new MRenameNotify(srcdir->ino(),
+ srcdn->name,
+ destdir->ino(),
+ destdn->name),
+ MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE);
+ }
+
+ // ok, tell the initiator
+ if (m->get_initiator() == mds->get_nodeid()) {
+ // it's me!
+ in->take_waiting(CINODE_WAIT_RENAME, mds->finished_queue);
+ } else {
+ MRenameAck *ack = new MRenameAck(srcdir->ino(), srcdn->name, destdir->ino(), destdn->name);
+ mds->messenger->send_message(ack,
+ MSG_ADDR_MDS(m->get_initiator()), MDS_PORT_CACHE, MDS_PORT_CACHE);
+ }
+
+
+ // drop xlock on dst
+ dentry_xlock_finish(destdn);
+ destdir->take_waiting(CDIR_WAIT_DNREAD, destname, mds->finished_queue);
+
+ delete m;
+}
+
+
+void MDCache::handle_rename_ack(MRenameAck *m)
+{
+ CInode *destdiri = get_inode(m->get_destdirino());
+ CDir *destdir = destdiri->dir;
+ CDentry *destdn = destdir->lookup(m->get_destname());
+ assert(destdn);
+ CInode *in = destdn->inode;
+
+ dout(7) << "handle_rename_ack on " << *in << endl;
+
+ // all done!
+ in->take_waiting(CINODE_WAIT_RENAME, mds->finished_queue);
+ delete m;
+}
+
+
+void MDCache::handle_rename_notify(MRenameNotify *m)
{
dout(7) << "handle_rename_notify dir " << m->get_srcdirino() << " dn " << m->get_srcname() << " to dir " << m->get_destdirino() << " dname " << m->get_destname() << endl;
}
void finish(int r) {
+ cout << "xlockrequest->finish r = " << r << endl;
if (r == 0) {
CDentry *dn = dir->lookup(dname);
if (dn && dn->xlockedby == 0) {
+ cout << "xlock request success, now xlocked by " << req << endl;
// success
dn->xlockedby = req; // our request was the winner
// remember!
mdc->active_requests[req].foreign_xlocks.insert(dn);
- return;
}
}
// retry request (or whatever)
+ cout << "doing finish on " << finisher << endl;
finisher->finish(0);
+ delete finisher;
}
};
dout(10) << "dentry_xlock_request on dn " << dname << " create=" << create << " in " << *dir << endl;
// send request
int dauth = dir->dentry_authority(dname);
- MLock *m = new MLock(create ? LOCK_AC_REQXLOCKC:LOCK_AC_REQXLOCK, dauth);
+ MLock *m = new MLock(create ? LOCK_AC_REQXLOCKC:LOCK_AC_REQXLOCK, mds->get_nodeid());
m->set_dn(dir->ino(), dname);
mds->messenger->send_message(m,
MSG_ADDR_MDS(dauth), MDS_PORT_CACHE,
// normally we have it always
if (diri && dir) {
int dauth = dir->dentry_authority(dname);
- assert(dauth == mds->get_nodeid() ||
- dir->is_proxy());
+ assert(dauth == mds->get_nodeid() || dir->is_proxy() || // mine or proxy,
+ m->get_action() == LOCK_AC_REQXLOCKACK || // or we did a REQXLOCK and this is our ack/nak
+ m->get_action() == LOCK_AC_REQXLOCKNAK);
if (dir->is_proxy()) {
}
+/** encode_export_inode
+ * update our local state for this inode to export.
+ * encode relevant state to be sent over the wire.
+ * used by: export_dir_walk, file_rename (if foreign)
+ */
+void MDCache::encode_export_inode(CInode *in, crope& state_rope)
+{
+ in->version++; // so local log entries are ignored, etc.
+
+ // relax locks
+ if (!in->is_cached_by_anyone())
+ in->replicate_relax_locks();
+
+ // replica_writers?
+ if (in->is_open_write())
+ in->add_replica_writer(mds->get_nodeid()); // i am now a replica writer!
+
+ // add inode
+ CInodeExport istate( in );
+ state_rope.append( istate._rope() );
+
+ // we're export this inode; fix inode state
+ dout(7) << "encode_export_inode " << *in << endl;
+
+ if (in->is_dirty()) in->mark_clean();
+
+ // clear/unpin cached_by (we're no longer the authority)
+ in->cached_by_clear();
+
+ // don't need to know this anymore
+ in->clear_replica_writers();
+
+ // twiddle lock states
+ in->softlock.twiddle_export();
+ in->hardlock.twiddle_export();
+
+ // mark auth
+ assert(in->is_auth());
+ in->set_auth(false);
+ in->replica_nonce = CINODE_EXPORT_NONCE;
+
+ // *** other state too?
+}
+
void MDCache::export_dir_walk(MExportDir *req,
C_MDS_ExportFinish *fin,
// -- inode
dir_rope.append((char)'I'); // inode dentry
- in->version++; // so log entries are ignored, etc.
+ encode_export_inode(in, dir_rope); // encode, and (update state for) export
- // relax locks
- if (!in->is_cached_by_anyone())
- in->replicate_relax_locks();
-
- // replica_writers?
- if (in->is_open_write())
- in->add_replica_writer(mds->get_nodeid()); // i am now a replica writer!
-
- // add inode
- CInodeExport istate( in );
- dir_rope.append( istate._rope() );
-
- if (in->is_dir()) {
-
- // recurse?
- if (in->dir) {
- if (in->dir->is_auth()) {
- // nested subdir
- assert(in->dir->dir_auth == CDIR_AUTH_PARENT);
- subdirs.push_back(in->dir); // it's ours, recurse (later)
-
- } else {
- // nested export
- assert(in->dir->dir_auth >= 0);
- dout(7) << " encountered nested export " << *in->dir << " dir_auth " << in->dir->dir_auth << "; removing from exports" << endl;
- assert(exports.count(in->dir) == 1);
- exports.erase(in->dir); // discard nested export (nested_exports updated above)
-
- in->dir->state_clear(CDIR_STATE_EXPORT);
- in->dir->put(CDIR_PIN_EXPORT);
-
- // simplify dir_auth?
- if (in->dir->dir_auth == newauth)
- in->dir->dir_auth = CDIR_AUTH_PARENT;
- }
+ // directory?
+ if (in->is_dir() && in->dir) {
+ if (in->dir->is_auth()) {
+ // nested subdir
+ assert(in->dir->dir_auth == CDIR_AUTH_PARENT);
+ subdirs.push_back(in->dir); // it's ours, recurse (later)
+
+ } else {
+ // nested export
+ assert(in->dir->dir_auth >= 0);
+ dout(7) << " encountered nested export " << *in->dir << " dir_auth " << in->dir->dir_auth << "; removing from exports" << endl;
+ assert(exports.count(in->dir) == 1);
+ exports.erase(in->dir); // discard nested export (nested_exports updated above)
+
+ in->dir->state_clear(CDIR_STATE_EXPORT);
+ in->dir->put(CDIR_PIN_EXPORT);
+
+ // simplify dir_auth?
+ if (in->dir->dir_auth == newauth)
+ in->dir->dir_auth = CDIR_AUTH_PARENT;
}
}
- // we're export this inode; fix inode state
- dout(7) << "export_dir_walk exporting " << *in << endl;
-
- if (in->is_dirty()) in->mark_clean();
-
- // clear/unpin cached_by (we're no longer the authority)
- in->cached_by_clear();
-
- // don't need to know this anymore
- in->clear_replica_writers();
-
- // twiddle lock states
- in->softlock.twiddle_export();
- in->hardlock.twiddle_export();
-
- // mark auth
- assert(in->is_auth());
- in->set_auth(false);
- in->replica_nonce = CINODE_EXPORT_NONCE;
-
// add to proxy
export_proxy_inos[basedir].insert(in->ino());
in->state_set(CINODE_STATE_PROXY);
in->get(CINODE_PIN_PROXY);
-
- // *** other state too?
-
// waiters
list<Context*> waiters;
in->take_waiting(CINODE_WAIT_ANY, waiters);
}
+void MDCache::decode_import_inode(CDentry *dn, crope& r, int& off, int oldauth)
+{
+
+ CInodeExport istate;
+ off = istate._unrope(r, off);
+ dout(15) << "got a cinodeexport " << endl;
+
+ bool added = false;
+ CInode *in = get_inode(istate.get_ino());
+ if (!in) {
+ in = new CInode;
+ added = true;
+ } else {
+ in->set_auth(true);
+ }
+
+ // state
+ istate.update_inode(in);
+
+ // link
+ if (added) {
+ add_inode(in);
+ dn->dir->link_inode(dn, in);
+ dout(10) << "added " << *in << endl;
+ } else {
+ dout(10) << " had " << *in << endl;
+
+ if (in->is_replica_writer(mds->get_nodeid()))
+ in->remove_replica_writer(mds->get_nodeid());
+ }
+
+ // cached_by
+ assert(!in->is_cached_by(oldauth));
+ in->cached_by_add( oldauth, CINODE_EXPORT_NONCE );
+ if (in->is_cached_by(mds->get_nodeid()))
+ in->cached_by_remove(mds->get_nodeid());
+
+ /* don't do this
+ if (!in->hardlock.is_stable() &&
+ in->hardlock.is_gathering(mds->get_nodeid())) {
+ in->hardlock.gather_set.erase(mds->get_nodeid());
+ if (in->hardlock.gather_set.size() == 0)
+ inode_hard_eval(in);
+ }
+ if (!in->softlock.is_stable() &&
+ in->softlock.is_gathering(mds->get_nodeid())) {
+ in->softlock.gather_set.erase(mds->get_nodeid());
+ if (in->softlock.gather_set.size() == 0)
+ inode_soft_eval(in);
+ }
+ */
+
+ // other
+ if (in->is_dirty()) {
+ dout(10) << "logging dirty import " << *in << endl;
+ mds->mdlog->submit_entry(new EInodeUpdate(in),
+ NULL); // FIXME pay attention to completion?
+ }
+}
+
void MDCache::import_dir_block(crope& r,
int& off,
}
else if (icode == 'I') {
// inode
- CInodeExport istate;
- off = istate._unrope(r, off);
- dout(15) << "got a cinodeexport " << endl;
-
- bool added = false;
- CInode *in = get_inode(istate.get_ino());
- if (!in) {
- in = new CInode;
- added = true;
- } else {
- in->set_auth(true);
- }
-
- // state
- istate.update_inode(in);
-
- // link
- if (added) {
- add_inode(in);
- dn->dir->link_inode(dn, in);
- dout(10) << "added " << *in << endl;
- } else {
- dout(10) << " had " << *in << endl;
-
- if (in->is_replica_writer(mds->get_nodeid()))
- in->remove_replica_writer(mds->get_nodeid());
- }
-
- // cached_by
- assert(!in->is_cached_by(oldauth));
- in->cached_by_add( oldauth, CINODE_EXPORT_NONCE );
- if (in->is_cached_by(mds->get_nodeid()))
- in->cached_by_remove(mds->get_nodeid());
-
- /* don't do this
- if (!in->hardlock.is_stable() &&
- in->hardlock.is_gathering(mds->get_nodeid())) {
- in->hardlock.gather_set.erase(mds->get_nodeid());
- if (in->hardlock.gather_set.size() == 0)
- inode_hard_eval(in);
- }
- if (!in->softlock.is_stable() &&
- in->softlock.is_gathering(mds->get_nodeid())) {
- in->softlock.gather_set.erase(mds->get_nodeid());
- if (in->softlock.gather_set.size() == 0)
- inode_soft_eval(in);
- }
- */
-
- // other
- if (in->is_dirty()) {
- dout(10) << "logging dirty import " << *in << endl;
- mds->mdlog->submit_entry(new EInodeUpdate(in),
- NULL); // FIXME pay attention to completion?
- }
+ decode_import_inode(dn, r, off, oldauth);
}
// mark dentry dirty? (only _after_ we link the inode)
}
//dump();
- lru->lru_status();
+ lru.lru_status();
return cur;
}