We're not allowed to call acquire_locks again once we've completed locking.
Do it in the caller. Add a helper that will also take an rdlock to ensure
the anchor doesn't go away due to a racing operation.
}
};
+void MDCache::anchor_create_prep_locks(MDRequest *mdr, CInode *in,
+ set<SimpleLock*>& rdlocks, set<SimpleLock*>& xlocks)
+{
+ dout(10) << "anchor_create_prep_locks " << *in << dendl;
+
+ if (in->is_anchored()) {
+ // caller may have already xlocked it.. if so, that will suffice!
+ if (xlocks.count(&in->linklock) == 0)
+ rdlocks.insert(&in->linklock);
+ } else {
+ xlocks.insert(&in->linklock);
+
+ // path components too!
+ CDentry *dn = in->get_projected_parent_dn();
+ while (dn) {
+ rdlocks.insert(&dn->lock);
+ dn = dn->get_dir()->get_inode()->get_parent_dn();
+ }
+ }
+}
+
void MDCache::anchor_create(MDRequest *mdr, CInode *in, Context *onfinish)
{
assert(in->is_auth());
return;
}
- // rdlock path
- set<SimpleLock*> rdlocks = mdr->rdlocks;
- set<SimpleLock*> wrlocks = mdr->wrlocks;
- set<SimpleLock*> xlocks = mdr->xlocks;
- xlocks.insert(&in->linklock);
-
- CDentry *dn = in->get_parent_dn();
- while (dn) {
- rdlocks.insert(&dn->lock);
- dn = dn->get_dir()->get_inode()->get_parent_dn();
- }
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
- dout(7) << "anchor_create waiting for locks " << *in << dendl;
- return;
- }
-
// wait
in->add_waiter(CInode::WAIT_ANCHORED, onfinish);
// -- anchors --
public:
+ void anchor_create_prep_locks(MDRequest *mdr, CInode *in, set<SimpleLock*>& rdlocks,
+ set<SimpleLock*>& xlocks);
void anchor_create(MDRequest *mdr, CInode *in, Context *onfinish);
void anchor_destroy(CInode *in, Context *onfinish);
protected:
}
xlocks.insert(&targeti->linklock);
+
+ // take any locks needed for anchor creation/verification
+ mds->mdcache->anchor_create_prep_locks(mdr, targeti, rdlocks, xlocks);
+
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;
// anchor?
if (mdr->slave_request->get_op() == MMDSSlaveRequest::OP_LINKPREP) {
+
+ set<SimpleLock*> rdlocks = mdr->rdlocks;
+ set<SimpleLock*> wrlocks = mdr->wrlocks;
+ set<SimpleLock*> xlocks = mdr->xlocks;
+
+ // take any locks needed for anchor creation/verification
+ mds->mdcache->anchor_create_prep_locks(mdr, targeti, rdlocks, xlocks);
+
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
if (targeti->is_anchored()) {
dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << dendl;
}
else
rdlocks.insert(&srci->snaplock);
+ // take any locks needed for anchor creation/verification
+ mds->mdcache->anchor_create_prep_locks(mdr, srci, rdlocks, xlocks);
+
if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
return;