} else if (mdr->more()->waiting_on_slave.count(*p)) {
dout(10) << " already waiting on witness mds." << *p << dendl;
} else {
- if (!_rmdir_prepare_witness(mdr, *p, dn, straydn))
+ if (!_rmdir_prepare_witness(mdr, *p, trace, straydn))
return;
}
}
}
}
-bool Server::_rmdir_prepare_witness(MDRequestRef& mdr, mds_rank_t who, CDentry *dn, CDentry *straydn)
+bool Server::_rmdir_prepare_witness(MDRequestRef& mdr, mds_rank_t who, vector<CDentry*>& trace, CDentry *straydn)
{
if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) {
dout(10) << "_rmdir_prepare_witness mds." << who << " is not active" << dendl;
dout(10) << "_rmdir_prepare_witness mds." << who << dendl;
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_RMDIRPREP);
- dn->make_path(req->srcdnpath);
- straydn->make_path(req->destdnpath);
- req->op_stamp = mdr->get_op_stamp();
-
+ req->srcdnpath = filepath(trace.front()->get_dir()->ino());
+ for (auto dn : trace)
+ req->srcdnpath.push_dentry(dn->name);
mdcache->replicate_stray(straydn, who, req->stray);
-
+
+ req->op_stamp = mdr->get_op_stamp();
mds->send_message_mds(req, who);
assert(mdr->more()->waiting_on_slave.count(who) == 0);
if (destpath.get_ino() != srcpath.get_ino() &&
!(req->get_source().is_mds() &&
MDS_INO_IS_MDSDIR(srcpath.get_ino()))) { // <-- mds 'rename' out of stray dir is ok!
- // do traces share a dentry?
- CDentry *common = 0;
- for (unsigned i=0; i < srctrace.size(); i++) {
- for (unsigned j=0; j < desttrace.size(); j++) {
- if (srctrace[i] == desttrace[j]) {
- common = srctrace[i];
- break;
- }
- }
- if (common)
- break;
+ CInode *srcbase = srctrace[0]->get_dir()->get_inode();
+ CInode *destbase = desttrace[0]->get_dir()->get_inode();
+ // ok, extend srctrace toward root until it is an ancestor of desttrace.
+ while (srcbase != destbase &&
+ !srcbase->is_projected_ancestor_of(destbase)) {
+ CDentry *pdn = srcbase->get_projected_parent_dn();
+ srctrace.insert(srctrace.begin(), pdn);
+ dout(10) << "rename prepending srctrace with " << *pdn << dendl;
+ srcbase = pdn->get_dir()->get_inode();
}
- if (common) {
- dout(10) << "rename src and dest traces share common dentry " << *common << dendl;
- } else {
- CInode *srcbase = srctrace[0]->get_dir()->get_inode();
- CInode *destbase = destdir->get_inode();
- if (!desttrace.empty())
- destbase = desttrace[0]->get_dir()->get_inode();
-
- // ok, extend srctrace toward root until it is an ancestor of desttrace.
- while (srcbase != destbase &&
- !srcbase->is_projected_ancestor_of(destbase)) {
- srctrace.insert(srctrace.begin(),
- srcbase->get_projected_parent_dn());
- dout(10) << "rename prepending srctrace with " << *srctrace[0] << dendl;
- srcbase = srcbase->get_projected_parent_dn()->get_dir()->get_inode();
- }
-
- // then, extend destpath until it shares the same parent inode as srcpath.
- while (destbase != srcbase) {
- desttrace.insert(desttrace.begin(),
- destbase->get_projected_parent_dn());
- rdlocks.insert(&desttrace[0]->lock);
- dout(10) << "rename prepending desttrace with " << *desttrace[0] << dendl;
- destbase = destbase->get_projected_parent_dn()->get_dir()->get_inode();
- }
- dout(10) << "rename src and dest traces now share common ancestor " << *destbase << dendl;
+ // then, extend destpath until it shares the same parent inode as srcpath.
+ while (destbase != srcbase) {
+ CDentry *pdn = destbase->get_projected_parent_dn();
+ desttrace.insert(desttrace.begin(), pdn);
+ rdlocks.insert(&pdn->lock);
+ dout(10) << "rename prepending desttrace with " << *pdn << dendl;
+ destbase = pdn->get_dir()->get_inode();
}
+ dout(10) << "rename src and dest traces now share common ancestor " << *destbase << dendl;
}
// src == dest?
// is this a stray migration, reintegration or merge? (sanity checks!)
if (mdr->reqid.name.is_mds() &&
!(MDS_INO_IS_MDSDIR(srcpath.get_ino()) &&
- MDS_INO_IS_STRAY(destpath.get_ino())) &&
+ MDS_INO_IS_MDSDIR(destpath.get_ino())) &&
!(destdnl->is_remote() &&
destdnl->get_remote_ino() == srci->ino())) {
respond_to_request(mdr, -EINVAL); // actually, this won't reply, but whatev.
if (srcdnl->is_primary() && !mdr->more()->is_ambiguous_auth) {
dout(10) << " preparing ambiguous auth for srci" << dendl;
mdr->set_ambiguous_auth(srci);
- _rename_prepare_witness(mdr, last, witnesses, srcdn, destdn, straydn);
+ _rename_prepare_witness(mdr, last, witnesses, srctrace, desttrace, straydn);
return;
}
}
} else if (mdr->more()->waiting_on_slave.count(*p)) {
dout(10) << " already waiting on witness mds." << *p << dendl;
} else {
- if (!_rename_prepare_witness(mdr, *p, witnesses, srcdn, destdn, straydn))
+ if (!_rename_prepare_witness(mdr, *p, witnesses, srctrace, desttrace, straydn))
return;
}
}
if (last != MDS_RANK_NONE && mdr->more()->witnessed.count(last) == 0) {
dout(10) << " preparing last witness (srcdn auth)" << dendl;
assert(mdr->more()->waiting_on_slave.count(last) == 0);
- _rename_prepare_witness(mdr, last, witnesses, srcdn, destdn, straydn);
+ _rename_prepare_witness(mdr, last, witnesses, srctrace, desttrace, straydn);
return;
}
// helpers
bool Server::_rename_prepare_witness(MDRequestRef& mdr, mds_rank_t who, set<mds_rank_t> &witnesse,
- CDentry *srcdn, CDentry *destdn, CDentry *straydn)
+ vector<CDentry*>& srctrace, vector<CDentry*>& dsttrace, CDentry *straydn)
{
if (!mds->mdsmap->is_clientreplay_or_active_or_stopping(who)) {
dout(10) << "_rename_prepare_witness mds." << who << " is not active" << dendl;
dout(10) << "_rename_prepare_witness mds." << who << dendl;
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
MMDSSlaveRequest::OP_RENAMEPREP);
- srcdn->make_path(req->srcdnpath);
- destdn->make_path(req->destdnpath);
- req->op_stamp = mdr->get_op_stamp();
-
+
+ req->srcdnpath = filepath(srctrace.front()->get_dir()->ino());
+ for (auto dn : srctrace)
+ req->srcdnpath.push_dentry(dn->name);
+ req->destdnpath = filepath(dsttrace.front()->get_dir()->ino());
+ for (auto dn : dsttrace)
+ req->destdnpath.push_dentry(dn->name);
if (straydn)
mdcache->replicate_stray(straydn, who, req->stray);
// srcdn auth will verify our current witness list is sufficient
req->witnesses = witnesse;
+ req->op_stamp = mdr->get_op_stamp();
mds->send_message_mds(req, who);
assert(mdr->more()->waiting_on_slave.count(who) == 0);
vector<CDentry*> trace;
int r = mdcache->path_traverse(mdr, NULL, NULL, destpath, &trace, NULL, MDS_TRAVERSE_DISCOVERXLOCK);
if (r > 0) return;
+ if (r == -ESTALE) {
+ mdcache->find_ino_peers(destpath.get_ino(), new C_MDS_RetryRequest(mdcache, mdr),
+ mdr->slave_to_mds);
+ return;
+ }
assert(r == 0); // we shouldn't get an error here!
CDentry *destdn = trace[trace.size()-1];