const string &dname = linkpath.last_dentry();
vector<CDentry*> linktrace;
CDir *dir = traverse_to_auth_dir(mdr, linktrace, linkpath);
+ if (!dir) return;
dout(7) << "handle_client_link link " << dname << " in " << *dir << endl;
// traverse to link target
{
MClientRequest *req = mdr->client_request();
+ // traverse to path
+ vector<CDentry*> trace;
+ Context *ondelay = new C_MDS_RetryRequest(mdcache, mdr);
+ int r = mdcache->path_traverse(mdr, 0,
+ req->get_filepath(), trace, false,
+ req, ondelay,
+ MDS_TRAVERSE_FORWARD);
+ if (r > 0) return;
+ if (trace.empty()) r = -EINVAL; // can't unlink root
+ if (r < 0) {
+ reply_request(mdr, r);
+ return;
+ }
+
+ CDentry *dn = trace[trace.size()-1];
+ assert(dn);
+
+ // readable?
+ if (!dn->can_read(mdr)) {
+ dout(10) << "waiting on unreadable dentry " << *dn << endl;
+ dn->dir->add_waiter(CDir::WAIT_DNREAD, dn->name, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+
// rmdir or unlink?
bool rmdir = false;
if (req->get_op() == MDS_OP_RMDIR) rmdir = true;
- // get/lock the dentry and path
- CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true); // must exist
- if (!dn) return;
-
if (rmdir) {
dout(7) << "handle_client_rmdir on " << *dn << endl;
} else {
dout(7) << "handle_client_unlink on " << *dn << endl;
}
-
// dn looks ok.
// get/open inode.
}
}
- dout(7) << "handle_client_unlink/rmdir on " << *in << endl;
-
+ dout(7) << "inode is " << *in << endl;
// ok!
if (dn->is_remote() && !dn->inode->is_auth())
// not dir auth?
if (!dir->is_auth()) {
- // hmm. we need it to import. how to make that happen?
- // and wait on it?
- assert(0); // IMPLEMENT ME
+ dout(10) << "_verify_rmdir not auth for " << *dir << ", FIXME BUG" << endl;
+ reply_request(mdr, -ENOTEMPTY);
+ return false;
}
}
void Server::handle_client_rename(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
-
- dout(7) << "handle_client_rename on " << *req << endl;
-
- // traverse to source
- /*
- this is abnoraml, just for rename. since we don't pin source path
- (because we don't want to screw up the lock ordering) the ref inode
- (normally/initially srcdiri) may move, and this may fail.
- */
- /*
- filepath refpath = req->get_filepath();
- string srcname = refpath.last_dentry();
- refpath = refpath.prefixpath(refpath.depth()-1);
-
- dout(7) << "handle_client_rename src traversing to srcdir " << refpath << endl;
- vector<CDentry*> trace;
- int r = mdcache->path_traverse(refpath, trace, true,
- req, new C_MDS_RetryRequest(mdcache, mdr),
- MDS_TRAVERSE_FORWARD);
+ dout(7) << "handle_client_rename " << *req << endl;
+
+ // traverse to dest dir (not dest)
+ // we do this FIRST, because the rename should occur on the
+ // destdn's auth.
+ const filepath &destpath = req->get_sarg();
+ const string &destname = destpath.last_dentry();
+ vector<CDentry*> desttrace;
+ CDir *destdir = traverse_to_auth_dir(mdr, desttrace, destpath);
+ if (!destdir) return; // fw or error out
+ dout(10) << "dest will be " << destname << " in " << *destdir << endl;
+ assert(destdir->is_auth());
+
+ // traverse to src
+ filepath srcpath = req->get_filepath();
+ vector<CDentry*> srctrace;
+ Context *ondelay = new C_MDS_RetryRequest(mdcache, mdr);
+ int r = mdcache->path_traverse(mdr, 0,
+ srcpath, srctrace, false,
+ req, ondelay,
+ MDS_TRAVERSE_DISCOVER);
if (r > 0) return;
- if (r < 0) { // dne or something. got renamed out from under us, probably!
- dout(7) << "traverse r=" << r << endl;
- reply_request(req, r);
+ if (srctrace.empty()) r = -EINVAL; // can't rename root
+ if (r < 0) {
+ reply_request(mdr, r);
return;
}
-
- CInode *srcdiri;
- if (trace.size())
- srcdiri = trace[trace.size()-1]->inode;
- else
- srcdiri = mdcache->get_root();
-
- dout(7) << "handle_client_rename srcdiri is " << *srcdiri << endl;
+ CDentry *srcdn = srctrace[srctrace.size()-1];
+ dout(10) << "srcdn is " << *srcdn << endl;
+ CInode *srci = mdcache->get_dentry_inode(srcdn, mdr);
+ dout(10) << "srci is " << *srci << endl;
- dout(7) << "handle_client_rename srcname is " << srcname << endl;
-
- // make sure parent is a dir?
- if (!srcdiri->is_dir()) {
- dout(7) << "srcdiri not a dir " << *srcdiri << endl;
- reply_request(req, -EINVAL);
+ // -- some sanity checks --
+ // src == dest?
+ if (srcdn->get_dir() == destdir && srcdn->name == destname) {
+ dout(7) << "rename src=dest, noop" << endl;
+ reply_request(mdr, 0);
return;
}
- frag_t srcfg = srcdiri->pick_dirfrag(srcname);
-
- // open dirfrag? is it mine?
- CDir *srcdir = try_open_auth_dir(srcdiri, srcfg, mdr);
- if (!srcdir) return;
- dout(7) << "handle_client_rename srcdir is " << *srcdir << endl;
-
- // ok, done passing buck.
-
- // src dentry
- CDentry *srcdn = srcdir->lookup(srcname);
- if (!_rename_open_dn(srcdir, srcdn, true, mdr))
- return;
-
- // pin src dentry in cache (so it won't expire)
- mdcache->request_pin_dn(req, srcdn);
-
- // find the destination, normalize
- // discover, etc. on the way... just get it on the local node.
- filepath destpath = req->get_sarg();
-
- C_MDS_RenameTraverseDst *onfinish = new C_MDS_RenameTraverseDst(this, req, ref, srcdn, destpath);
- Context *ondelay = new C_MDS_RetryRequest(mdcache, mdr);
-
- mdcache->path_traverse(mdr,
- destpath, onfinish->trace, false,
- req, ondelay,
- MDS_TRAVERSE_DISCOVER,
- onfinish);
- */
-}
-
-void Server::handle_client_rename_2(MDRequest *mdr,
- CDentry *srcdn,
- filepath& destpath,
- vector<CDentry*>& trace,
- int r)
-{
- /*
- MClientRequest *req = mdr->client_request();
+ // dest a child of src?
+ // e.g. mv /usr /usr/foo
+ CDentry *pdn = destdir->inode->parent;
+ while (pdn) {
+ if (pdn == srcdn) {
+ dout(7) << "cannot rename item to be a child of itself" << endl;
+ reply_request(mdr, -EINVAL);
+ return;
+ }
+ pdn = pdn->dir->inode->parent;
+ }
- dout(7) << "handle_client_rename_2 on " << *req << endl;
- dout(12) << " r = " << r << " trace depth " << trace.size()
- << " destpath depth " << destpath.depth() << endl;
- // make sure srcdn is readable, srci is still there.
- if (!_rename_open_dn(srcdn->dir, srcdn, true, req, ref))
+ // identify/create dest dentry
+ CDentry *destdn = destdir->lookup(destname);
+ if (destdn && !destdn->can_read(mdr)) {
+ destdir->add_waiter(CDir::WAIT_DNREAD, destname, new C_MDS_RetryRequest(mdcache, mdr));
return;
- CInode *srci = srcdn->inode;
-
- // note: trace includes root, destpath doesn't (include leading /)
- if (trace.size() && trace[trace.size()-1]->is_null()) {
- dout(10) << "dropping null dentry from tail of trace" << endl;
- trace.pop_back(); // drop it!
- }
-
- // identify dest
- CDentry* lastdn = 0;
- CInode* lastin = 0;
- if (trace.size()) {
- lastdn = trace[trace.size()-1];
- dout(10) << "handle_client_rename_2 traced to " << *lastdn
- << ", trace size = " << trace.size()
- << ", destpath = " << destpath.depth() << endl;
- lastin = mdcache->get_dentry_inode(lastdn, req, ref);
- if (!lastin) return;
- } else {
- dout(10) << "handle_client_rename_2 traced to root" << endl;
- lastin = mdcache->get_root();
}
- assert(lastin);
-
- // make sure i can open the dir?
- frag_t dfg;
- CDir* destdir = 0;
- string destname;
- CDentry *destdn = 0;
- if (trace.size() == destpath.depth()) {
+ CInode *oldin = 0;
+ if (destdn && !destdn->is_null()) {
+ dout(10) << "dest dn exists " << *destdn << endl;
+ oldin = mdcache->get_dentry_inode(destdn, mdr);
+ if (!oldin) return;
+ dout(10) << "oldin " << *oldin << endl;
+
// mv /some/thing /to/some/existing_other_thing
- if (lastin->is_dir() && !srci->is_dir()) {
- reply_request(req, -EISDIR);
+ if (oldin->is_dir() && !srci->is_dir()) {
+ reply_request(mdr, -EISDIR);
return;
}
- if (!lastin->is_dir() && srci->is_dir()) {
- reply_request(req, -ENOTDIR);
+ if (!oldin->is_dir() && srci->is_dir()) {
+ reply_request(mdr, -ENOTDIR);
return;
}
- // they are both files or both dirs.
- destdn = lastdn;
- destname = destdn->name;
- destdir = destdn->dir;
-
- if (lastin->is_dir()) {
- // is it empty?
- if (!_verify_rmdir(req, ref, lastin))
- return;
- }
- }
- else if (trace.size() == destpath.depth()-1) {
- if (!lastin->is_dir()) {
- // mv /some/thing /to/some/existing_file/blah
- dout(7) << "not a dir " << *lastin << endl;
- reply_request(req, -ENOTDIR);
+ // non-empty dir?
+ if (oldin->is_dir() && !_verify_rmdir(mdr, oldin))
return;
- }
-
- // mv /some/thing /to/some/thing_that_dne
- destname = destpath.last_dentry(); // "thing_that_dne"
- dfg = lastin->pick_dirfrag(destname);
- destdir = try_open_dir(lastin, dfg, req, ref); // /to/some
- if (!destdir) return;
}
- else {
- assert(trace.size() < destpath.depth()-1);
-
- // check traverse return value
- if (r > 0) return; // discover, readdir, etc.
-
- assert(r < 0 || trace.size() == 0); // musta been an error
-
- dout(7) << " rename dest " << destpath << " dne" << endl;
- reply_request(req, -EINVAL);
- return;
+ if (!destdn) {
+ // mv /some/thing /to/some/non_existent_name
+ destdn = prepare_null_dentry(mdr, destdir, destname);
+ if (!destdn) return;
}
- string srcpath = req->get_path();
- dout(10) << "handle_client_rename_2 srcpath " << srcpath << endl;
- dout(10) << "handle_client_rename_2 destpath " << destpath << endl;
+ dout(10) << "destdn " << *destdn << endl;
- // src == dest?
- if (srcdn->get_dir() == destdir && srcdn->name == destname) {
- dout(7) << "rename src=dest, noop" << endl;
- reply_request(req, 0);
- return;
- }
- // dest a child of src?
- // e.g. mv /usr /usr/foo
- CDentry *pdn = destdir->inode->parent;
- while (pdn) {
- if (pdn == srcdn) {
- reply_request(req, -EINVAL);
- return;
- }
- pdn = pdn->dir->inode->parent;
- }
+ // -- locks --
+ set<CDentry*> dentry_rdlocks;
+ set<CDentry*> dentry_xlocks;
+ set<CInode*> inode_hard_rdlocks;
+ set<CInode*> inode_hard_xlocks;
- // does destination exist? (is this an overwrite?)
- CInode *oldin = 0;
- if (destdn) {
- if (!destdn->is_null()) {
- oldin = mdcache->get_dentry_inode(destdn, req, ref);
- if (!oldin) return;
- dout(7) << "dest dn exists " << *destdn << " " << *oldin << endl;
- } else {
- dout(7) << "dest dn exists " << *destdn << endl;
- }
- } else {
- dout(7) << "dest dn dne (yet)" << endl;
- }
-
- // local or remote?
- dout(7) << "handle_client_rename_2 destname " << destname
- << " destdir " << *destdir
- << endl;
+ // rdlock sourcedir path, xlock src dentry
+ for (unsigned i=0; i<srctrace.size()-1; i++)
+ dentry_rdlocks.insert(srctrace[i]);
+ dentry_xlocks.insert(srcdn);
- //
- if (!srcdn->is_auth() || !destdir->is_auth() ||
- (oldin && !oldin->is_auth())) {
- dout(7) << "rename has remote dest, or overwrites remote inode" << endl;
- dout(7) << "FOREIGN RENAME" << endl;
-
- reply_request(req, -EINVAL); // for now!
+ // rdlock destdir path, xlock dest dentry
+ for (unsigned i=0; i<desttrace.size(); i++)
+ dentry_rdlocks.insert(desttrace[i]);
+ dentry_xlocks.insert(destdn);
- } else {
- dout(7) << "rename is local" << endl;
+ // xlock oldin
+ if (oldin) inode_hard_xlocks.insert(oldin);
+
+ if (!mds->locker->acquire_locks(mdr,
+ dentry_rdlocks, dentry_xlocks,
+ inode_hard_rdlocks, inode_hard_xlocks))
+ return;
- _rename_local(req, ref,
- srcdn,
- destdir, destdn, destname);
+
+ // ok go!
+ if (srcdn->is_auth() && destdn->is_auth())
+ _rename_local(mdr, srcdn, destdn);
+ else {
+ // _rename_remote(mdr, srcdn, destdn);
+ reply_request(mdr, -EXDEV);
+ return;
}
- */
}
void Server::_rename_local(MDRequest *mdr,
CDentry *srcdn,
- CDir *destdir,
- CDentry *destdn,
- const string& destname)
+ CDentry *destdn)
{
- /*
- dout(10) << "_rename_local " << *srcdn << " to " << destname << " in " << *destdir << endl;
-
- // make sure target (possibly null) dentry exists
- int r = prepare_null_dentry(mdr,
- destdir->inode, destname,
- &destdir, &destdn, true);
- if (!r) return;
- dout(10) << "destdn " << *destdn << endl;
-
- // auth pins
- if (!mdcache->request_auth_pinned(req, srcdn->get_dir()) &&
- !srcdn->get_dir()->can_auth_pin()) {
- srcdn->get_dir()->add_waiter(CDir::WAIT_AUTHPINNABLE,
- new C_MDS_RetryRequest(mds, req, ref));
- return;
- }
- if (!mdcache->request_auth_pinned(req, destdn->get_dir()) &&
- !destdn->get_dir()->can_auth_pin()) {
- destdn->get_dir()->add_waiter(CDir::WAIT_AUTHPINNABLE,
- new C_MDS_RetryRequest(mds, req, ref));
- return;
- }
- if (destdn->inode &&
- !mdcache->request_auth_pinned(req, destdn->inode) &&
- !destdn->inode->can_auth_pin()) {
- destdn->inode->add_waiter(CInode::WAIT_AUTHPINNABLE,
- new C_MDS_RetryRequest(mds, req, ref));
- return;
- }
- mdcache->request_auth_pin(req, srcdn->dir);
- mdcache->request_auth_pin(req, destdn->dir);
- if (destdn->inode)
- mdcache->request_auth_pin(req, destdn->inode);
-
- // locks
- bool dosrc = *srcdn < *destdn;
- for (int i=0; i<2; i++) {
- if (dosrc) {
- if (!mds->locker->dentry_xlock_start(srcdn, req, ref))
- return;
- } else {
- if (!mds->locker->dentry_xlock_start(destdn, req, ref))
- return;
- }
- dosrc = !dosrc;
- }
- if (destdn->inode &&
- !mds->locker->inode_hard_xlock_start(destdn->inode, req, ref))
- return;
-
-
- // verify rmdir?
- if (destdn->inode && destdn->inode->is_dir() &&
- !_verify_rmdir(req, ref, destdn->inode))
- return;
+ dout(10) << "_rename_local " << *srcdn << " to " << *destdn << endl;
// let's go.
EUpdate *le = new EUpdate("rename_local");
- le->metablob.add_client_req(req->get_reqid());
+ le->metablob.add_client_req(mdr->reqid);
CDentry *straydn = 0;
inode_t *pi = 0;
pi->version = ipv;
}
- C_MDS_rename_local_finish *fin = new C_MDS_rename_local_finish(mds, req,
+ C_MDS_rename_local_finish *fin = new C_MDS_rename_local_finish(mds, mdr,
srcdn, destdn, straydn,
ipv, pi ? pi->ctime:0);
mdlog->submit_entry(le);
mdlog->wait_for_sync(fin);
}
- */
}
time_t ictime,
version_t atid1, version_t atid2)
{
- /*
+ MClientRequest *req = mdr->client_request();
dout(10) << "_rename_local_finish " << *req << endl;
CInode *oldin = destdn->inode;
// share news with replicas
// ***
- // unlock
- mds->locker->dentry_xlock_finish(srcdn);
- mds->locker->dentry_xlock_finish(destdn);
- if (oldin)
- mds->locker->inode_hard_xlock_finish(oldin);
-
// reply
MClientReply *reply = new MClientReply(req, 0);
- reply_request(req, reply, destdn->dir->get_inode()); // FIXME: imprecise ref
+ reply_request(mdr, reply, destdn->dir->get_inode()); // FIXME: imprecise ref
// clean up?
if (straydn)
mdcache->eval_stray(straydn);
- */
}