dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
if (!(pf->rstat == pf->accounted_rstat)) {
- if (mut->wrlocks.count(&nestlock) == 0) {
+ if (!mut->is_wrlocked(&nestlock)) {
mdcache->mds->locker->wrlock_force(&nestlock, mut);
}
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
{
// rdlock ancestor snaps
CInode *t = in;
- rdlocks.insert(&in->snaplock);
while (t->get_projected_parent_dn()) {
t = t->get_projected_parent_dn()->get_dir()->get_inode();
- rdlocks.insert(&t->snaplock);
+ lov.add_rdlock(&t->snaplock);
}
+ lov.add_rdlock(&in->snaplock);
}
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
file_layout_t **layout)
{
//rdlock ancestor snaps
CInode *t = in;
- rdlocks.insert(&in->snaplock);
- rdlocks.insert(&in->policylock);
+ lov.add_rdlock(&in->snaplock);
+ lov.add_rdlock(&in->policylock);
bool found_layout = false;
while (t) {
- rdlocks.insert(&t->snaplock);
+ lov.add_rdlock(&t->snaplock);
if (!found_layout) {
- rdlocks.insert(&t->policylock);
+ lov.add_rdlock(&t->policylock);
if (t->get_projected_inode()->has_layout()) {
*layout = &t->get_projected_inode()->layout;
found_layout = true;
/* If this function returns false, the mdr has been placed
* on the appropriate wait list */
bool Locker::acquire_locks(MDRequestRef& mdr,
- set<SimpleLock*> &rdlocks,
- set<SimpleLock*> &wrlocks,
- set<SimpleLock*> &xlocks,
- map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+ MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze,
bool auth_pin_nonblock)
{
client_t client = mdr->get_client();
- set<SimpleLock*, SimpleLock::ptr_lt> sorted; // sort everything we will lock
- set<MDSCacheObject*> mustpin; // items to authpin
+ set<MDSCacheObject*> mustpin; // items to authpin
// xlocks
- for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
- SimpleLock *lock = *p;
-
- if ((lock->get_type() == CEPH_LOCK_ISNAP ||
- lock->get_type() == CEPH_LOCK_IPOLICY) &&
- mds->is_cluster_degraded() &&
- mdr->is_master() &&
- !mdr->is_queued_for_replay()) {
- // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
- // get processed in proper order.
- bool wait = false;
- if (lock->get_parent()->is_auth()) {
- if (!mdr->locks.count(lock)) {
- set<mds_rank_t> ls;
- lock->get_parent()->list_replicas(ls);
- for (auto m : ls) {
- if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
- wait = true;
- break;
+ for (int i = 0, size = lov.size(); i < size; ++i) {
+ auto& p = lov[i];
+ SimpleLock *lock = p.lock;
+ MDSCacheObject *object = lock->get_parent();
+
+ if (p.is_xlock()) {
+ if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+ lock->get_type() == CEPH_LOCK_IPOLICY) &&
+ mds->is_cluster_degraded() &&
+ mdr->is_master() &&
+ !mdr->is_queued_for_replay()) {
+ // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+ // get processed in proper order.
+ bool wait = false;
+ if (object->is_auth()) {
+ if (!mdr->locks.count(lock)) {
+ set<mds_rank_t> ls;
+ object->list_replicas(ls);
+ for (auto m : ls) {
+ if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+ wait = true;
+ break;
+ }
}
}
+ } else {
+ // if the lock is the latest locked one, it's possible that slave mds got the lock
+ // while there are recovering mds.
+ if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+ wait = true;
+ }
+ if (wait) {
+ dout(10) << " must xlock " << *lock << " " << *object
+ << ", waiting for cluster recovered" << dendl;
+ mds->locker->drop_locks(mdr.get(), NULL);
+ mdr->drop_local_auth_pins();
+ mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+ return false;
}
- } else {
- // if the lock is the latest locked one, it's possible that slave mds got the lock
- // while there are recovering mds.
- if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
- wait = true;
- }
- if (wait) {
- dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
- << ", waiting for cluster recovered" << dendl;
- mds->locker->drop_locks(mdr.get(), NULL);
- mdr->drop_local_auth_pins();
- mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
- return false;
}
- }
-
- dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;
-
- sorted.insert(lock);
- mustpin.insert(lock->get_parent());
- // augment xlock with a versionlock?
- if ((*p)->get_type() == CEPH_LOCK_DN) {
- CDentry *dn = (CDentry*)lock->get_parent();
- if (!dn->is_auth())
- continue;
+ dout(20) << " must xlock " << *lock << " " << *object << dendl;
- if (xlocks.count(&dn->versionlock))
- continue; // we're xlocking the versionlock too; don't wrlock it!
+ mustpin.insert(object);
- if (mdr->is_master()) {
- // master. wrlock versionlock so we can pipeline dentry updates to journal.
- wrlocks.insert(&dn->versionlock);
- } else {
- // slave. exclusively lock the dentry version (i.e. block other journal updates).
- // this makes rollback safe.
- xlocks.insert(&dn->versionlock);
- sorted.insert(&dn->versionlock);
+ // augment xlock with a versionlock?
+ if (lock->get_type() == CEPH_LOCK_DN) {
+ CDentry *dn = static_cast<CDentry*>(object);
+ if (!dn->is_auth())
+ continue;
+ if (mdr->is_master()) {
+ // master. wrlock versionlock so we can pipeline dentry updates to journal.
+ lov.add_wrlock(&dn->versionlock);
+ } else {
+ // slave. exclusively lock the dentry version (i.e. block other journal updates).
+ // this makes rollback safe.
+ lov.add_xlock(&dn->versionlock);
+ }
}
- }
- if (lock->get_type() > CEPH_LOCK_IVERSION) {
- // inode version lock?
- CInode *in = (CInode*)lock->get_parent();
- if (!in->is_auth())
- continue;
- if (mdr->is_master()) {
- // master. wrlock versionlock so we can pipeline inode updates to journal.
- wrlocks.insert(&in->versionlock);
- } else {
- // slave. exclusively lock the inode version (i.e. block other journal updates).
- // this makes rollback safe.
- xlocks.insert(&in->versionlock);
- sorted.insert(&in->versionlock);
+ if (lock->get_type() > CEPH_LOCK_IVERSION) {
+ // inode version lock?
+ CInode *in = static_cast<CInode*>(object);
+ if (!in->is_auth())
+ continue;
+ if (mdr->is_master()) {
+ // master. wrlock versionlock so we can pipeline inode updates to journal.
+ lov.add_wrlock(&in->versionlock);
+ } else {
+ // slave. exclusively lock the inode version (i.e. block other journal updates).
+ // this makes rollback safe.
+ lov.add_xlock(&in->versionlock);
+ }
}
- }
- }
-
- // wrlocks
- for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- dout(20) << " must wrlock " << **p << " " << *object << dendl;
- sorted.insert(*p);
- if (object->is_auth())
- mustpin.insert(object);
- else if (!object->is_auth() &&
- !(*p)->can_wrlock(client) && // we might have to request a scatter
- !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
- dout(15) << " will also auth_pin " << *object
- << " in case we need to request a scatter" << dendl;
- mustpin.insert(object);
- }
- }
-
- // remote_wrlocks
- if (remote_wrlocks) {
- for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
- MDSCacheObject *object = p->first->get_parent();
- dout(20) << " must remote_wrlock on mds." << p->second << " "
- << *p->first << " " << *object << dendl;
- sorted.insert(p->first);
- mustpin.insert(object);
- }
- }
-
- // rdlocks
- for (set<SimpleLock*>::iterator p = rdlocks.begin();
- p != rdlocks.end();
- ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- dout(20) << " must rdlock " << **p << " " << *object << dendl;
- sorted.insert(*p);
- if (object->is_auth())
- mustpin.insert(object);
- else if (!object->is_auth() &&
- !(*p)->can_rdlock(client)) { // we might have to request an rdlock
- dout(15) << " will also auth_pin " << *object
- << " in case we need to request a rdlock" << dendl;
+ } else if (p.is_wrlock()) {
+ dout(20) << " must wrlock " << *lock << " " << *object << dendl;
+ if (object->is_auth()) {
+ mustpin.insert(object);
+ } else if (!object->is_auth() &&
+ !lock->can_wrlock(client) && // we might have to request a scatter
+ !mdr->is_slave()) { // if we are slave (remote_wrlock), the master already authpinned
+ dout(15) << " will also auth_pin " << *object
+ << " in case we need to request a scatter" << dendl;
+ mustpin.insert(object);
+ }
+ } else if (p.is_remote_wrlock()) {
+ dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
+ << *lock << " " << *object << dendl;
mustpin.insert(object);
+ } else if (p.is_rdlock()) {
+
+ dout(20) << " must rdlock " << *lock << " " << *object << dendl;
+ if (object->is_auth()) {
+ mustpin.insert(object);
+ } else if (!object->is_auth() &&
+ !lock->can_rdlock(client)) { // we might have to request an rdlock
+ dout(15) << " will also auth_pin " << *object
+ << " in case we need to request a rdlock" << dendl;
+ mustpin.insert(object);
+ }
+ } else {
+ ceph_assert(0 == "locker unknown lock operation");
}
}
+ lov.sort_and_merge();
// AUTH PINS
map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote; // mds -> (object set)
// can i auth pin them all now?
marker.message = "failed to authpin local pins";
- for (set<MDSCacheObject*>::iterator p = mustpin.begin();
- p != mustpin.end();
- ++p) {
- MDSCacheObject *object = *p;
+ for (const auto &p : mustpin) {
+ MDSCacheObject *object = p;
dout(10) << " must authpin " << *object << dendl;
}
// ok, grab local auth pins
- for (set<MDSCacheObject*>::iterator p = mustpin.begin();
- p != mustpin.end();
- ++p) {
- MDSCacheObject *object = *p;
+ for (const auto& p : mustpin) {
+ MDSCacheObject *object = p;
if (mdr->is_auth_pinned(object)) {
dout(10) << " already auth_pinned " << *object << dendl;
} else if (object->is_auth()) {
// request remote auth_pins
if (!mustpin_remote.empty()) {
marker.message = "requesting remote authpins";
- for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
- p != mdr->remote_auth_pins.end();
- ++p) {
- if (mustpin.count(p->first)) {
- ceph_assert(p->second == p->first->authority().first);
- map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+ for (const auto& p : mdr->remote_auth_pins) {
+ if (mustpin.count(p.first)) {
+ ceph_assert(p.second == p.first->authority().first);
+ map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
if (q != mustpin_remote.end())
- q->second.insert(p->first);
+ q->second.insert(p.first);
}
}
for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
// acquire locks.
// make sure they match currently acquired locks.
- set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
- for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
- p != sorted.end();
- ++p) {
- bool need_wrlock = !!wrlocks.count(*p);
- bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+ auto existing = mdr->locks.begin();
+ for (const auto& p : lov) {
+ bool need_wrlock = p.is_wrlock();
+ bool need_remote_wrlock = p.is_remote_wrlock();
// already locked?
- if (existing != mdr->locks.end() && *existing == *p) {
+ if (existing != mdr->locks.end() && existing->lock == p.lock) {
// right kind?
- SimpleLock *have = *existing;
- ++existing;
- if (xlocks.count(have) && mdr->xlocks.count(have)) {
- dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+ auto it = existing++;
+ auto have = *it; // don't reference
+
+ if (have.is_xlock() && p.is_xlock()) {
+ dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
- if (mdr->remote_wrlocks.count(have)) {
- if (!need_remote_wrlock ||
- mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
- dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
- << " " << *have << " " << *have->get_parent() << dendl;
- remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
- }
+
+ if (have.is_remote_wrlock() &&
+ (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
+ dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
+ << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ remote_wrlock_finish(it, mdr.get());
+ have.clear_remote_wrlock();
}
+
if (need_wrlock || need_remote_wrlock) {
- if (need_wrlock == !!mdr->wrlocks.count(have) &&
- need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+ if (need_wrlock == have.is_wrlock() &&
+ need_remote_wrlock == have.is_remote_wrlock()) {
if (need_wrlock)
- dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+ dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
if (need_remote_wrlock)
- dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+ dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
- }
- if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
- dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+
+ if (have.is_wrlock()) {
+ if (!need_wrlock)
+ dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ else if (need_remote_wrlock) // acquire remote_wrlock first
+ dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
+ bool need_issue = false;
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (need_issue)
+ issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
+ }
+ } else if (have.is_rdlock() && p.is_rdlock()) {
+ dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
continue;
}
}
// hose any stray locks
- if (existing != mdr->locks.end() && *existing == *p) {
- ceph_assert(need_wrlock || need_remote_wrlock);
- SimpleLock *lock = *existing;
- if (mdr->wrlocks.count(lock)) {
- if (!need_wrlock)
- dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
- else if (need_remote_wrlock) // acquire remote_wrlock first
- dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
- bool need_issue = false;
- wrlock_finish(lock, mdr.get(), &need_issue);
- if (need_issue)
- issue_set.insert(static_cast<CInode*>(lock->get_parent()));
- }
- ++existing;
- }
while (existing != mdr->locks.end()) {
- SimpleLock *stray = *existing;
- ++existing;
- dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+ auto it = existing++;
+ auto stray = *it; // don't reference
+ dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
bool need_issue = false;
- if (mdr->xlocks.count(stray)) {
- xlock_finish(stray, mdr.get(), &need_issue);
- } else if (mdr->rdlocks.count(stray)) {
- rdlock_finish(stray, mdr.get(), &need_issue);
+ if (stray.is_xlock()) {
+ xlock_finish(it, mdr.get(), &need_issue);
+ } else if (stray.is_rdlock()) {
+ rdlock_finish(it, mdr.get(), &need_issue);
} else {
// may have acquired both wrlock and remore wrlock
- if (mdr->wrlocks.count(stray))
- wrlock_finish(stray, mdr.get(), &need_issue);
- if (mdr->remote_wrlocks.count(stray))
- remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+ if (stray.is_wrlock())
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (stray.is_remote_wrlock())
+ remote_wrlock_finish(it, mdr.get());
}
if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+ issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
}
// lock
- if (mdr->locking && *p != mdr->locking) {
+ if (mdr->locking && p.lock != mdr->locking) {
cancel_locking(mdr.get(), &issue_set);
}
- if (xlocks.count(*p)) {
+ if (p.is_xlock()) {
marker.message = "failed to xlock, waiting";
- if (!xlock_start(*p, mdr))
+ if (!xlock_start(p.lock, mdr))
goto out;
- dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
} else if (need_wrlock || need_remote_wrlock) {
- if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+ if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
marker.message = "waiting for remote wrlocks";
- remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+ remote_wrlock_start(p, p.wrlock_target, mdr);
goto out;
}
- if (need_wrlock && !mdr->wrlocks.count(*p)) {
+ if (need_wrlock) {
marker.message = "failed to wrlock, waiting";
- if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+ if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
marker.message = "failed to wrlock, dropping remote wrlock and waiting";
// can't take the wrlock because the scatter lock is gathering. need to
// release the remote wrlock, so that the gathering process can finish.
- remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
- remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+ auto it = mdr->locks.end();
+ ++it;
+ remote_wrlock_finish(it, mdr.get());
+ remote_wrlock_start(p, p.wrlock_target, mdr);
goto out;
}
// nowait if we have already gotten remote wrlock
- if (!wrlock_start(*p, mdr, need_remote_wrlock))
+ if (!wrlock_start(p, mdr, need_remote_wrlock))
goto out;
- dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
}
} else {
ceph_assert(mdr->is_master());
- if ((*p)->needs_recover()) {
+ if (p.lock->needs_recover()) {
if (mds->is_cluster_degraded()) {
if (!mdr->is_queued_for_replay()) {
// see comments in SimpleLock::set_state_rejoin() and
// ScatterLock::encode_state_for_rejoin()
drop_locks(mdr.get());
mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
- dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
+ dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
<< ", waiting for cluster recovered" << dendl;
marker.message = "rejoin recovering lock, waiting for cluster recovered";
return false;
}
} else {
- (*p)->clear_need_recover();
+ p.lock->clear_need_recover();
}
}
marker.message = "failed to rdlock, waiting";
- if (!rdlock_start(*p, mdr))
+ if (!rdlock_start(p, mdr))
goto out;
- dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+ dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
}
}
// any extra unneeded locks?
while (existing != mdr->locks.end()) {
- SimpleLock *stray = *existing;
- ++existing;
- dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+ auto it = existing++;
+ auto stray = *it;
+ dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
bool need_issue = false;
- if (mdr->xlocks.count(stray)) {
- xlock_finish(stray, mdr.get(), &need_issue);
- } else if (mdr->rdlocks.count(stray)) {
- rdlock_finish(stray, mdr.get(), &need_issue);
+ if (stray.is_xlock()) {
+ xlock_finish(it, mdr.get(), &need_issue);
+ } else if (stray.is_rdlock()) {
+ rdlock_finish(it, mdr.get(), &need_issue);
} else {
// may have acquired both wrlock and remore wrlock
- if (mdr->wrlocks.count(stray))
- wrlock_finish(stray, mdr.get(), &need_issue);
- if (mdr->remote_wrlocks.count(stray))
- remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+ if (stray.is_wrlock())
+ wrlock_finish(it, mdr.get(), &need_issue);
+ if (stray.is_remote_wrlock())
+ remote_wrlock_finish(it, mdr.get());
}
if (need_issue)
- issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+ issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
}
mdr->done_locking = true;
void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
{
- for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
- p != mut->xlocks.end();
- ++p) {
- MDSCacheObject *object = (*p)->get_parent();
- ceph_assert(object->is_auth());
+ for (const auto &p : mut->locks) {
+ if (!p.is_xlock())
+ continue;
+ MDSCacheObject *obj = p.lock->get_parent();
+ ceph_assert(obj->is_auth());
if (skip_dentry &&
- ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
+ (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
continue;
- dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
- (*p)->set_xlock_done();
- }
-}
-
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
- while (!mut->rdlocks.empty()) {
- bool ni = false;
- MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
- rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
+ dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
+ p.lock->set_xlock_done();
}
}
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
+ bool drop_rdlocks)
{
set<mds_rank_t> slaves;
- while (!mut->xlocks.empty()) {
- SimpleLock *lock = *mut->xlocks.begin();
- MDSCacheObject *p = lock->get_parent();
- if (!p->is_auth()) {
- ceph_assert(lock->get_sm()->can_remote_xlock);
- slaves.insert(p->authority().first);
- lock->put_xlock();
- mut->locks.erase(lock);
- mut->xlocks.erase(lock);
- continue;
- }
- bool ni = false;
- xlock_finish(lock, mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
- }
-
- while (!mut->remote_wrlocks.empty()) {
- map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
- slaves.insert(p->second);
- if (mut->wrlocks.count(p->first) == 0)
- mut->locks.erase(p->first);
- mut->remote_wrlocks.erase(p);
- }
+ for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+ SimpleLock *lock = it->lock;
+ MDSCacheObject *obj = lock->get_parent();
- while (!mut->wrlocks.empty()) {
- bool ni = false;
- MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
- wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
- if (ni)
- pneed_issue->insert(static_cast<CInode*>(p));
+ if (it->is_xlock()) {
+ if (obj->is_auth()) {
+ bool ni = false;
+ xlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ ceph_assert(lock->get_sm()->can_remote_xlock);
+ slaves.insert(obj->authority().first);
+ lock->put_xlock();
+ mut->locks.erase(it++);
+ }
+ } else if (it->is_wrlock() || it->is_remote_wrlock()) {
+ if (it->is_remote_wrlock()) {
+ slaves.insert(it->wrlock_target);
+ it->clear_remote_wrlock();
+ }
+ if (it->is_wrlock()) {
+ bool ni = false;
+ wrlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ mut->locks.erase(it++);
+ }
+ } else if (drop_rdlocks && it->is_rdlock()) {
+ bool ni = false;
+ rdlock_finish(it++, mut, &ni);
+ if (ni)
+ pneed_issue->insert(static_cast<CInode*>(obj));
+ } else {
+ ++it;
+ }
}
for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
if (mut->locking)
cancel_locking(mut, pneed_issue);
- _drop_non_rdlocks(mut, pneed_issue);
- _drop_rdlocks(mut, pneed_issue);
+ _drop_locks(mut, pneed_issue, true);
if (pneed_issue == &my_need_issue)
issue_caps_set(*pneed_issue);
if (!pneed_issue)
pneed_issue = &my_need_issue;
- _drop_non_rdlocks(mut, pneed_issue);
+ _drop_locks(mut, pneed_issue, false);
if (pneed_issue == &my_need_issue)
issue_caps_set(*pneed_issue);
{
set<CInode*> need_issue;
- for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
- SimpleLock *lock = *p;
- ++p;
+ for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+ if (!it->is_rdlock()) {
+ ++it;
+ continue;
+ }
+ SimpleLock *lock = it->lock;
// make later mksnap/setlayout (at other mds) wait for this unsafe request
if (lock->get_type() == CEPH_LOCK_ISNAP ||
- lock->get_type() == CEPH_LOCK_IPOLICY)
+ lock->get_type() == CEPH_LOCK_IPOLICY) {
+ ++it;
continue;
+ }
bool ni = false;
- rdlock_finish(lock, mut, &ni);
+ rdlock_finish(it++, mut, &ni);
if (ni)
need_issue.insert(static_cast<CInode*>(lock->get_parent()));
}
// can read? grab ref.
if (lock->can_rdlock(client)) {
lock->get_rdlock();
- mut->rdlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
return true;
}
mds->mdlog->flush();
}
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_rdlock());
+ SimpleLock *lock = it->lock;
// drop ref
lock->put_rdlock();
- if (mut) {
- mut->rdlocks.erase(lock);
- mut->locks.erase(lock);
- }
+ if (mut)
+ mut->locks.erase(it);
dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
}
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
{
- dout(10) << "can_rdlock_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
- if (!(*p)->can_rdlock(-1)) {
- dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+ dout(10) << "can_rdlock_set " << dendl;
+ for (const auto& p : lov) {
+ ceph_assert(p.is_rdlock());
+ if (!p.lock->can_rdlock(-1)) {
+ dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
return false;
}
+ }
return true;
}
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
- dout(10) << "rdlock_try_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
- if (!rdlock_try(*p, -1, NULL)) {
- dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
- return false;
- }
- return true;
-}
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
{
- dout(10) << "rdlock_take_set " << locks << dendl;
- for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
- (*p)->get_rdlock();
- mut->rdlocks.insert(*p);
- mut->locks.insert(*p);
+ dout(10) << "rdlock_take_set " << dendl;
+ for (const auto& p : lov) {
+ ceph_assert(p.is_rdlock());
+ p.lock->get_rdlock();
+ mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
}
}
dout(7) << "wrlock_force on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->get_wrlock(true);
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
}
bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
if (lock->can_wrlock(client) &&
(!want_scatter || lock->get_state() == LOCK_MIX)) {
lock->get_wrlock();
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
return true;
}
return false;
}
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_wrlock());
+ SimpleLock* lock = it->lock;
+
if (lock->get_type() == CEPH_LOCK_IVERSION ||
lock->get_type() == CEPH_LOCK_DVERSION)
- return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+ return local_wrlock_finish(it, mut);
dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
lock->put_wrlock();
- if (mut) {
- mut->wrlocks.erase(lock);
- if (mut->remote_wrlocks.count(lock) == 0)
- mut->locks.erase(lock);
- }
+
+ if (it->is_remote_wrlock())
+ it->clear_wrlock();
+ else
+ mut->locks.erase(it);
if (!lock->is_wrlocked()) {
if (!lock->is_stable())
mut->more()->waiting_on_slave.insert(target);
}
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
- MutationImpl *mut)
+void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
- // drop ref
- mut->remote_wrlocks.erase(lock);
- if (mut->wrlocks.count(lock) == 0)
- mut->locks.erase(lock);
+ ceph_assert(it->is_remote_wrlock());
+ SimpleLock *lock = it->lock;
+ mds_rank_t target = it->wrlock_target;
+
+ if (it->is_wrlock())
+ it->clear_remote_wrlock();
+ else
+ mut->locks.erase(it);
dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
<< " " << *lock->get_parent() << dendl;
if (lock->can_xlock(client)) {
lock->set_state(LOCK_XLOCK);
lock->get_xlock(mut, client);
- mut->xlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
mut->finish_locking(lock);
return true;
}
eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
}
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
{
+ ceph_assert(it->is_xlock());
+ SimpleLock *lock = it->lock;
+
if (lock->get_type() == CEPH_LOCK_IVERSION ||
lock->get_type() == CEPH_LOCK_DVERSION)
- return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+ return local_xlock_finish(it, mut);
dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
// drop ref
lock->put_xlock();
ceph_assert(mut);
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
bool do_issue = false;
}
}
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_xlock());
+ SimpleLock *lock = it->lock;
dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
lock->put_xlock();
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
MDSCacheObject *p = lock->get_parent();
ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH)); // we are exporting this (inode)
// forcefully take a wrlock
lock->get_wrlock(true);
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
in->pre_cow_old_inode(); // avoid cow mayhem
ceph_assert(lock->get_parent()->is_auth());
ceph_assert(lock->can_wrlock());
- ceph_assert(!mut->wrlocks.count(lock));
lock->get_wrlock(mut->get_client());
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+
+ auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+ ceph_assert(ret.second);
}
bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
ceph_assert(lock->get_parent()->is_auth());
if (lock->can_wrlock()) {
- ceph_assert(!mut->wrlocks.count(lock));
lock->get_wrlock(mut->get_client());
- mut->wrlocks.insert(lock);
- mut->locks.insert(lock);
+ auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+ ceph_assert(it->is_wrlock());
return true;
} else {
lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
}
}
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_wrlock());
+ LocalLock *lock = static_cast<LocalLock*>(it->lock);
dout(7) << "local_wrlock_finish on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->put_wrlock();
- mut->wrlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
if (lock->get_num_wrlocks() == 0) {
lock->finish_waiters(SimpleLock::WAIT_STABLE |
SimpleLock::WAIT_WR |
}
lock->get_xlock(mut, mut->get_client());
- mut->xlocks.insert(lock);
- mut->locks.insert(lock);
+ mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
return true;
}
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
{
+ ceph_assert(it->is_xlock());
+ LocalLock *lock = static_cast<LocalLock*>(it->lock);
dout(7) << "local_xlock_finish on " << *lock
<< " on " << *lock->get_parent() << dendl;
lock->put_xlock();
- mut->xlocks.erase(lock);
- mut->locks.erase(lock);
+ mut->locks.erase(it);
lock->finish_waiters(SimpleLock::WAIT_STABLE |
SimpleLock::WAIT_WR |
void send_lock_message(SimpleLock *lock, int msg, const bufferlist &data);
// -- locks --
- void _drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue);
- void _drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue);
+ void _drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue, bool drop_rdlocks);
public:
- void include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in);
- void include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
- file_layout_t **layout);
+ void include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov);
+ void include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
+ file_layout_t **layout);
bool acquire_locks(MDRequestRef& mdr,
- set<SimpleLock*> &rdlocks,
- set<SimpleLock*> &wrlocks,
- set<SimpleLock*> &xlocks,
- map<SimpleLock*,mds_rank_t> *remote_wrlocks=NULL,
+ MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze=NULL,
bool auth_pin_nonblock=false);
bool _rdlock_kick(SimpleLock *lock, bool as_anon);
bool rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *c);
bool rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon=false);
- void rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
- bool can_rdlock_set(set<SimpleLock*>& locks);
- bool rdlock_try_set(set<SimpleLock*>& locks);
- void rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut);
+ void rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
+ bool can_rdlock_set(MutationImpl::LockOpVec& lov);
+ void rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut);
void wrlock_force(SimpleLock *lock, MutationRef& mut);
bool wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait=false);
- void wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
+ void wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
void remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut);
- void remote_wrlock_finish(SimpleLock *lock, mds_rank_t target, MutationImpl *mut);
+ void remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
bool xlock_start(SimpleLock *lock, MDRequestRef& mut);
void _finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue);
- void xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
+ void xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
- void xlock_export(SimpleLock *lock, MutationImpl *mut);
+ void xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut);
void xlock_import(SimpleLock *lock);
void local_wrlock_grab(LocalLock *lock, MutationRef& mut);
protected:
bool local_wrlock_start(LocalLock *lock, MDRequestRef& mut);
- void local_wrlock_finish(LocalLock *lock, MutationImpl *mut);
+ void local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
bool local_xlock_start(LocalLock *lock, MDRequestRef& mut);
- void local_xlock_finish(LocalLock *lock, MutationImpl *mut);
+ void local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
// file
pf->version = parent->pre_dirty();
if (do_parent_mtime || linkunlink) {
- ceph_assert(mut->wrlocks.count(&pin->filelock));
- ceph_assert(mut->wrlocks.count(&pin->nestlock));
+ ceph_assert(mut->is_wrlocked(&pin->filelock));
+ ceph_assert(mut->is_wrlocked(&pin->nestlock));
ceph_assert(cfollows == CEPH_NOSNAP);
// update stale fragstat/rstat?
ceph_assert(pin->nestlock.get_num_wrlocks() || mut->is_slave());
}
- if (mut->wrlocks.count(&pin->nestlock) == 0) {
+ if (!mut->is_wrlocked(&pin->nestlock)) {
dout(10) << " taking wrlock on " << pin->nestlock << " on " << *pin << dendl;
mds->locker->wrlock_force(&pin->nestlock, mut);
}
// can cast only because i'm passing nowait=true in the sole user
MDRequestRef mdmut = static_cast<MDRequestImpl*>(mut.get());
if (!stop &&
- mut->wrlocks.count(&pin->nestlock) == 0 &&
+ !mut->is_wrlocked(&pin->nestlock) &&
(!pin->versionlock.can_wrlock() || // make sure we can take versionlock, too
//true
!mds->locker->wrlock_start(&pin->nestlock, mdmut, true)
}
break;
}
- if (!mut->wrlocks.count(&pin->versionlock))
+ if (!mut->is_wrlocked(&pin->versionlock))
mds->locker->local_wrlock_grab(&pin->versionlock, mut);
- ceph_assert(mut->wrlocks.count(&pin->nestlock) ||
- mut->is_slave());
+ ceph_assert(mut->is_wrlocked(&pin->nestlock) || mut->is_slave());
pin->last_dirstat_prop = mut->get_mds_stamp();
if (mdr->is_slave())
continue;
// auth pins
- for (map<MDSCacheObject*,mds_rank_t>::iterator q = mdr->remote_auth_pins.begin();
- q != mdr->remote_auth_pins.end();
- ++q) {
- if (!q->first->is_auth()) {
- ceph_assert(q->second == q->first->authority().first);
- if (rejoins.count(q->second) == 0) continue;
- const MMDSCacheRejoin::ref &rejoin = rejoins[q->second];
+ for (const auto& q : mdr->remote_auth_pins) {
+ if (!q.first->is_auth()) {
+ ceph_assert(q.second == q.first->authority().first);
+ if (rejoins.count(q.second) == 0) continue;
+ const MMDSCacheRejoin::ref &rejoin = rejoins[q.second];
- dout(15) << " " << *mdr << " authpin on " << *q->first << dendl;
+ dout(15) << " " << *mdr << " authpin on " << *q.first << dendl;
MDSCacheObjectInfo i;
- q->first->set_object_info(i);
+ q.first->set_object_info(i);
if (i.ino)
rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), mdr->reqid, mdr->attempt);
else
rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, mdr->reqid, mdr->attempt);
if (mdr->has_more() && mdr->more()->is_remote_frozen_authpin &&
- mdr->more()->rename_inode == q->first)
+ mdr->more()->rename_inode == q.first)
rejoin->add_inode_frozen_authpin(vinodeno_t(i.ino, i.snapid),
mdr->reqid, mdr->attempt);
}
}
// xlocks
- for (set<SimpleLock*>::iterator q = mdr->xlocks.begin();
- q != mdr->xlocks.end();
- ++q) {
- if (!(*q)->get_parent()->is_auth()) {
- mds_rank_t who = (*q)->get_parent()->authority().first;
+ for (const auto& q : mdr->locks) {
+ auto lock = q.lock;
+ auto obj = lock->get_parent();
+ if (q.is_xlock() && !obj->is_auth()) {
+ mds_rank_t who = obj->authority().first;
if (rejoins.count(who) == 0) continue;
const MMDSCacheRejoin::ref &rejoin = rejoins[who];
- dout(15) << " " << *mdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
+ dout(15) << " " << *mdr << " xlock on " << *lock << " " << *obj << dendl;
MDSCacheObjectInfo i;
- (*q)->get_parent()->set_object_info(i);
+ obj->set_object_info(i);
if (i.ino)
- rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(),
+ rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), lock->get_type(),
mdr->reqid, mdr->attempt);
else
rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid,
mdr->reqid, mdr->attempt);
- }
- }
- // remote wrlocks
- for (map<SimpleLock*, mds_rank_t>::iterator q = mdr->remote_wrlocks.begin();
- q != mdr->remote_wrlocks.end();
- ++q) {
- mds_rank_t who = q->second;
- if (rejoins.count(who) == 0) continue;
- const MMDSCacheRejoin::ref &rejoin = rejoins[who];
+ } else if (q.is_remote_wrlock()) {
+ mds_rank_t who = q.wrlock_target;
+ if (rejoins.count(who) == 0) continue;
+ const MMDSCacheRejoin::ref &rejoin = rejoins[who];
- dout(15) << " " << *mdr << " wrlock on " << q->second
- << " " << q->first->get_parent() << dendl;
- MDSCacheObjectInfo i;
- q->first->get_parent()->set_object_info(i);
- ceph_assert(i.ino);
- rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), q->first->get_type(),
- mdr->reqid, mdr->attempt);
+ dout(15) << " " << *mdr << " wrlock on " << *lock << " " << *obj << dendl;
+ MDSCacheObjectInfo i;
+ obj->set_object_info(i);
+ ceph_assert(i.ino);
+ rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), lock->get_type(),
+ mdr->reqid, mdr->attempt);
+ }
}
}
}
}
// dn xlock?
- const auto xlocked_dentries_it = strong->xlocked_dentries.find(dirfrag);
- if (xlocked_dentries_it != strong->xlocked_dentries.end()) {
- const auto ss_req_it = xlocked_dentries_it->second.find(ss);
- if (ss_req_it != xlocked_dentries_it->second.end()) {
+ const auto xlocked_it = strong->xlocked_dentries.find(dirfrag);
+ if (xlocked_it != strong->xlocked_dentries.end()) {
+ const auto ss_req_it = xlocked_it->second.find(ss);
+ if (ss_req_it != xlocked_it->second.end()) {
const MMDSCacheRejoin::slave_reqid& r = ss_req_it->second;
dout(10) << " dn xlock by " << r << " on " << *dn << dendl;
MDRequestRef mdr = request_get(r.reqid); // should have this from auth_pin above.
ceph_assert(mdr->is_auth_pinned(dn));
- if (!mdr->xlocks.count(&dn->versionlock)) {
+ if (!mdr->is_xlocked(&dn->versionlock)) {
ceph_assert(dn->versionlock.can_xlock_local());
dn->versionlock.get_xlock(mdr, mdr->get_client());
- mdr->xlocks.insert(&dn->versionlock);
- mdr->locks.insert(&dn->versionlock);
+ mdr->locks.emplace(&dn->versionlock, MutationImpl::LockOp::XLOCK);
}
if (dn->lock.is_stable())
dn->auth_pin(&dn->lock);
dn->lock.set_state(LOCK_XLOCK);
dn->lock.get_xlock(mdr, mdr->get_client());
- mdr->xlocks.insert(&dn->lock);
- mdr->locks.insert(&dn->lock);
+ mdr->locks.emplace(&dn->lock, MutationImpl::LockOp::XLOCK);
}
}
dout(10) << " inode xlock by " << q.second << " on " << *lock << " on " << *in << dendl;
MDRequestRef mdr = request_get(q.second.reqid); // should have this from auth_pin above.
ceph_assert(mdr->is_auth_pinned(in));
- if (!mdr->xlocks.count(&in->versionlock)) {
+ if (!mdr->is_xlocked(&in->versionlock)) {
ceph_assert(in->versionlock.can_xlock_local());
in->versionlock.get_xlock(mdr, mdr->get_client());
- mdr->xlocks.insert(&in->versionlock);
- mdr->locks.insert(&in->versionlock);
+ mdr->locks.emplace(&in->versionlock, MutationImpl::LockOp::XLOCK);
}
if (lock->is_stable())
in->auth_pin(lock);
if (lock == &in->filelock)
in->loner_cap = -1;
lock->get_xlock(mdr, mdr->get_client());
- mdr->xlocks.insert(lock);
- mdr->locks.insert(lock);
+ mdr->locks.emplace(lock, MutationImpl::LockOp::XLOCK);
}
}
}
if (lock == &in->filelock)
in->loner_cap = -1;
lock->get_wrlock(true);
- mdr->wrlocks.insert(lock);
- mdr->locks.insert(lock);
+ mdr->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
}
}
}
* implicitly. Note that we don't call the finishers -- there shouldn't
* be any on a remote lock and the request finish wakes up all
* the waiters anyway! */
- set<SimpleLock*>::iterator p = mdr->xlocks.begin();
- while (p != mdr->xlocks.end()) {
- if ((*p)->get_parent()->is_auth())
- ++p;
- else {
- dout(10) << "request_drop_foreign_locks forgetting lock " << **p
- << " on " << *(*p)->get_parent() << dendl;
- (*p)->put_xlock();
- mdr->locks.erase(*p);
- mdr->xlocks.erase(p++);
- }
- }
- map<SimpleLock*, mds_rank_t>::iterator q = mdr->remote_wrlocks.begin();
- while (q != mdr->remote_wrlocks.end()) {
- dout(10) << "request_drop_foreign_locks forgetting remote_wrlock " << *q->first
- << " on mds." << q->second
- << " on " << *(q->first)->get_parent() << dendl;
- mdr->locks.erase(q->first);
- mdr->remote_wrlocks.erase(q++);
+ for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
+ SimpleLock *lock = it->lock;
+ if (it->is_xlock() && !lock->get_parent()->is_auth()) {
+ dout(10) << "request_drop_foreign_locks forgetting lock " << *lock
+ << " on " << lock->get_parent() << dendl;
+ lock->put_xlock();
+ mdr->locks.erase(it++);
+ } else if (it->is_remote_wrlock()) {
+ dout(10) << "request_drop_foreign_locks forgetting remote_wrlock " << *lock
+ << " on mds." << it->wrlock_target << " on " << *lock->get_parent() << dendl;
+ if (it->is_wrlock()) {
+ it->clear_remote_wrlock();
+ ++it;
+ } else {
+ mdr->locks.erase(it++);
+ }
+ } else {
+ ++it;
+ }
}
mdr->more()->slaves.clear(); /* we no longer have requests out to them, and
mdr->drop_local_auth_pins();
// drop stickydirs
- for (set<CInode*>::iterator p = mdr->stickydirs.begin();
- p != mdr->stickydirs.end();
- ++p)
- (*p)->put_stickydirs();
+ mdr->put_stickydirs();
mds->locker->kick_cap_releases(mdr);
dout(10) << "dispatch_fragment_dir " << basedirfrag << " bits " << info.bits
<< " on " << *diri << dendl;
if (!mdr->aborted) {
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- wrlocks.insert(&diri->dirfragtreelock);
+ MutationImpl::LockOpVec lov;
+ lov.add_wrlock(&diri->dirfragtreelock);
// prevent a racing gather on any other scatterlocks too
- wrlocks.insert(&diri->nestlock);
- wrlocks.insert(&diri->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true))
+ lov.add_wrlock(&diri->nestlock);
+ lov.add_wrlock(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov, NULL, true))
if (!mdr->aborted)
return;
}
void MDCache::enqueue_scrub_work(MDRequestRef& mdr)
{
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ MutationImpl::LockOpVec lov;
+ CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, lov, true);
if (NULL == in)
return;
// TODO: Remove this restriction
ceph_assert(in->is_auth());
- bool locked = mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks);
+ bool locked = mds->locker->acquire_locks(mdr, lov);
if (!locked)
return;
mdr->auth_pin(dir);
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
CInode *diri = dir->inode;
- rdlocks.insert(&diri->dirfragtreelock);
- wrlocks.insert(&diri->nestlock);
- wrlocks.insert(&diri->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->dirfragtreelock);
+ lov.add_wrlock(&diri->nestlock);
+ lov.add_wrlock(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!dir->is_complete()) {
return;
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
std::list<frag_t> frags;
if (mdr->ls) // already marked filelock/nestlock dirty ?
goto do_rdlocks;
- rdlocks.insert(&diri->dirfragtreelock);
- wrlocks.insert(&diri->nestlock);
- wrlocks.insert(&diri->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->dirfragtreelock);
+ lov.add_wrlock(&diri->nestlock);
+ lov.add_wrlock(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
// Fetch all dirfrags and mark filelock/nestlock dirty. This will tirgger
do_rdlocks:
// force the scatter-gather process
- rdlocks.insert(&diri->dirfragtreelock);
- rdlocks.insert(&diri->nestlock);
- rdlocks.insert(&diri->filelock);
- wrlocks.clear();
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.clear();
+ lov.add_rdlock(&diri->dirfragtreelock);
+ lov.add_rdlock(&diri->nestlock);
+ lov.add_rdlock(&diri->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
diri->state_clear(CInode::STATE_REPAIRSTATS);
return;
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- mds->locker->include_snap_rdlocks(rdlocks, in);
- rdlocks.erase(&in->snaplock);
- xlocks.insert(&in->snaplock);
+ MutationImpl::LockOpVec lov;
+ mds->locker->include_snap_rdlocks(in, lov);
+ lov.erase_rdlock(&in->snaplock);
+ lov.add_xlock(&in->snaplock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
// project_snaprealm() upgrades snaprealm format
void MDCache::flush_dentry_work(MDRequestRef& mdr)
{
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ MutationImpl::LockOpVec lov;
+ CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, lov, true);
if (NULL == in)
return;
// TODO: Is this necessary? Fix it if so
ceph_assert(in->is_auth());
- bool locked = mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks);
+ bool locked = mds->locker->acquire_locks(mdr, lov);
if (!locked)
return;
in->flush(new C_FinishIOMDR(mds, mdr));
};
-void Migrator::get_export_lock_set(CDir *dir, set<SimpleLock*>& locks)
+void Migrator::get_export_lock_set(CDir *dir, MutationImpl::LockOpVec& lov)
{
// path
vector<CDentry*> trace;
cache->make_trace(trace, dir->inode);
- for (vector<CDentry*>::iterator it = trace.begin();
- it != trace.end();
- ++it)
- locks.insert(&(*it)->lock);
+
+ set<CDir*> wouldbe_bounds;
+ cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
+
+ lov.reserve(trace.size() + wouldbe_bounds.size() + 8);
+
+ for (auto& dn : trace)
+ lov.add_rdlock(&dn->lock);
// prevent scatter gather race
- locks.insert(&dir->get_inode()->dirfragtreelock);
+ lov.add_rdlock(&dir->get_inode()->dirfragtreelock);
// bound dftlocks:
// NOTE: We need to take an rdlock on bounding dirfrags during
// redivvy it up. And it's needed for the scatterlocks to work
// properly: when the auth is in a sync/lock state it keeps each
// dirfrag's portion in the local (auth OR replica) dirfrag.
- set<CDir*> wouldbe_bounds;
- cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
- for (set<CDir*>::iterator p = wouldbe_bounds.begin(); p != wouldbe_bounds.end(); ++p)
- locks.insert(&(*p)->get_inode()->dirfragtreelock);
+ for (auto& dir : wouldbe_bounds)
+ lov.add_rdlock(&dir->get_inode()->dirfragtreelock);
+
+ // above code may add duplicated locks
+ lov.sort_and_merge();
}
}
// locks?
- set<SimpleLock*> rdlocks;
- set<SimpleLock*> xlocks;
- set<SimpleLock*> wrlocks;
- get_export_lock_set(dir, rdlocks);
+ MutationImpl::LockOpVec lov;
+ get_export_lock_set(dir, lov);
// If auth MDS of the subtree root inode is neither the exporter MDS
// nor the importer MDS and it gathers subtree root's fragstat/neststat
// while the subtree is exporting. It's possible that the exporter MDS
// are not auth MDS of the subtree root at the time they receive the
// lock messages. So the auth MDS of the subtree root inode may get no
// or duplicated fragstat/neststat for the subtree root dirfrag.
- wrlocks.insert(&dir->get_inode()->filelock);
- wrlocks.insert(&dir->get_inode()->nestlock);
+ lov.add_wrlock(&dir->get_inode()->filelock);
+ lov.add_wrlock(&dir->get_inode()->nestlock);
if (dir->get_inode()->is_auth()) {
dir->get_inode()->filelock.set_scatter_wanted();
dir->get_inode()->nestlock.set_scatter_wanted();
}
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true)) {
+ if (!mds->locker->acquire_locks(mdr, lov, NULL, true)) {
if (mdr->aborted)
export_try_cancel(dir);
return;
CInode *diri = dir->get_inode();
// ok, try to grab all my locks.
- set<SimpleLock*> rdlocks;
- get_export_lock_set(dir, rdlocks);
+ MutationImpl::LockOpVec lov;
+ get_export_lock_set(dir, lov);
if ((diri->is_auth() && diri->is_frozen()) ||
- !mds->locker->can_rdlock_set(rdlocks) ||
+ !mds->locker->can_rdlock_set(lov) ||
!diri->filelock.can_wrlock(-1) ||
!diri->nestlock.can_wrlock(-1)) {
dout(7) << "export_dir couldn't acquire all needed locks, failing. "
it->second.mut = new MutationImpl();
if (diri->is_auth())
it->second.mut->auth_pin(diri);
- mds->locker->rdlock_take_set(rdlocks, it->second.mut);
+ mds->locker->rdlock_take_set(lov, it->second.mut);
mds->locker->wrlock_force(&diri->filelock, it->second.mut);
mds->locker->wrlock_force(&diri->nestlock, it->second.mut);
void adjust_export_after_rename(CInode* diri, CDir *olddir);
void child_export_finish(std::shared_ptr<export_base_t>& parent, bool success);
- void get_export_lock_set(CDir *dir, set<SimpleLock*>& locks);
+ void get_export_lock_set(CDir *dir, MutationImpl::LockOpVec& lov);
void get_export_client_set(CDir *dir, set<client_t> &client_set);
void get_export_client_set(CInode *in, set<client_t> &client_set);
void MutationImpl::set_stickydirs(CInode *in)
{
- if (stickydirs.count(in) == 0) {
+ if (!stickydiri || stickydiri != in) {
in->get_stickydirs();
- stickydirs.insert(in);
+ if (stickydiri)
+ stickydiri->put_stickydirs();
+ stickydiri = in;
+ }
+}
+
+void MutationImpl::put_stickydirs()
+{
+ if (stickydiri) {
+ stickydiri->put_stickydirs();
+ stickydiri = nullptr;
+
}
}
void MutationImpl::drop_pins()
{
- for (set<MDSCacheObject*>::iterator it = pins.begin();
- it != pins.end();
- ++it)
- (*it)->put(MDSCacheObject::PIN_REQUEST);
+ for (auto& o : pins)
+ o->put(MDSCacheObject::PIN_REQUEST);
pins.clear();
}
locking_target_mds = -1;
}
+void MutationImpl::LockOpVec::erase_rdlock(SimpleLock* lock)
+{
+ for (int i = size() - 1; i >= 0; --i) {
+ auto& op = (*this)[i];
+ if (op.lock == lock && op.is_rdlock()) {
+ erase(begin() + i);
+ return;
+ }
+ }
+}
+
+void MutationImpl::LockOpVec::sort_and_merge()
+{
+ std::sort(begin(), end(), SimpleLock::ptr_lt());
+ // merge ops on the same lock
+ for (auto i = end() - 1; i > begin(); ) {
+ auto j = i;
+ while (--j >= begin()) {
+ if (i->lock != j->lock)
+ break;
+ }
+ if (i - j == 1) {
+ i = j;
+ continue;
+ }
+
+ // merge
+ ++j;
+ for (auto k = i; k > j; --k) {
+ if (k->is_remote_wrlock()) {
+ ceph_assert(!j->is_remote_wrlock());
+ j->wrlock_target = k->wrlock_target;
+ }
+ j->flags |= k->flags;
+ }
+ if (j->is_xlock()) {
+ // xlock overwrites other types
+ ceph_assert(!j->is_remote_wrlock());
+ j->flags = MutationImpl::LockOp::XLOCK;
+ }
+ erase(j + 1, i + 1);
+ i = j - 1;
+ }
+}
// auth pins
bool MutationImpl::is_auth_pinned(MDSCacheObject *object) const
void MutationImpl::drop_local_auth_pins()
{
- for (set<MDSCacheObject*>::iterator it = auth_pins.begin();
- it != auth_pins.end();
- ++it) {
- ceph_assert((*it)->is_auth());
- (*it)->auth_unpin(this);
+ for (const auto& p : auth_pins) {
+ ceph_assert(p->is_auth());
+ p->auth_unpin(this);
}
auth_pins.clear();
}
// -- my pins and locks --
// cache pins (so things don't expire)
set< MDSCacheObject* > pins;
- set<CInode*> stickydirs;
+ CInode* stickydiri = nullptr;
// auth pins
map<MDSCacheObject*, mds_rank_t> remote_auth_pins;
- set< MDSCacheObject* > auth_pins;
+ set<MDSCacheObject*> auth_pins;
// held locks
- set< SimpleLock* > rdlocks; // always local.
- set< SimpleLock* > wrlocks; // always local.
- map< SimpleLock*, mds_rank_t > remote_wrlocks;
- set< SimpleLock* > xlocks; // local or remote.
- set< SimpleLock*, SimpleLock::ptr_lt > locks; // full ordering
+ struct LockOp {
+ enum {
+ RDLOCK = 1,
+ WRLOCK = 2,
+ XLOCK = 4,
+ REMOTE_WRLOCK = 8,
+ };
+ SimpleLock* lock;
+ mutable unsigned flags;
+ mutable mds_rank_t wrlock_target;
+ operator SimpleLock*() const {
+ return lock;
+ }
+ LockOp(SimpleLock *l, unsigned f=0, mds_rank_t t=MDS_RANK_NONE) :
+ lock(l), flags(f), wrlock_target(t) {}
+ bool is_rdlock() const { return !!(flags & RDLOCK); }
+ bool is_xlock() const { return !!(flags & XLOCK); }
+ bool is_wrlock() const { return !!(flags & WRLOCK); }
+ void clear_wrlock() const { flags &= ~WRLOCK; }
+ bool is_remote_wrlock() const { return !!(flags & REMOTE_WRLOCK); }
+ void clear_remote_wrlock() const {
+ flags &= ~REMOTE_WRLOCK;
+ wrlock_target = MDS_RANK_NONE;
+ }
+ };
+
+ struct LockOpVec : public vector<LockOp> {
+ void add_rdlock(SimpleLock *lock) {
+ emplace_back(lock, LockOp::RDLOCK);
+ }
+ void erase_rdlock(SimpleLock *lock);
+ void add_xlock(SimpleLock *lock) {
+ emplace_back(lock, LockOp::XLOCK);
+ }
+ void add_wrlock(SimpleLock *lock) {
+ emplace_back(lock, LockOp::WRLOCK);
+ }
+ void add_remote_wrlock(SimpleLock *lock, mds_rank_t rank) {
+ ceph_assert(rank != MDS_RANK_NONE);
+ emplace_back(lock, LockOp::REMOTE_WRLOCK, rank);
+ }
+ void sort_and_merge();
+
+ LockOpVec() {
+ reserve(32);
+ }
+ };
+ typedef set<LockOp, SimpleLock::ptr_lt> lock_set;
+ typedef lock_set::iterator lock_iterator;
+ lock_set locks; // full ordering
+
+ bool is_rdlocked(SimpleLock *lock) const {
+ auto it = locks.find(lock);
+ return it != locks.end() && it->is_rdlock();
+ }
+ bool is_xlocked(SimpleLock *lock) const {
+ auto it = locks.find(lock);
+ return it != locks.end() && it->is_xlock();
+ }
+ bool is_wrlocked(SimpleLock *lock) const {
+ auto it = locks.find(lock);
+ return it != locks.end() && it->is_wrlock();
+ }
+ bool is_remote_wrlocked(SimpleLock *lock) const {
+ auto it = locks.find(lock);
+ return it != locks.end() && it->is_remote_wrlock();
+ }
// lock we are currently trying to acquire. if we give up for some reason,
// be sure to eval() this.
ceph_assert(locking == NULL);
ceph_assert(pins.empty());
ceph_assert(auth_pins.empty());
- ceph_assert(xlocks.empty());
- ceph_assert(rdlocks.empty());
- ceph_assert(wrlocks.empty());
- ceph_assert(remote_wrlocks.empty());
}
bool is_master() const { return slave_to_mds == MDS_RANK_NONE; }
void pin(MDSCacheObject *o);
void unpin(MDSCacheObject *o);
void set_stickydirs(CInode *in);
+ void put_stickydirs();
void drop_pins();
void start_locking(SimpleLock *lock, int target=-1);
mdr->more()->slaves.insert(from);
lock->decode_locked_state(m->get_lock_data());
dout(10) << "got remote xlock on " << *lock << " on " << *lock->get_parent() << dendl;
- mdr->xlocks.insert(lock);
- mdr->locks.insert(lock);
+ mdr->locks.emplace_hint(mdr->locks.end(), lock, MutationImpl::LockOp::XLOCK);
mdr->finish_locking(lock);
lock->get_xlock(mdr, mdr->get_client());
m->get_object_info());
mdr->more()->slaves.insert(from);
dout(10) << "got remote wrlock on " << *lock << " on " << *lock->get_parent() << dendl;
- mdr->remote_wrlocks[lock] = from;
- mdr->locks.insert(lock);
+ auto it = mdr->locks.emplace_hint(mdr->locks.end(),
+ lock, MutationImpl::LockOp::REMOTE_WRLOCK, from);
+ ceph_assert(it->is_remote_wrlock());
+ ceph_assert(it->wrlock_target == from);
+
mdr->finish_locking(lock);
ceph_assert(mdr->more()->waiting_on_slave.count(from));
<< *lock << " on " << *lock->get_parent() << dendl;
} else {
// use acquire_locks so that we get auth_pinning.
- set<SimpleLock*> rdlocks;
- set<SimpleLock*> wrlocks = mdr->wrlocks;
- set<SimpleLock*> xlocks = mdr->xlocks;
+ MutationImpl::LockOpVec lov;
+ for (const auto& p : mdr->locks) {
+ if (p.is_xlock())
+ lov.add_xlock(p.lock);
+ else if (p.is_wrlock())
+ lov.add_wrlock(p.lock);
+ }
int replycode = 0;
switch (op) {
case MMDSSlaveRequest::OP_XLOCK:
- xlocks.insert(lock);
+ lov.add_xlock(lock);
replycode = MMDSSlaveRequest::OP_XLOCKACK;
break;
case MMDSSlaveRequest::OP_WRLOCK:
- wrlocks.insert(lock);
+ lov.add_wrlock(lock);
replycode = MMDSSlaveRequest::OP_WRLOCKACK;
break;
}
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
// ack
SimpleLock *lock = mds->locker->get_lock(mdr->slave_request->get_lock_type(),
mdr->slave_request->get_object_info());
ceph_assert(lock);
+ auto it = mdr->locks.find(lock);
+ ceph_assert(it != mdr->locks.end());
bool need_issue = false;
switch (op) {
case MMDSSlaveRequest::OP_UNXLOCK:
- mds->locker->xlock_finish(lock, mdr.get(), &need_issue);
+ mds->locker->xlock_finish(it, mdr.get(), &need_issue);
break;
case MMDSSlaveRequest::OP_UNWRLOCK:
- mds->locker->wrlock_finish(lock, mdr.get(), &need_issue);
+ mds->locker->wrlock_finish(it, mdr.get(), &need_issue);
break;
}
if (need_issue)
auto reply = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
// return list of my auth_pins (if any)
- for (set<MDSCacheObject*>::iterator p = mdr->auth_pins.begin();
- p != mdr->auth_pins.end();
- ++p) {
+ for (const auto &p : mdr->auth_pins) {
MDSCacheObjectInfo info;
- (*p)->set_object_info(info);
+ p->set_object_info(info);
reply->get_authpins().push_back(info);
- if (*p == (MDSCacheObject*)auth_pin_freeze)
+ if (p == (MDSCacheObject*)auth_pin_freeze)
auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
}
}
// removed auth pins?
- map<MDSCacheObject*, mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
+ auto p = mdr->remote_auth_pins.begin();
while (p != mdr->remote_auth_pins.end()) {
MDSCacheObject* object = p->first;
if (p->second == from && pinned.count(object) == 0) {
/* If this returns null, the request has been handled
* as appropriate: forwarded on, or the client's been replied to */
CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
- set<SimpleLock*> &rdlocks,
+ MutationImpl::LockOpVec& lov,
bool want_auth,
bool no_want_auth, /* for readdir, who doesn't want auth _even_if_ it's
a snapped dir */
}
for (int i=0; i<(int)mdr->dn[n].size(); i++)
- rdlocks.insert(&mdr->dn[n][i]->lock);
+ lov.add_rdlock(&mdr->dn[n][i]->lock);
if (layout)
- mds->locker->include_snap_rdlocks_wlayout(rdlocks, ref, layout);
+ mds->locker->include_snap_rdlocks_wlayout(ref, lov, layout);
else
- mds->locker->include_snap_rdlocks(rdlocks, ref);
+ mds->locker->include_snap_rdlocks(ref, lov);
// set and pin ref
mdr->pin(ref);
* get rdlocks on traversed dentries, xlock on new dentry.
*/
CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
- set<SimpleLock*>& rdlocks, set<SimpleLock*>& wrlocks, set<SimpleLock*>& xlocks,
+ MutationImpl::LockOpVec& lov,
bool okexist, bool mustexist, bool alwaysxlock,
file_layout_t **layout)
{
// -- lock --
// NOTE: rename takes the same set of locks for srcdn
for (int i=0; i<(int)mdr->dn[n].size(); i++)
- rdlocks.insert(&mdr->dn[n][i]->lock);
+ lov.add_rdlock(&mdr->dn[n][i]->lock);
if (alwaysxlock || dnl->is_null())
- xlocks.insert(&dn->lock); // new dn, xlock
+ lov.add_xlock(&dn->lock); // new dn, xlock
else
- rdlocks.insert(&dn->lock); // existing dn, rdlock
- wrlocks.insert(&dn->get_dir()->inode->filelock); // also, wrlock on dir mtime
- wrlocks.insert(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
+ lov.add_rdlock(&dn->lock); // existing dn, rdlock
+ lov.add_wrlock(&dn->get_dir()->inode->filelock); // also, wrlock on dir mtime
+ lov.add_wrlock(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
if (layout)
- mds->locker->include_snap_rdlocks_wlayout(rdlocks, dn->get_dir()->inode, layout);
+ mds->locker->include_snap_rdlocks_wlayout(dn->get_dir()->inode, lov, layout);
else
- mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode);
+ mds->locker->include_snap_rdlocks(dn->get_dir()->inode, lov);
return dn;
}
void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
if (req->get_filepath().depth() == 0 && is_lookup) {
// refpath can't be empty for lookup but it can for
if (mask & CEPH_STAT_RSTAT)
want_auth = true; // set want_auth for CEPH_STAT_RSTAT mask
- CInode *ref = rdlock_path_pin_ref(mdr, 0, rdlocks, want_auth, false, NULL,
+ MutationImpl::LockOpVec lov;
+ CInode *ref = rdlock_path_pin_ref(mdr, 0, lov, want_auth, false, NULL,
!is_lookup);
if (!ref) return;
issued = cap->issued();
if ((mask & CEPH_CAP_LINK_SHARED) && !(issued & CEPH_CAP_LINK_EXCL))
- rdlocks.insert(&ref->linklock);
+ lov.add_rdlock(&ref->linklock);
if ((mask & CEPH_CAP_AUTH_SHARED) && !(issued & CEPH_CAP_AUTH_EXCL))
- rdlocks.insert(&ref->authlock);
+ lov.add_rdlock(&ref->authlock);
if ((mask & CEPH_CAP_XATTR_SHARED) && !(issued & CEPH_CAP_XATTR_EXCL))
- rdlocks.insert(&ref->xattrlock);
+ lov.add_rdlock(&ref->xattrlock);
if ((mask & CEPH_CAP_FILE_SHARED) && !(issued & CEPH_CAP_FILE_EXCL)) {
// Don't wait on unstable filelock if client is allowed to read file size.
// This can reduce the response time of getattr in the case that multiple
// clients do stat(2) and there are writers.
// The downside of this optimization is that mds may not issue Fs caps along
// with getattr reply. Client may need to send more getattr requests.
- if (mdr->rdlocks.count(&ref->filelock)) {
- rdlocks.insert(&ref->filelock);
+ if (mdr->is_rdlocked(&ref->filelock)) {
+ lov.add_rdlock(&ref->filelock);
} else if (ref->filelock.is_stable() ||
ref->filelock.get_num_wrlocks() > 0 ||
!ref->filelock.can_read(mdr->get_client())) {
- rdlocks.insert(&ref->filelock);
+ lov.add_rdlock(&ref->filelock);
mdr->done_locking = false;
}
}
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, ref, MAY_READ))
CDentry *dn = in->get_projected_parent_dn();
CInode *diri = dn ? dn->get_dir()->inode : NULL;
- set<SimpleLock*> rdlocks;
+ MutationImpl::LockOpVec lov;
if (dn && (want_parent || want_dentry)) {
mdr->pin(dn);
- rdlocks.insert(&dn->lock);
+ lov.add_rdlock(&dn->lock);
}
unsigned mask = req->head.args.lookupino.mask;
issued = cap->issued();
// permission bits, ACL/security xattrs
if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0)
- rdlocks.insert(&in->authlock);
+ lov.add_rdlock(&in->authlock);
if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0)
- rdlocks.insert(&in->xattrlock);
+ lov.add_rdlock(&in->xattrlock);
mdr->getattr_caps = mask;
}
- if (!rdlocks.empty()) {
- set<SimpleLock*> wrlocks, xlocks;
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!lov.empty()) {
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (diri != NULL) {
return;
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- rdlocks.insert(&diri->dirfragtreelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ MutationImpl::LockOpVec lov;
+ lov.add_rdlock(&diri->dirfragtreelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
frag_t frag = diri->dirfragtree[hash];
return;
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, need_auth);
+ MutationImpl::LockOpVec lov;
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, need_auth);
if (!cur)
return;
if (cur->is_frozen() || cur->state_test(CInode::STATE_EXPORTINGCAPS)) {
ceph_assert(!need_auth);
mdr->done_locking = false;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
}
issued = cap->issued();
// permission bits, ACL/security xattrs
if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0)
- rdlocks.insert(&cur->authlock);
+ lov.add_rdlock(&cur->authlock);
if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0)
- rdlocks.insert(&cur->xattrlock);
+ lov.add_rdlock(&cur->xattrlock);
mdr->getattr_caps = mask;
}
if ((flags & CEPH_O_TRUNC) && !mdr->has_completed) {
ceph_assert(cur->is_auth());
- xlocks.insert(&cur->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, cur, MAY_WRITE))
// this makes us wait for writers to flushsnaps, ensuring we get accurate metadata,
// and that data itself is flushed so that we can read the snapped data off disk.
if (mdr->snapid != CEPH_NOSNAP && !cur->is_dir()) {
- rdlocks.insert(&cur->filelock);
+ lov.add_rdlock(&cur->filelock);
}
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
mask = MAY_READ;
}
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- file_layout_t *dir_layout = NULL;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks,
+ MutationImpl::LockOpVec lov;
+ file_layout_t *dir_layout = nullptr;
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov,
!excl, false, false, &dir_layout);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
// created null dn.
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
- rdlocks.insert(&diri->authlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, access))
{
const MClientRequest::const_ref &req = mdr->client_request;
client_t client = req->get_source().num();
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *diri = rdlock_path_pin_ref(mdr, 0, rdlocks, false, true);
+ MutationImpl::LockOpVec lov;
+ CInode *diri = rdlock_path_pin_ref(mdr, 0, lov, false, true);
if (!diri) return;
// it's a directory, right?
return;
}
- rdlocks.insert(&diri->filelock);
- rdlocks.insert(&diri->dirfragtreelock);
+ lov.add_rdlock(&diri->filelock);
+ lov.add_rdlock(&diri->dirfragtreelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_READ))
void Server::handle_client_file_setlock(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
// get the inode to operate on, and set up any locks needed for that
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
- xlocks.insert(&cur->flocklock);
+ lov.add_xlock(&cur->flocklock);
/* acquire_locks will return true if it gets the locks. If it fails,
it will redeliver this request at a later date, so drop the request.
*/
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
+ if (!mds->locker->acquire_locks(mdr, lov)) {
dout(10) << "handle_client_file_setlock could not get locks!" << dendl;
return;
}
void Server::handle_client_file_readlock(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
// get the inode to operate on, and set up any locks needed for that
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
/* acquire_locks will return true if it gets the locks. If it fails,
it will redeliver this request at a later date, so drop the request.
*/
- rdlocks.insert(&cur->flocklock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
+ lov.add_rdlock(&cur->flocklock);
+ if (!mds->locker->acquire_locks(mdr, lov)) {
dout(10) << "handle_client_file_readlock could not get locks!" << dendl;
return;
}
void Server::handle_client_setattr(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ MutationImpl::LockOpVec lov;
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur) return;
if (mdr->snapid != CEPH_NOSNAP) {
// xlock inode
if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
- xlocks.insert(&cur->authlock);
+ lov.add_xlock(&cur->authlock);
if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
- xlocks.insert(&cur->filelock);
+ lov.add_xlock(&cur->filelock);
if (mask & CEPH_SETATTR_CTIME)
- wrlocks.insert(&cur->versionlock);
+ lov.add_wrlock(&cur->versionlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if ((mask & CEPH_SETATTR_UID) && (cur->inode.uid != req->head.args.setattr.uid))
truncating_smaller, changed_ranges));
// flush immediately if there are readers/writers waiting
- if (xlocks.count(&cur->filelock) &&
+ if (mdr->is_xlocked(&cur->filelock) &&
(cur->get_caps_wanted() & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
mds->mdlog->flush();
}
void Server::handle_client_setlayout(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ MutationImpl::LockOpVec lov;
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur) return;
if (mdr->snapid != CEPH_NOSNAP) {
return;
}
- xlocks.insert(&cur->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, cur, access))
void Server::handle_client_setdirlayout(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- file_layout_t *dir_layout = NULL;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+ MutationImpl::LockOpVec lov;
+ file_layout_t *dir_layout = nullptr;
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
if (!cur) return;
if (mdr->snapid != CEPH_NOSNAP) {
return;
}
- xlocks.insert(&cur->policylock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->policylock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
// validate layout
void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
file_layout_t *dir_layout,
- set<SimpleLock*> rdlocks,
- set<SimpleLock*> wrlocks,
- set<SimpleLock*> xlocks)
+ MutationImpl::LockOpVec& lov)
{
const MClientRequest::const_ref &req = mdr->client_request;
string name(req->get_path2());
if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
return;
- xlocks.insert(&cur->policylock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->policylock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto &pi = cur->project_inode();
if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
return;
- xlocks.insert(&cur->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->filelock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto &pi = cur->project_inode();
return;
}
- xlocks.insert(&cur->policylock);
+ lov.add_xlock(&cur->policylock);
if (quota.is_enable() && !cur->get_projected_srnode()) {
- xlocks.insert(&cur->snaplock);
+ lov.add_xlock(&cur->snaplock);
new_realm = true;
}
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto &pi = cur->project_inode(false, new_realm);
return;
}
- xlocks.insert(&cur->policylock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->policylock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto &pi = cur->project_inode();
void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
file_layout_t *dir_layout,
- set<SimpleLock*> rdlocks,
- set<SimpleLock*> wrlocks,
- set<SimpleLock*> xlocks)
+ MutationImpl::LockOpVec& lov)
{
const MClientRequest::const_ref &req = mdr->client_request;
string name(req->get_path2());
return;
}
- xlocks.insert(&cur->policylock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->policylock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto &pi = cur->project_inode();
// null/none value (empty string, means default layout). Is equivalent
// to a setxattr with empty string: pass through the empty payload of
// the rmxattr request to do this.
- handle_set_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+ handle_set_vxattr(mdr, cur, dir_layout, lov);
return;
}
{
const MClientRequest::const_ref &req = mdr->client_request;
string name(req->get_path2());
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
CInode *cur;
file_layout_t *dir_layout = NULL;
if (name.compare(0, 15, "ceph.dir.layout") == 0)
- cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
else
- cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
// magic ceph.* namespace?
if (name.compare(0, 5, "ceph.") == 0) {
- handle_set_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+ handle_set_vxattr(mdr, cur, dir_layout, lov);
return;
}
- xlocks.insert(&cur->xattrlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->xattrlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, cur, MAY_WRITE))
{
const MClientRequest::const_ref &req = mdr->client_request;
std::string name(req->get_path2());
- std::set<SimpleLock*> rdlocks, wrlocks, xlocks;
- file_layout_t *dir_layout = NULL;
+
+ MutationImpl::LockOpVec lov;
+ file_layout_t *dir_layout = nullptr;
CInode *cur;
if (name == "ceph.dir.layout")
- cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
else
- cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
}
if (name.compare(0, 5, "ceph.") == 0) {
- handle_remove_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+ handle_remove_vxattr(mdr, cur, dir_layout, lov);
return;
}
- xlocks.insert(&cur->xattrlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_xlock(&cur->xattrlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
auto pxattrs = cur->get_projected_xattrs();
{
const MClientRequest::const_ref &req = mdr->client_request;
client_t client = mdr->get_client();
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- file_layout_t *dir_layout = NULL;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false,
+ MutationImpl::LockOpVec lov;
+ file_layout_t *dir_layout = nullptr;
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false,
&dir_layout);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
return;
}
CInode *diri = dn->get_dir()->get_inode();
- rdlocks.insert(&diri->authlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_WRITE))
return;
}
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+ MutationImpl::LockOpVec lov;
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
respond_to_request(mdr, -EROFS);
}
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
- rdlocks.insert(&diri->authlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
// mkdir check access
void Server::handle_client_symlink(MDRequestRef& mdr)
{
const MClientRequest::const_ref &req = mdr->client_request;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+ MutationImpl::LockOpVec lov;
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
respond_to_request(mdr, -EROFS);
}
CDir *dir = dn->get_dir();
CInode *diri = dir->get_inode();
- rdlocks.insert(&diri->authlock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ lov.add_rdlock(&diri->authlock);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_WRITE))
<< " to " << req->get_filepath2()
<< dendl;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
if (!dn) return;
- CInode *targeti = rdlock_path_pin_ref(mdr, 1, rdlocks, false);
+ CInode *targeti = rdlock_path_pin_ref(mdr, 1, lov, false);
if (!targeti) return;
if (mdr->snapid != CEPH_NOSNAP) {
respond_to_request(mdr, -EROFS);
}
}
- xlocks.insert(&targeti->linklock);
- xlocks.insert(&targeti->snaplock);
- rdlocks.erase(&targeti->snaplock);
+ lov.erase_rdlock(&targeti->snaplock);
+ lov.add_xlock(&targeti->snaplock);
+ lov.add_xlock(&targeti->linklock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
}
// lock
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- for (int i=0; i<(int)trace.size()-1; i++) {
- rdlocks.insert(&trace[i]->lock);
- }
- xlocks.insert(&dn->lock);
- wrlocks.insert(&diri->filelock);
- wrlocks.insert(&diri->nestlock);
- xlocks.insert(&in->linklock);
+ MutationImpl::LockOpVec lov;
+
+ for (int i=0; i<(int)trace.size()-1; i++)
+ lov.add_rdlock(&trace[i]->lock);
+ lov.add_xlock(&dn->lock);
+ lov.add_wrlock(&diri->filelock);
+ lov.add_wrlock(&diri->nestlock);
+ lov.add_xlock(&in->linklock);
if (straydn) {
- wrlocks.insert(&straydn->get_dir()->inode->filelock);
- wrlocks.insert(&straydn->get_dir()->inode->nestlock);
- xlocks.insert(&straydn->lock);
+ lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+ lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+ lov.add_xlock(&straydn->lock);
}
- mds->locker->include_snap_rdlocks(rdlocks, diri);
- xlocks.insert(&in->snaplock);
+ mds->locker->include_snap_rdlocks(diri, lov);
+ lov.add_xlock(&in->snaplock);
if (in->is_dir())
- rdlocks.insert(&in->filelock); // to verify it's empty
+ lov.add_rdlock(&in->filelock); // to verify it's empty
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (in->is_dir() &&
vector<CDentry*>& srctrace = mdr->dn[1];
vector<CDentry*>& desttrace = mdr->dn[0];
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
- CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, true, false, true);
+ CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, false, true);
if (!destdn) return;
dout(10) << " destdn " << *destdn << dendl;
if (mdr->snapid != CEPH_NOSNAP) {
while (destbase != srcbase) {
CDentry *pdn = destbase->get_projected_parent_dn();
desttrace.insert(desttrace.begin(), pdn);
- rdlocks.insert(&pdn->lock);
+ lov.add_rdlock(&pdn->lock);
dout(10) << "rename prepending desttrace with " << *pdn << dendl;
destbase = pdn->get_dir()->get_inode();
}
// -- locks --
- map<SimpleLock*, mds_rank_t> remote_wrlocks;
// srctrace items. this mirrors locks taken in rdlock_path_xlock_dentry
for (int i=0; i<(int)srctrace.size(); i++)
- rdlocks.insert(&srctrace[i]->lock);
- xlocks.insert(&srcdn->lock);
+ lov.add_rdlock(&srctrace[i]->lock);
+ lov.add_xlock(&srcdn->lock);
mds_rank_t srcdirauth = srcdir->authority().first;
if (srcdirauth != mds->get_nodeid()) {
dout(10) << " will remote_wrlock srcdir scatterlocks on mds." << srcdirauth << dendl;
- remote_wrlocks[&srcdir->inode->filelock] = srcdirauth;
- remote_wrlocks[&srcdir->inode->nestlock] = srcdirauth;
+ lov.add_remote_wrlock(&srcdir->inode->filelock, srcdirauth);
+ lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdirauth);
if (srci->is_dir())
- rdlocks.insert(&srci->dirfragtreelock);
+ lov.add_rdlock(&srci->dirfragtreelock);
} else {
- wrlocks.insert(&srcdir->inode->filelock);
- wrlocks.insert(&srcdir->inode->nestlock);
+ lov.add_wrlock(&srcdir->inode->filelock);
+ lov.add_wrlock(&srcdir->inode->nestlock);
}
- mds->locker->include_snap_rdlocks(rdlocks, srcdir->inode);
+ mds->locker->include_snap_rdlocks(srcdir->inode, lov);
// straydn?
if (straydn) {
- wrlocks.insert(&straydn->get_dir()->inode->filelock);
- wrlocks.insert(&straydn->get_dir()->inode->nestlock);
- xlocks.insert(&straydn->lock);
+ lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+ lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+ lov.add_xlock(&straydn->lock);
}
// xlock versionlock on dentries if there are witnesses.
// this ensures the srcdn and destdn can be traversed to by the witnesses.
for (int i= 0; i<(int)srctrace.size(); i++) {
if (srctrace[i]->is_auth() && srctrace[i]->is_projected())
- xlocks.insert(&srctrace[i]->versionlock);
+ lov.add_xlock(&srctrace[i]->versionlock);
}
for (int i=0; i<(int)desttrace.size(); i++) {
if (desttrace[i]->is_auth() && desttrace[i]->is_projected())
- xlocks.insert(&desttrace[i]->versionlock);
+ lov.add_xlock(&desttrace[i]->versionlock);
}
// xlock srci and oldin's primary dentries, so witnesses can call
// open_remote_ino() with 'want_locked=true' when the srcdn or destdn
// is traversed.
if (srcdnl->is_remote())
- xlocks.insert(&srci->get_projected_parent_dn()->lock);
+ lov.add_xlock(&srci->get_projected_parent_dn()->lock);
if (destdnl->is_remote())
- xlocks.insert(&oldin->get_projected_parent_dn()->lock);
+ lov.add_xlock(&oldin->get_projected_parent_dn()->lock);
}
// we need to update srci's ctime. xlock its least contended lock to do that...
- xlocks.insert(&srci->linklock);
- xlocks.insert(&srci->snaplock);
+ lov.add_xlock(&srci->linklock);
+ lov.add_xlock(&srci->snaplock);
if (oldin) {
// xlock oldin (for nlink--)
- xlocks.insert(&oldin->linklock);
- xlocks.insert(&oldin->snaplock);
+ lov.add_xlock(&oldin->linklock);
+ lov.add_xlock(&oldin->snaplock);
if (oldin->is_dir())
- rdlocks.insert(&oldin->filelock); // to verify it's empty
+ lov.add_rdlock(&oldin->filelock); // to verify it's empty
}
CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : NULL;
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks,
- &remote_wrlocks, auth_pin_freeze))
+ if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
return;
if (linkmerge)
/* hack: add an auth pin for each xlock we hold. These were
* remote xlocks previously but now they're local and
* we're going to try and unpin when we xlock_finish. */
- for (set<SimpleLock *>::iterator i = mdr->xlocks.begin();
- i != mdr->xlocks.end();
- ++i)
- if ((*i)->get_parent() == destdnl->get_inode() &&
- !(*i)->is_locallock())
- mds->locker->xlock_import(*i);
+
+ for (auto i = mdr->locks.lower_bound(&destdnl->get_inode()->versionlock);
+ i != mdr->locks.end();
+ ++i) {
+ SimpleLock *lock = i->lock;
+ if (lock->get_parent() != destdnl->get_inode())
+ break;
+ if (i->is_xlock() && !lock->is_locallock())
+ mds->locker->xlock_import(lock);
+ }
// hack: fix auth bit
in->state_set(CInode::STATE_AUTH);
if (mdr->more()->is_inode_exporter) {
// drop our pins
// we exported, clear out any xlocks that we moved to another MDS
- set<SimpleLock*>::iterator i = mdr->xlocks.begin();
- while (i != mdr->xlocks.end()) {
- SimpleLock *lock = *i++;
+ for (auto i = mdr->locks.lower_bound(&in->versionlock);
+ i != mdr->locks.end(); ) {
+ SimpleLock *lock = i->lock;
+ if (lock->get_parent() != in)
+ break;
// we only care about xlocks on the exported inode
- if (lock->get_parent() == in &&
- !lock->is_locallock())
- mds->locker->xlock_export(lock, mdr.get());
+ if (i->is_xlock() && !lock->is_locallock())
+ mds->locker->xlock_export(i++, mdr.get());
+ else
+ ++i;
}
map<client_t,Capability::Import> peer_imported;
dout(10) << "lssnap on " << *diri << dendl;
// lock snap
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- mds->locker->include_snap_rdlocks(rdlocks, diri);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ MutationImpl::LockOpVec lov;
+ mds->locker->include_snap_rdlocks(diri, lov);
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_READ))
dout(10) << "mksnap " << snapname << " on " << *diri << dendl;
// lock snap
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
- mds->locker->include_snap_rdlocks(rdlocks, diri);
- rdlocks.erase(&diri->snaplock);
- xlocks.insert(&diri->snaplock);
+ mds->locker->include_snap_rdlocks(diri, lov);
+ lov.erase_rdlock(&diri->snaplock);
+ lov.add_xlock(&diri->snaplock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
dout(10) << " snapname " << snapname << " is " << snapid << dendl;
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- mds->locker->include_snap_rdlocks(rdlocks, diri);
- rdlocks.erase(&diri->snaplock);
- xlocks.insert(&diri->snaplock);
+ MutationImpl::LockOpVec lov;
+ mds->locker->include_snap_rdlocks(diri, lov);
+ lov.erase_rdlock(&diri->snaplock);
+ lov.add_xlock(&diri->snaplock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
dout(10) << " snapname " << srcname << " is " << snapid << dendl;
// lock snap
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ MutationImpl::LockOpVec lov;
- mds->locker->include_snap_rdlocks(rdlocks, diri);
- rdlocks.erase(&diri->snaplock);
- xlocks.insert(&diri->snaplock);
+ mds->locker->include_snap_rdlocks(diri, lov);
+ lov.erase_rdlock(&diri->snaplock);
+ lov.add_xlock(&diri->snaplock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ if (!mds->locker->acquire_locks(mdr, lov))
return;
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
void journal_allocated_inos(MDRequestRef& mdr, EMetaBlob *blob);
void apply_allocated_inos(MDRequestRef& mdr, Session *session);
- CInode* rdlock_path_pin_ref(MDRequestRef& mdr, int n, set<SimpleLock*>& rdlocks, bool want_auth,
- bool no_want_auth=false,
- file_layout_t **layout=NULL,
+ CInode* rdlock_path_pin_ref(MDRequestRef& mdr, int n, MutationImpl::LockOpVec& lov,
+ bool want_auth, bool no_want_auth=false,
+ file_layout_t **layout=nullptr,
bool no_lookup=false);
CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
- set<SimpleLock*>& rdlocks,
- set<SimpleLock*>& wrlocks,
- set<SimpleLock*>& xlocks, bool okexist,
- bool mustexist, bool alwaysxlock,
- file_layout_t **layout=NULL);
+ MutationImpl::LockOpVec& lov,
+ bool okexist, bool mustexist, bool alwaysxlock,
+ file_layout_t **layout=nullptr);
CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr);
file_layout_t *layout);
void handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
file_layout_t *dir_layout,
- set<SimpleLock*> rdlocks,
- set<SimpleLock*> wrlocks,
- set<SimpleLock*> xlocks);
+ MutationImpl::LockOpVec& lov);
void handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
file_layout_t *dir_layout,
- set<SimpleLock*> rdlocks,
- set<SimpleLock*> wrlocks,
- set<SimpleLock*> xlocks);
+ MutationImpl::LockOpVec& lov);
void handle_client_setxattr(MDRequestRef& mdr);
void handle_client_removexattr(MDRequestRef& mdr);