]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: optimize acquiring locks 23198/head
authorYan, Zheng <zyan@redhat.com>
Sat, 7 Jul 2018 05:14:19 +0000 (13:14 +0800)
committerYan, Zheng <zyan@redhat.com>
Tue, 28 Aug 2018 04:46:04 +0000 (21:46 -0700)
There are several changes:
 - use single LockOp vector to pass all locks (rdlock, wrlock, xlock)
   to Locker::acquire_locks(). this avoids memory allocation overhead
   of using multimple set
 - change MutationImpl::locks to LockOp set, use it to track both locks
   and locks' types.
 - use empalce_hint to optimize insertion of lock to MutationImpl::locks
 - use iterator to optimize removal of lock from MutationImpl::locks

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/mds/CInode.cc
src/mds/Locker.cc
src/mds/Locker.h
src/mds/MDCache.cc
src/mds/Migrator.cc
src/mds/Migrator.h
src/mds/Mutation.cc
src/mds/Mutation.h
src/mds/Server.cc
src/mds/Server.h

index 1f25f7b2c06260816e2e9a8e5e2d0b746ece3135..ac08db1a866f0f24c389bd4e224d697ba9eb2b93 100644 (file)
@@ -2083,7 +2083,7 @@ void CInode::finish_scatter_update(ScatterLock *lock, CDir *dir,
         dir->assimilate_dirty_rstat_inodes_finish(mut, &le->metablob);
 
         if (!(pf->rstat == pf->accounted_rstat)) {
-          if (mut->wrlocks.count(&nestlock) == 0) {
+          if (!mut->is_wrlocked(&nestlock)) {
             mdcache->mds->locker->wrlock_force(&nestlock, mut);
           }
 
index d70d77453d99119a60fe1e44e23a9e86b2af7c93..5c714ab49d8c44f7cf0a097006819eb9448a27f9 100644 (file)
@@ -156,29 +156,29 @@ void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data
 
 
 
-void Locker::include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in)
+void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
 {
   // rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
   while (t->get_projected_parent_dn()) {
     t = t->get_projected_parent_dn()->get_dir()->get_inode();
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
   }
+  lov.add_rdlock(&in->snaplock);
 }
 
-void Locker::include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
+void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
                                          file_layout_t **layout)
 {
   //rdlock ancestor snaps
   CInode *t = in;
-  rdlocks.insert(&in->snaplock);
-  rdlocks.insert(&in->policylock);
+  lov.add_rdlock(&in->snaplock);
+  lov.add_rdlock(&in->policylock);
   bool found_layout = false;
   while (t) {
-    rdlocks.insert(&t->snaplock);
+    lov.add_rdlock(&t->snaplock);
     if (!found_layout) {
-      rdlocks.insert(&t->policylock);
+      lov.add_rdlock(&t->policylock);
       if (t->get_projected_inode()->has_layout()) {
         *layout = &t->get_projected_inode()->layout;
         found_layout = true;
@@ -208,10 +208,7 @@ struct MarkEventOnDestruct {
 /* If this function returns false, the mdr has been placed
  * on the appropriate wait list */
 bool Locker::acquire_locks(MDRequestRef& mdr,
-                          set<SimpleLock*> &rdlocks,
-                          set<SimpleLock*> &wrlocks,
-                          set<SimpleLock*> &xlocks,
-                          map<SimpleLock*,mds_rank_t> *remote_wrlocks,
+                          MutationImpl::LockOpVec& lov,
                           CInode *auth_pin_freeze,
                           bool auth_pin_nonblock)
 {
@@ -226,143 +223,122 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   client_t client = mdr->get_client();
 
-  set<SimpleLock*, SimpleLock::ptr_lt> sorted;  // sort everything we will lock
-  set<MDSCacheObject*> mustpin;            // items to authpin
+  set<MDSCacheObject*> mustpin;  // items to authpin
 
   // xlocks
-  for (set<SimpleLock*>::iterator p = xlocks.begin(); p != xlocks.end(); ++p) {
-    SimpleLock *lock = *p;
-
-    if ((lock->get_type() == CEPH_LOCK_ISNAP ||
-         lock->get_type() == CEPH_LOCK_IPOLICY) &&
-       mds->is_cluster_degraded() &&
-       mdr->is_master() &&
-       !mdr->is_queued_for_replay()) {
-      // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
-      // get processed in proper order.
-      bool wait = false;
-      if (lock->get_parent()->is_auth()) {
-       if (!mdr->locks.count(lock)) {
-         set<mds_rank_t> ls;
-         lock->get_parent()->list_replicas(ls);
-         for (auto m : ls) {
-           if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
-             wait = true;
-             break;
+  for (int i = 0, size = lov.size(); i < size; ++i) {
+    auto& p = lov[i];
+    SimpleLock *lock = p.lock;
+    MDSCacheObject *object = lock->get_parent();
+
+    if (p.is_xlock()) {
+      if ((lock->get_type() == CEPH_LOCK_ISNAP ||
+          lock->get_type() == CEPH_LOCK_IPOLICY) &&
+         mds->is_cluster_degraded() &&
+         mdr->is_master() &&
+         !mdr->is_queued_for_replay()) {
+       // waiting for recovering mds, to guarantee replayed requests and mksnap/setlayout
+       // get processed in proper order.
+       bool wait = false;
+       if (object->is_auth()) {
+         if (!mdr->locks.count(lock)) {
+           set<mds_rank_t> ls;
+           object->list_replicas(ls);
+           for (auto m : ls) {
+             if (mds->mdsmap->get_state(m) < MDSMap::STATE_ACTIVE) {
+               wait = true;
+               break;
+             }
            }
          }
+       } else {
+         // if the lock is the latest locked one, it's possible that slave mds got the lock
+         // while there are recovering mds.
+         if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
+           wait = true;
+       }
+       if (wait) {
+         dout(10) << " must xlock " << *lock << " " << *object
+                  << ", waiting for cluster recovered" << dendl;
+         mds->locker->drop_locks(mdr.get(), NULL);
+         mdr->drop_local_auth_pins();
+         mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
+         return false;
        }
-      } else {
-       // if the lock is the latest locked one, it's possible that slave mds got the lock
-       // while there are recovering mds.
-       if (!mdr->locks.count(lock) || lock == *mdr->locks.rbegin())
-         wait = true;
-      }
-      if (wait) {
-       dout(10) << " must xlock " << *lock << " " << *lock->get_parent()
-                << ", waiting for cluster recovered" << dendl;
-       mds->locker->drop_locks(mdr.get(), NULL);
-       mdr->drop_local_auth_pins();
-       mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-       return false;
       }
-    }
-
-    dout(20) << " must xlock " << *lock << " " << *lock->get_parent() << dendl;
-
-    sorted.insert(lock);
-    mustpin.insert(lock->get_parent());
 
-    // augment xlock with a versionlock?
-    if ((*p)->get_type() == CEPH_LOCK_DN) {
-      CDentry *dn = (CDentry*)lock->get_parent();
-      if (!dn->is_auth())
-       continue;
+      dout(20) << " must xlock " << *lock << " " << *object << dendl;
 
-      if (xlocks.count(&dn->versionlock))
-       continue;  // we're xlocking the versionlock too; don't wrlock it!
+      mustpin.insert(object);
 
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline dentry updates to journal.
-       wrlocks.insert(&dn->versionlock);
-      } else {
-       // slave.  exclusively lock the dentry version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&dn->versionlock);
-       sorted.insert(&dn->versionlock);
+      // augment xlock with a versionlock?
+      if (lock->get_type() == CEPH_LOCK_DN) {
+       CDentry *dn = static_cast<CDentry*>(object);
+       if (!dn->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline dentry updates to journal.
+         lov.add_wrlock(&dn->versionlock);
+       } else {
+         // slave.  exclusively lock the dentry version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&dn->versionlock);
+       }
       }
-    }
-    if (lock->get_type() > CEPH_LOCK_IVERSION) {
-      // inode version lock?
-      CInode *in = (CInode*)lock->get_parent();
-      if (!in->is_auth())
-       continue;
-      if (mdr->is_master()) {
-       // master.  wrlock versionlock so we can pipeline inode updates to journal.
-       wrlocks.insert(&in->versionlock);
-      } else {
-       // slave.  exclusively lock the inode version (i.e. block other journal updates).
-       // this makes rollback safe.
-       xlocks.insert(&in->versionlock);
-       sorted.insert(&in->versionlock);
+      if (lock->get_type() > CEPH_LOCK_IVERSION) {
+       // inode version lock?
+       CInode *in = static_cast<CInode*>(object);
+       if (!in->is_auth())
+         continue;
+       if (mdr->is_master()) {
+         // master.  wrlock versionlock so we can pipeline inode updates to journal.
+         lov.add_wrlock(&in->versionlock);
+       } else {
+         // slave.  exclusively lock the inode version (i.e. block other journal updates).
+         // this makes rollback safe.
+         lov.add_xlock(&in->versionlock);
+       }
       }
-    }
-  }
-
-  // wrlocks
-  for (set<SimpleLock*>::iterator p = wrlocks.begin(); p != wrlocks.end(); ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must wrlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_wrlock(client) &&  // we might have to request a scatter
-            !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a scatter" << dendl;
-      mustpin.insert(object);
-    }
-  }
-
-  // remote_wrlocks
-  if (remote_wrlocks) {
-    for (map<SimpleLock*,mds_rank_t>::iterator p = remote_wrlocks->begin(); p != remote_wrlocks->end(); ++p) {
-      MDSCacheObject *object = p->first->get_parent();
-      dout(20) << " must remote_wrlock on mds." << p->second << " "
-              << *p->first << " " << *object << dendl;
-      sorted.insert(p->first);
-      mustpin.insert(object);
-    }
-  }
-
-  // rdlocks
-  for (set<SimpleLock*>::iterator p = rdlocks.begin();
-        p != rdlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    dout(20) << " must rdlock " << **p << " " << *object << dendl;
-    sorted.insert(*p);
-    if (object->is_auth())
-      mustpin.insert(object);
-    else if (!object->is_auth() &&
-            !(*p)->can_rdlock(client)) {      // we might have to request an rdlock
-      dout(15) << " will also auth_pin " << *object
-              << " in case we need to request a rdlock" << dendl;
+    } else if (p.is_wrlock()) {
+      dout(20) << " must wrlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_wrlock(client) &&  // we might have to request a scatter
+                !mdr->is_slave()) {           // if we are slave (remote_wrlock), the master already authpinned
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a scatter" << dendl;
+       mustpin.insert(object);
+      }
+    } else if (p.is_remote_wrlock()) {
+      dout(20) << " must remote_wrlock on mds." << p.wrlock_target << " "
+              << *lock << " " << *object << dendl;
       mustpin.insert(object);
+    } else if (p.is_rdlock()) {
+
+      dout(20) << " must rdlock " << *lock << " " << *object << dendl;
+      if (object->is_auth()) {
+       mustpin.insert(object);
+      } else if (!object->is_auth() &&
+                !lock->can_rdlock(client)) {      // we might have to request an rdlock
+       dout(15) << " will also auth_pin " << *object
+                << " in case we need to request a rdlock" << dendl;
+       mustpin.insert(object);
+      }
+    } else {
+      ceph_assert(0 == "locker unknown lock operation");
     }
   }
 
+  lov.sort_and_merge();
  
   // AUTH PINS
   map<mds_rank_t, set<MDSCacheObject*> > mustpin_remote;  // mds -> (object set)
   
   // can i auth pin them all now?
   marker.message = "failed to authpin local pins";
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto &p : mustpin) {
+    MDSCacheObject *object = p;
 
     dout(10) << " must authpin " << *object << dendl;
 
@@ -419,10 +395,8 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   }
 
   // ok, grab local auth pins
-  for (set<MDSCacheObject*>::iterator p = mustpin.begin();
-       p != mustpin.end();
-       ++p) {
-    MDSCacheObject *object = *p;
+  for (const auto& p : mustpin) {
+    MDSCacheObject *object = p;
     if (mdr->is_auth_pinned(object)) {
       dout(10) << " already auth_pinned " << *object << dendl;
     } else if (object->is_auth()) {
@@ -434,14 +408,12 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
   // request remote auth_pins
   if (!mustpin_remote.empty()) {
     marker.message = "requesting remote authpins";
-    for (map<MDSCacheObject*,mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
-        p != mdr->remote_auth_pins.end();
-        ++p) {
-      if (mustpin.count(p->first)) {
-       ceph_assert(p->second == p->first->authority().first);
-       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p->second);
+    for (const auto& p : mdr->remote_auth_pins) {
+      if (mustpin.count(p.first)) {
+       ceph_assert(p.second == p.first->authority().first);
+       map<mds_rank_t, set<MDSCacheObject*> >::iterator q = mustpin_remote.find(p.second);
        if (q != mustpin_remote.end())
-         q->second.insert(p->first);
+         q->second.insert(p.first);
       }
     }
     for (map<mds_rank_t, set<MDSCacheObject*> >::iterator p = mustpin_remote.begin();
@@ -487,157 +459,154 @@ bool Locker::acquire_locks(MDRequestRef& mdr,
 
   // acquire locks.
   // make sure they match currently acquired locks.
-  set<SimpleLock*, SimpleLock::ptr_lt>::iterator existing = mdr->locks.begin();
-  for (set<SimpleLock*, SimpleLock::ptr_lt>::iterator p = sorted.begin();
-       p != sorted.end();
-       ++p) {
-    bool need_wrlock = !!wrlocks.count(*p);
-    bool need_remote_wrlock = !!(remote_wrlocks && remote_wrlocks->count(*p));
+  auto existing = mdr->locks.begin();
+  for (const auto& p : lov) {
+    bool need_wrlock = p.is_wrlock();
+    bool need_remote_wrlock = p.is_remote_wrlock();
 
     // already locked?
-    if (existing != mdr->locks.end() && *existing == *p) {
+    if (existing != mdr->locks.end() && existing->lock == p.lock) {
       // right kind?
-      SimpleLock *have = *existing;
-      ++existing;
-      if (xlocks.count(have) && mdr->xlocks.count(have)) {
-       dout(10) << " already xlocked " << *have << " " << *have->get_parent() << dendl;
+      auto it = existing++;
+      auto have = *it; // don't reference
+
+      if (have.is_xlock() && p.is_xlock()) {
+       dout(10) << " already xlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
-      if (mdr->remote_wrlocks.count(have)) {
-       if (!need_remote_wrlock ||
-           mdr->remote_wrlocks[have] != (*remote_wrlocks)[have]) {
-         dout(10) << " unlocking remote_wrlock on wrong mds." << mdr->remote_wrlocks[have]
-                  << " " << *have << " " << *have->get_parent() << dendl;
-         remote_wrlock_finish(have, mdr->remote_wrlocks[have], mdr.get());
-       }
+
+      if (have.is_remote_wrlock() &&
+         (!need_remote_wrlock || have.wrlock_target != p.wrlock_target)) {
+       dout(10) << " unlocking remote_wrlock on wrong mds." << have.wrlock_target
+                << " " << *have.lock << " " << *have.lock->get_parent() << dendl;
+       remote_wrlock_finish(it, mdr.get());
+       have.clear_remote_wrlock();
       }
+
       if (need_wrlock || need_remote_wrlock) {
-       if (need_wrlock == !!mdr->wrlocks.count(have) &&
-           need_remote_wrlock == !!mdr->remote_wrlocks.count(have)) {
+       if (need_wrlock == have.is_wrlock() &&
+           need_remote_wrlock == have.is_remote_wrlock()) {
          if (need_wrlock)
-           dout(10) << " already wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          if (need_remote_wrlock)
-           dout(10) << " already remote_wrlocked " << *have << " " << *have->get_parent() << dendl;
+           dout(10) << " already remote_wrlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
          continue;
        }
-      }
-      if (rdlocks.count(have) && mdr->rdlocks.count(have)) {
-       dout(10) << " already rdlocked " << *have << " " << *have->get_parent() << dendl;
+
+       if (have.is_wrlock()) {
+         if (!need_wrlock)
+           dout(10) << " unlocking extra " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         else if (need_remote_wrlock) // acquire remote_wrlock first
+           dout(10) << " unlocking out-of-order " << *have.lock << " " << *have.lock->get_parent() << dendl;
+         bool need_issue = false;
+         wrlock_finish(it, mdr.get(), &need_issue);
+         if (need_issue)
+           issue_set.insert(static_cast<CInode*>(have.lock->get_parent()));
+       }
+      } else if (have.is_rdlock() && p.is_rdlock()) {
+       dout(10) << " already rdlocked " << *have.lock << " " << *have.lock->get_parent() << dendl;
        continue;
       }
     }
     
     // hose any stray locks
-    if (existing != mdr->locks.end() && *existing == *p) {
-      ceph_assert(need_wrlock || need_remote_wrlock);
-      SimpleLock *lock = *existing;
-      if (mdr->wrlocks.count(lock)) {
-       if (!need_wrlock)
-         dout(10) << " unlocking extra " << *lock << " " << *lock->get_parent() << dendl;
-       else if (need_remote_wrlock) // acquire remote_wrlock first
-         dout(10) << " unlocking out-of-order " << *lock << " " << *lock->get_parent() << dendl;
-       bool need_issue = false;
-       wrlock_finish(lock, mdr.get(), &need_issue);
-       if (need_issue)
-         issue_set.insert(static_cast<CInode*>(lock->get_parent()));
-      }
-      ++existing;
-    }
     while (existing != mdr->locks.end()) {
-      SimpleLock *stray = *existing;
-      ++existing;
-      dout(10) << " unlocking out-of-order " << *stray << " " << *stray->get_parent() << dendl;
+      auto it = existing++;
+      auto stray = *it; // don't reference
+      dout(10) << " unlocking out-of-order " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
       bool need_issue = false;
-      if (mdr->xlocks.count(stray)) {
-       xlock_finish(stray, mdr.get(), &need_issue);
-      } else if (mdr->rdlocks.count(stray)) {
-       rdlock_finish(stray, mdr.get(), &need_issue);
+      if (stray.is_xlock()) {
+       xlock_finish(it, mdr.get(), &need_issue);
+      } else if (stray.is_rdlock()) {
+       rdlock_finish(it, mdr.get(), &need_issue);
       } else {
        // may have acquired both wrlock and remore wrlock
-       if (mdr->wrlocks.count(stray))
-         wrlock_finish(stray, mdr.get(), &need_issue);
-       if (mdr->remote_wrlocks.count(stray))
-         remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+       if (stray.is_wrlock())
+         wrlock_finish(it, mdr.get(), &need_issue);
+       if (stray.is_remote_wrlock())
+         remote_wrlock_finish(it, mdr.get());
       }
       if (need_issue)
-       issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+       issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
     }
 
     // lock
-    if (mdr->locking && *p != mdr->locking) {
+    if (mdr->locking && p.lock != mdr->locking) {
       cancel_locking(mdr.get(), &issue_set);
     }
-    if (xlocks.count(*p)) {
+    if (p.is_xlock()) {
       marker.message = "failed to xlock, waiting";
-      if (!xlock_start(*p, mdr)) 
+      if (!xlock_start(p.lock, mdr))
        goto out;
-      dout(10) << " got xlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got xlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     } else if (need_wrlock || need_remote_wrlock) {
-      if (need_remote_wrlock && !mdr->remote_wrlocks.count(*p)) {
+      if (need_remote_wrlock && !mdr->is_remote_wrlocked(p)) {
         marker.message = "waiting for remote wrlocks";
-       remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+       remote_wrlock_start(p, p.wrlock_target, mdr);
        goto out;
       }
-      if (need_wrlock && !mdr->wrlocks.count(*p)) {
+      if (need_wrlock) {
         marker.message = "failed to wrlock, waiting";
-       if (need_remote_wrlock && !(*p)->can_wrlock(mdr->get_client())) {
+       if (need_remote_wrlock && !p.lock->can_wrlock(mdr->get_client())) {
          marker.message = "failed to wrlock, dropping remote wrlock and waiting";
          // can't take the wrlock because the scatter lock is gathering. need to
          // release the remote wrlock, so that the gathering process can finish.
-         remote_wrlock_finish(*p, mdr->remote_wrlocks[*p], mdr.get());
-         remote_wrlock_start(*p, (*remote_wrlocks)[*p], mdr);
+         auto it =  mdr->locks.end();
+         ++it;
+         remote_wrlock_finish(it, mdr.get());
+         remote_wrlock_start(p, p.wrlock_target, mdr);
          goto out;
        }
        // nowait if we have already gotten remote wrlock
-       if (!wrlock_start(*p, mdr, need_remote_wrlock))
+       if (!wrlock_start(p, mdr, need_remote_wrlock))
          goto out;
-       dout(10) << " got wrlock on " << **p << " " << *(*p)->get_parent() << dendl;
+       dout(10) << " got wrlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
       }
     } else {
       ceph_assert(mdr->is_master());
-      if ((*p)->needs_recover()) {
+      if (p.lock->needs_recover()) {
        if (mds->is_cluster_degraded()) {
          if (!mdr->is_queued_for_replay()) {
            // see comments in SimpleLock::set_state_rejoin() and
            // ScatterLock::encode_state_for_rejoin()
            drop_locks(mdr.get());
            mds->wait_for_cluster_recovered(new C_MDS_RetryRequest(mdcache, mdr));
-           dout(10) << " rejoin recovering " << **p << " " << *(*p)->get_parent()
+           dout(10) << " rejoin recovering " << *p.lock << " " << *p.lock->get_parent()
                     << ", waiting for cluster recovered" << dendl;
            marker.message = "rejoin recovering lock, waiting for cluster recovered";
            return false;
          }
        } else {
-         (*p)->clear_need_recover();
+         p.lock->clear_need_recover();
        }
       }
 
       marker.message = "failed to rdlock, waiting";
-      if (!rdlock_start(*p, mdr)) 
+      if (!rdlock_start(p, mdr))
        goto out;
-      dout(10) << " got rdlock on " << **p << " " << *(*p)->get_parent() << dendl;
+      dout(10) << " got rdlock on " << *p.lock << " " << *p.lock->get_parent() << dendl;
     }
   }
     
   // any extra unneeded locks?
   while (existing != mdr->locks.end()) {
-    SimpleLock *stray = *existing;
-    ++existing;
-    dout(10) << " unlocking extra " << *stray << " " << *stray->get_parent() << dendl;
+    auto it = existing++;
+    auto stray = *it;
+    dout(10) << " unlocking extra " << *stray.lock << " " << *stray.lock->get_parent() << dendl;
     bool need_issue = false;
-    if (mdr->xlocks.count(stray)) {
-      xlock_finish(stray, mdr.get(), &need_issue);
-    } else if (mdr->rdlocks.count(stray)) {
-      rdlock_finish(stray, mdr.get(), &need_issue);
+    if (stray.is_xlock()) {
+      xlock_finish(it, mdr.get(), &need_issue);
+    } else if (stray.is_rdlock()) {
+      rdlock_finish(it, mdr.get(), &need_issue);
     } else {
       // may have acquired both wrlock and remore wrlock
-      if (mdr->wrlocks.count(stray))
-       wrlock_finish(stray, mdr.get(), &need_issue);
-      if (mdr->remote_wrlocks.count(stray))
-       remote_wrlock_finish(stray, mdr->remote_wrlocks[stray], mdr.get());
+      if (stray.is_wrlock())
+       wrlock_finish(it, mdr.get(), &need_issue);
+      if (stray.is_remote_wrlock())
+       remote_wrlock_finish(it, mdr.get());
     }
     if (need_issue)
-      issue_set.insert(static_cast<CInode*>(stray->get_parent()));
+      issue_set.insert(static_cast<CInode*>(stray.lock->get_parent()));
   }
 
   mdr->done_locking = true;
@@ -675,65 +644,61 @@ void Locker::notify_freeze_waiter(MDSCacheObject *o)
 
 void Locker::set_xlocks_done(MutationImpl *mut, bool skip_dentry)
 {
-  for (set<SimpleLock*>::iterator p = mut->xlocks.begin();
-       p != mut->xlocks.end();
-       ++p) {
-    MDSCacheObject *object = (*p)->get_parent();
-    ceph_assert(object->is_auth());
+  for (const auto &p : mut->locks) {
+    if (!p.is_xlock())
+      continue;
+    MDSCacheObject *obj = p.lock->get_parent();
+    ceph_assert(obj->is_auth());
     if (skip_dentry &&
-       ((*p)->get_type() == CEPH_LOCK_DN || (*p)->get_type() == CEPH_LOCK_DVERSION))
+       (p.lock->get_type() == CEPH_LOCK_DN || p.lock->get_type() == CEPH_LOCK_DVERSION))
       continue;
-    dout(10) << "set_xlocks_done on " << **p << " " << *object << dendl;
-    (*p)->set_xlock_done();
-  }
-}
-
-void Locker::_drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
-{
-  while (!mut->rdlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->rdlocks.begin())->get_parent();
-    rdlock_finish(*mut->rdlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
+    dout(10) << "set_xlocks_done on " << *p.lock << " " << *obj << dendl;
+    p.lock->set_xlock_done();
   }
 }
 
-void Locker::_drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
+void Locker::_drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue,
+                        bool drop_rdlocks)
 {
   set<mds_rank_t> slaves;
 
-  while (!mut->xlocks.empty()) {
-    SimpleLock *lock = *mut->xlocks.begin();
-    MDSCacheObject *p = lock->get_parent();
-    if (!p->is_auth()) {
-      ceph_assert(lock->get_sm()->can_remote_xlock);
-      slaves.insert(p->authority().first);
-      lock->put_xlock();
-      mut->locks.erase(lock);
-      mut->xlocks.erase(lock);
-      continue;
-    }
-    bool ni = false;
-    xlock_finish(lock, mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
-  }
-
-  while (!mut->remote_wrlocks.empty()) {
-    map<SimpleLock*,mds_rank_t>::iterator p = mut->remote_wrlocks.begin();
-    slaves.insert(p->second);
-    if (mut->wrlocks.count(p->first) == 0)
-      mut->locks.erase(p->first);
-    mut->remote_wrlocks.erase(p);
-  }
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    MDSCacheObject *obj = lock->get_parent();
 
-  while (!mut->wrlocks.empty()) {
-    bool ni = false;
-    MDSCacheObject *p = (*mut->wrlocks.begin())->get_parent();
-    wrlock_finish(*mut->wrlocks.begin(), mut, &ni);
-    if (ni)
-      pneed_issue->insert(static_cast<CInode*>(p));
+    if (it->is_xlock()) {
+      if (obj->is_auth()) {
+       bool ni = false;
+       xlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       ceph_assert(lock->get_sm()->can_remote_xlock);
+       slaves.insert(obj->authority().first);
+       lock->put_xlock();
+       mut->locks.erase(it++);
+      }
+    } else if (it->is_wrlock() || it->is_remote_wrlock()) {
+      if (it->is_remote_wrlock()) {
+       slaves.insert(it->wrlock_target);
+       it->clear_remote_wrlock();
+      }
+      if (it->is_wrlock()) {
+       bool ni = false;
+       wrlock_finish(it++, mut, &ni);
+       if (ni)
+         pneed_issue->insert(static_cast<CInode*>(obj));
+      } else {
+       mut->locks.erase(it++);
+      }
+    } else if (drop_rdlocks && it->is_rdlock()) {
+      bool ni = false;
+      rdlock_finish(it++, mut, &ni);
+      if (ni)
+       pneed_issue->insert(static_cast<CInode*>(obj));
+    } else {
+      ++it;
+    }
   }
 
   for (set<mds_rank_t>::iterator p = slaves.begin(); p != slaves.end(); ++p) {
@@ -776,8 +741,7 @@ void Locker::drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue)
 
   if (mut->locking)
     cancel_locking(mut, pneed_issue);
-  _drop_non_rdlocks(mut, pneed_issue);
-  _drop_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, true);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
@@ -790,7 +754,7 @@ void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
   if (!pneed_issue)
     pneed_issue = &my_need_issue;
 
-  _drop_non_rdlocks(mut, pneed_issue);
+  _drop_locks(mut, pneed_issue, false);
 
   if (pneed_issue == &my_need_issue)
     issue_caps_set(*pneed_issue);
@@ -800,15 +764,20 @@ void Locker::drop_rdlocks_for_early_reply(MutationImpl *mut)
 {
   set<CInode*> need_issue;
 
-  for (auto p = mut->rdlocks.begin(); p != mut->rdlocks.end(); ) {
-    SimpleLock *lock = *p;
-    ++p;
+  for (auto it = mut->locks.begin(); it != mut->locks.end(); ) {
+    if (!it->is_rdlock()) {
+      ++it;
+      continue;
+    }
+    SimpleLock *lock = it->lock;
     // make later mksnap/setlayout (at other mds) wait for this unsafe request
     if (lock->get_type() == CEPH_LOCK_ISNAP ||
-       lock->get_type() == CEPH_LOCK_IPOLICY)
+       lock->get_type() == CEPH_LOCK_IPOLICY) {
+      ++it;
       continue;
+    }
     bool ni = false;
-    rdlock_finish(lock, mut, &ni);
+    rdlock_finish(it++, mut, &ni);
     if (ni)
       need_issue.insert(static_cast<CInode*>(lock->get_parent()));
   }
@@ -1355,8 +1324,7 @@ bool Locker::rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon)
     // can read?  grab ref.
     if (lock->can_rdlock(client)) {
       lock->get_rdlock();
-      mut->rdlocks.insert(lock);
-      mut->locks.insert(lock);
+      mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::RDLOCK);
       return true;
     }
 
@@ -1401,14 +1369,14 @@ void Locker::nudge_log(SimpleLock *lock)
     mds->mdlog->flush();
 }
 
-void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_rdlock());
+  SimpleLock *lock = it->lock;
   // drop ref
   lock->put_rdlock();
-  if (mut) {
-    mut->rdlocks.erase(lock);
-    mut->locks.erase(lock);
-  }
+  if (mut)
+    mut->locks.erase(it);
 
   dout(7) << "rdlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   
@@ -1422,35 +1390,27 @@ void Locker::rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issu
 }
 
 
-bool Locker::can_rdlock_set(set<SimpleLock*>& locks)
+bool Locker::can_rdlock_set(MutationImpl::LockOpVec& lov)
 {
-  dout(10) << "can_rdlock_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!(*p)->can_rdlock(-1)) {
-      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
+  dout(10) << "can_rdlock_set " << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    if (!p.lock->can_rdlock(-1)) {
+      dout(10) << "can_rdlock_set can't rdlock " << *p << " on " << *p.lock->get_parent() << dendl;
       return false;
     }
+  }
   return true;
 }
 
-bool Locker::rdlock_try_set(set<SimpleLock*>& locks)
-{
-  dout(10) << "rdlock_try_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p)
-    if (!rdlock_try(*p, -1, NULL)) {
-      dout(10) << "rdlock_try_set can't rdlock " << *p << " on " << *(*p)->get_parent() << dendl;
-      return false;
-    }
-  return true;
-}
 
-void Locker::rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut)
+void Locker::rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut)
 {
-  dout(10) << "rdlock_take_set " << locks << dendl;
-  for (set<SimpleLock*>::iterator p = locks.begin(); p != locks.end(); ++p) {
-    (*p)->get_rdlock();
-    mut->rdlocks.insert(*p);
-    mut->locks.insert(*p);
+  dout(10) << "rdlock_take_set "  << dendl;
+  for (const auto& p : lov) {
+    ceph_assert(p.is_rdlock());
+    p.lock->get_rdlock();
+    mut->locks.emplace(p.lock, MutationImpl::LockOp::RDLOCK);
   }
 }
 
@@ -1466,8 +1426,7 @@ void Locker::wrlock_force(SimpleLock *lock, MutationRef& mut)
   dout(7) << "wrlock_force  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 }
 
 bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
@@ -1489,8 +1448,8 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
     if (lock->can_wrlock(client) &&
        (!want_scatter || lock->get_state() == LOCK_MIX)) {
       lock->get_wrlock();
-      mut->wrlocks.insert(lock);
-      mut->locks.insert(lock);
+      auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+      it->flags |= MutationImpl::LockOp::WRLOCK; // may already remote_wrlocked
       return true;
     }
 
@@ -1540,19 +1499,22 @@ bool Locker::wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait)
   return false;
 }
 
-void Locker::wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_wrlock());
+  SimpleLock* lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_wrlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_wrlock_finish(it, mut);
 
   dout(7) << "wrlock_finish on " << *lock << " on " << *lock->get_parent() << dendl;
   lock->put_wrlock();
-  if (mut) {
-    mut->wrlocks.erase(lock);
-    if (mut->remote_wrlocks.count(lock) == 0)
-      mut->locks.erase(lock);
-  }
+
+  if (it->is_remote_wrlock())
+    it->clear_wrlock();
+  else
+    mut->locks.erase(it);
 
   if (!lock->is_wrlocked()) {
     if (!lock->is_stable())
@@ -1590,13 +1552,16 @@ void Locker::remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestR
   mut->more()->waiting_on_slave.insert(target);
 }
 
-void Locker::remote_wrlock_finish(SimpleLock *lock, mds_rank_t target,
-                                  MutationImpl *mut)
+void Locker::remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
-  // drop ref
-  mut->remote_wrlocks.erase(lock);
-  if (mut->wrlocks.count(lock) == 0)
-    mut->locks.erase(lock);
+  ceph_assert(it->is_remote_wrlock());
+  SimpleLock *lock = it->lock;
+  mds_rank_t target = it->wrlock_target;
+
+  if (it->is_wrlock())
+    it->clear_remote_wrlock();
+  else
+    mut->locks.erase(it);
   
   dout(7) << "remote_wrlock_finish releasing remote wrlock on mds." << target
          << " " << *lock->get_parent()  << dendl;
@@ -1629,8 +1594,7 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequestRef& mut)
       if (lock->can_xlock(client)) {
        lock->set_state(LOCK_XLOCK);
        lock->get_xlock(mut, client);
-       mut->xlocks.insert(lock);
-       mut->locks.insert(lock);
+       mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
        mut->finish_locking(lock);
        return true;
       }
@@ -1722,11 +1686,14 @@ void Locker::_finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue
   eval_gather(lock, lock->get_state() != LOCK_XLOCKSNAP, pneed_issue);
 }
 
-void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue)
+void Locker::xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
+
   if (lock->get_type() == CEPH_LOCK_IVERSION ||
       lock->get_type() == CEPH_LOCK_DVERSION)
-    return local_xlock_finish(static_cast<LocalLock*>(lock), mut);
+    return local_xlock_finish(it, mut);
 
   dout(10) << "xlock_finish on " << *lock << " " << *lock->get_parent() << dendl;
 
@@ -1735,8 +1702,7 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
   // drop ref
   lock->put_xlock();
   ceph_assert(mut);
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
   
   bool do_issue = false;
 
@@ -1777,13 +1743,14 @@ void Locker::xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue
   }
 }
 
-void Locker::xlock_export(SimpleLock *lock, MutationImpl *mut)
+void Locker::xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  SimpleLock *lock = it->lock;
   dout(10) << "xlock_export on " << *lock << " " << *lock->get_parent() << dendl;
 
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   MDSCacheObject *p = lock->get_parent();
   ceph_assert(p->state_test(CInode::STATE_AMBIGUOUSAUTH));  // we are exporting this (inode)
@@ -4419,8 +4386,7 @@ void Locker::scatter_writebehind(ScatterLock *lock)
 
   // forcefully take a wrlock
   lock->get_wrlock(true);
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
 
   in->pre_cow_old_inode();  // avoid cow mayhem
 
@@ -4758,10 +4724,10 @@ void Locker::local_wrlock_grab(LocalLock *lock, MutationRef& mut)
   
   ceph_assert(lock->get_parent()->is_auth());
   ceph_assert(lock->can_wrlock());
-  ceph_assert(!mut->wrlocks.count(lock));
   lock->get_wrlock(mut->get_client());
-  mut->wrlocks.insert(lock);
-  mut->locks.insert(lock);
+
+  auto ret = mut->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
+  ceph_assert(ret.second);
 }
 
 bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
@@ -4771,10 +4737,9 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   
   ceph_assert(lock->get_parent()->is_auth());
   if (lock->can_wrlock()) {
-    ceph_assert(!mut->wrlocks.count(lock));
     lock->get_wrlock(mut->get_client());
-    mut->wrlocks.insert(lock);
-    mut->locks.insert(lock);
+    auto it = mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::WRLOCK);
+    ceph_assert(it->is_wrlock());
     return true;
   } else {
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
@@ -4782,13 +4747,14 @@ bool Locker::local_wrlock_start(LocalLock *lock, MDRequestRef& mut)
   }
 }
 
-void Locker::local_wrlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_wrlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_wrlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_wrlock();
-  mut->wrlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
   if (lock->get_num_wrlocks() == 0) {
     lock->finish_waiters(SimpleLock::WAIT_STABLE |
                          SimpleLock::WAIT_WR |
@@ -4808,18 +4774,18 @@ bool Locker::local_xlock_start(LocalLock *lock, MDRequestRef& mut)
   }
 
   lock->get_xlock(mut, mut->get_client());
-  mut->xlocks.insert(lock);
-  mut->locks.insert(lock);
+  mut->locks.emplace_hint(mut->locks.end(), lock, MutationImpl::LockOp::XLOCK);
   return true;
 }
 
-void Locker::local_xlock_finish(LocalLock *lock, MutationImpl *mut)
+void Locker::local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut)
 {
+  ceph_assert(it->is_xlock());
+  LocalLock *lock = static_cast<LocalLock*>(it->lock);
   dout(7) << "local_xlock_finish  on " << *lock
          << " on " << *lock->get_parent() << dendl;  
   lock->put_xlock();
-  mut->xlocks.erase(lock);
-  mut->locks.erase(lock);
+  mut->locks.erase(it);
 
   lock->finish_waiters(SimpleLock::WAIT_STABLE | 
                       SimpleLock::WAIT_WR | 
index 1678f5c01047638c050f7f3023639367b04c4366..2b935c11c3a895e5c0ab6b2524c417e1f686d6ed 100644 (file)
@@ -69,18 +69,14 @@ protected:
   void send_lock_message(SimpleLock *lock, int msg, const bufferlist &data);
 
   // -- locks --
-  void _drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue);
-  void _drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue);
+  void _drop_locks(MutationImpl *mut, set<CInode*> *pneed_issue, bool drop_rdlocks);
 public:
-  void include_snap_rdlocks(set<SimpleLock*>& rdlocks, CInode *in);
-  void include_snap_rdlocks_wlayout(set<SimpleLock*>& rdlocks, CInode *in,
-                                    file_layout_t **layout);
+  void include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov);
+  void include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
+                                   file_layout_t **layout);
 
   bool acquire_locks(MDRequestRef& mdr,
-                    set<SimpleLock*> &rdlocks,
-                    set<SimpleLock*> &wrlocks,
-                    set<SimpleLock*> &xlocks,
-                    map<SimpleLock*,mds_rank_t> *remote_wrlocks=NULL,
+                    MutationImpl::LockOpVec& lov,
                     CInode *auth_pin_freeze=NULL,
                     bool auth_pin_nonblock=false);
 
@@ -111,23 +107,22 @@ public:
   bool _rdlock_kick(SimpleLock *lock, bool as_anon);
   bool rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *c);
   bool rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon=false);
-  void rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
-  bool can_rdlock_set(set<SimpleLock*>& locks);
-  bool rdlock_try_set(set<SimpleLock*>& locks);
-  void rdlock_take_set(set<SimpleLock*>& locks, MutationRef& mut);
+  void rdlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
+  bool can_rdlock_set(MutationImpl::LockOpVec& lov);
+  void rdlock_take_set(MutationImpl::LockOpVec& lov, MutationRef& mut);
 
   void wrlock_force(SimpleLock *lock, MutationRef& mut);
   bool wrlock_start(SimpleLock *lock, MDRequestRef& mut, bool nowait=false);
-  void wrlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
+  void wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
 
   void remote_wrlock_start(SimpleLock *lock, mds_rank_t target, MDRequestRef& mut);
-  void remote_wrlock_finish(SimpleLock *lock, mds_rank_t target, MutationImpl *mut);
+  void remote_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
 
   bool xlock_start(SimpleLock *lock, MDRequestRef& mut);
   void _finish_xlock(SimpleLock *lock, client_t xlocker, bool *pneed_issue);
-  void xlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
+  void xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut, bool *pneed_issue);
 
-  void xlock_export(SimpleLock *lock, MutationImpl *mut);
+  void xlock_export(const MutationImpl::lock_iterator& it, MutationImpl *mut);
   void xlock_import(SimpleLock *lock);
 
 
@@ -221,9 +216,9 @@ public:
   void local_wrlock_grab(LocalLock *lock, MutationRef& mut);
 protected:
   bool local_wrlock_start(LocalLock *lock, MDRequestRef& mut);
-  void local_wrlock_finish(LocalLock *lock, MutationImpl *mut);
+  void local_wrlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
   bool local_xlock_start(LocalLock *lock, MDRequestRef& mut);
-  void local_xlock_finish(LocalLock *lock, MutationImpl *mut);
+  void local_xlock_finish(const MutationImpl::lock_iterator& it, MutationImpl *mut);
 
 
   // file
index a115fdbdb0e17f09ca4ff154de4a8ef9fb76f886..462bd2957f1835c48b301c4acee7c43803ac0f70 100644 (file)
@@ -2141,8 +2141,8 @@ void MDCache::predirty_journal_parents(MutationRef mut, EMetaBlob *blob,
     pf->version = parent->pre_dirty();
 
     if (do_parent_mtime || linkunlink) {
-      ceph_assert(mut->wrlocks.count(&pin->filelock));
-      ceph_assert(mut->wrlocks.count(&pin->nestlock));
+      ceph_assert(mut->is_wrlocked(&pin->filelock));
+      ceph_assert(mut->is_wrlocked(&pin->nestlock));
       ceph_assert(cfollows == CEPH_NOSNAP);
       
       // update stale fragstat/rstat?
@@ -2188,7 +2188,7 @@ void MDCache::predirty_journal_parents(MutationRef mut, EMetaBlob *blob,
        ceph_assert(pin->nestlock.get_num_wrlocks() || mut->is_slave());
       }
 
-      if (mut->wrlocks.count(&pin->nestlock) == 0) {
+      if (!mut->is_wrlocked(&pin->nestlock)) {
        dout(10) << " taking wrlock on " << pin->nestlock << " on " << *pin << dendl;
        mds->locker->wrlock_force(&pin->nestlock, mut);
       }
@@ -2233,7 +2233,7 @@ void MDCache::predirty_journal_parents(MutationRef mut, EMetaBlob *blob,
     // can cast only because i'm passing nowait=true in the sole user
     MDRequestRef mdmut = static_cast<MDRequestImpl*>(mut.get());
     if (!stop &&
-       mut->wrlocks.count(&pin->nestlock) == 0 &&
+       !mut->is_wrlocked(&pin->nestlock) &&
        (!pin->versionlock.can_wrlock() ||                   // make sure we can take versionlock, too
         //true
         !mds->locker->wrlock_start(&pin->nestlock, mdmut, true)
@@ -2254,11 +2254,10 @@ void MDCache::predirty_journal_parents(MutationRef mut, EMetaBlob *blob,
       }
       break;
     }
-    if (!mut->wrlocks.count(&pin->versionlock))
+    if (!mut->is_wrlocked(&pin->versionlock))
       mds->locker->local_wrlock_grab(&pin->versionlock, mut);
 
-    ceph_assert(mut->wrlocks.count(&pin->nestlock) ||
-          mut->is_slave());
+    ceph_assert(mut->is_wrlocked(&pin->nestlock) || mut->is_slave());
     
     pin->last_dirstat_prop = mut->get_mds_stamp();
 
@@ -4092,63 +4091,56 @@ void MDCache::rejoin_send_rejoins()
       if (mdr->is_slave())
        continue;
       // auth pins
-      for (map<MDSCacheObject*,mds_rank_t>::iterator q = mdr->remote_auth_pins.begin();
-          q != mdr->remote_auth_pins.end();
-          ++q) {
-       if (!q->first->is_auth()) {
-         ceph_assert(q->second == q->first->authority().first);
-         if (rejoins.count(q->second) == 0) continue;
-         const MMDSCacheRejoin::ref &rejoin = rejoins[q->second];
+      for (const auto& q : mdr->remote_auth_pins) {
+       if (!q.first->is_auth()) {
+         ceph_assert(q.second == q.first->authority().first);
+         if (rejoins.count(q.second) == 0) continue;
+         const MMDSCacheRejoin::ref &rejoin = rejoins[q.second];
          
-         dout(15) << " " << *mdr << " authpin on " << *q->first << dendl;
+         dout(15) << " " << *mdr << " authpin on " << *q.first << dendl;
          MDSCacheObjectInfo i;
-         q->first->set_object_info(i);
+         q.first->set_object_info(i);
          if (i.ino)
            rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), mdr->reqid, mdr->attempt);
          else
            rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, mdr->reqid, mdr->attempt);
 
          if (mdr->has_more() && mdr->more()->is_remote_frozen_authpin &&
-             mdr->more()->rename_inode == q->first)
+             mdr->more()->rename_inode == q.first)
            rejoin->add_inode_frozen_authpin(vinodeno_t(i.ino, i.snapid),
                                             mdr->reqid, mdr->attempt);
        }
       }
       // xlocks
-      for (set<SimpleLock*>::iterator q = mdr->xlocks.begin();
-          q != mdr->xlocks.end();
-          ++q) {
-       if (!(*q)->get_parent()->is_auth()) {
-         mds_rank_t who = (*q)->get_parent()->authority().first;
+      for (const auto& q : mdr->locks) {
+       auto lock = q.lock;
+       auto obj = lock->get_parent();
+       if (q.is_xlock() && !obj->is_auth()) {
+         mds_rank_t who = obj->authority().first;
          if (rejoins.count(who) == 0) continue;
          const MMDSCacheRejoin::ref &rejoin = rejoins[who];
          
-         dout(15) << " " << *mdr << " xlock on " << **q << " " << *(*q)->get_parent() << dendl;
+         dout(15) << " " << *mdr << " xlock on " << *lock << " " << *obj << dendl;
          MDSCacheObjectInfo i;
-         (*q)->get_parent()->set_object_info(i);
+         obj->set_object_info(i);
          if (i.ino)
-           rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(),
+           rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), lock->get_type(),
                                    mdr->reqid, mdr->attempt);
          else
            rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid,
                                     mdr->reqid, mdr->attempt);
-       }
-      }
-      // remote wrlocks
-      for (map<SimpleLock*, mds_rank_t>::iterator q = mdr->remote_wrlocks.begin();
-          q != mdr->remote_wrlocks.end();
-          ++q) {
-       mds_rank_t who = q->second;
-       if (rejoins.count(who) == 0) continue;
-       const MMDSCacheRejoin::ref &rejoin = rejoins[who];
+       } else if (q.is_remote_wrlock()) {
+         mds_rank_t who = q.wrlock_target;
+         if (rejoins.count(who) == 0) continue;
+         const MMDSCacheRejoin::ref &rejoin = rejoins[who];
 
-       dout(15) << " " << *mdr << " wrlock on " << q->second
-                << " " << q->first->get_parent() << dendl;
-       MDSCacheObjectInfo i;
-       q->first->get_parent()->set_object_info(i);
-       ceph_assert(i.ino);
-       rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), q->first->get_type(),
-                                mdr->reqid, mdr->attempt);
+         dout(15) << " " << *mdr << " wrlock on " << *lock << " " << *obj << dendl;
+         MDSCacheObjectInfo i;
+         obj->set_object_info(i);
+         ceph_assert(i.ino);
+         rejoin->add_inode_wrlock(vinodeno_t(i.ino, i.snapid), lock->get_type(),
+                                  mdr->reqid, mdr->attempt);
+       }
       }
     }
   }
@@ -4752,26 +4744,24 @@ void MDCache::handle_cache_rejoin_strong(const MMDSCacheRejoin::const_ref &stron
        }
 
         // dn xlock?
-        const auto xlocked_dentries_it = strong->xlocked_dentries.find(dirfrag);
-        if (xlocked_dentries_it != strong->xlocked_dentries.end()) {
-          const auto ss_req_it = xlocked_dentries_it->second.find(ss);
-          if (ss_req_it != xlocked_dentries_it->second.end()) {
+        const auto xlocked_it = strong->xlocked_dentries.find(dirfrag);
+        if (xlocked_it != strong->xlocked_dentries.end()) {
+          const auto ss_req_it = xlocked_it->second.find(ss);
+          if (ss_req_it != xlocked_it->second.end()) {
            const MMDSCacheRejoin::slave_reqid& r = ss_req_it->second;
            dout(10) << " dn xlock by " << r << " on " << *dn << dendl;
            MDRequestRef mdr = request_get(r.reqid);  // should have this from auth_pin above.
            ceph_assert(mdr->is_auth_pinned(dn));
-           if (!mdr->xlocks.count(&dn->versionlock)) {
+           if (!mdr->is_xlocked(&dn->versionlock)) {
              ceph_assert(dn->versionlock.can_xlock_local());
              dn->versionlock.get_xlock(mdr, mdr->get_client());
-             mdr->xlocks.insert(&dn->versionlock);
-             mdr->locks.insert(&dn->versionlock);
+             mdr->locks.emplace(&dn->versionlock, MutationImpl::LockOp::XLOCK);
            }
            if (dn->lock.is_stable())
              dn->auth_pin(&dn->lock);
            dn->lock.set_state(LOCK_XLOCK);
            dn->lock.get_xlock(mdr, mdr->get_client());
-           mdr->xlocks.insert(&dn->lock);
-           mdr->locks.insert(&dn->lock);
+           mdr->locks.emplace(&dn->lock, MutationImpl::LockOp::XLOCK);
           }
         }
 
@@ -4860,11 +4850,10 @@ void MDCache::handle_cache_rejoin_strong(const MMDSCacheRejoin::const_ref &stron
        dout(10) << " inode xlock by " << q.second << " on " << *lock << " on " << *in << dendl;
        MDRequestRef mdr = request_get(q.second.reqid);  // should have this from auth_pin above.
        ceph_assert(mdr->is_auth_pinned(in));
-       if (!mdr->xlocks.count(&in->versionlock)) {
+       if (!mdr->is_xlocked(&in->versionlock)) {
          ceph_assert(in->versionlock.can_xlock_local());
          in->versionlock.get_xlock(mdr, mdr->get_client());
-         mdr->xlocks.insert(&in->versionlock);
-         mdr->locks.insert(&in->versionlock);
+         mdr->locks.emplace(&in->versionlock, MutationImpl::LockOp::XLOCK);
        }
        if (lock->is_stable())
          in->auth_pin(lock);
@@ -4872,8 +4861,7 @@ void MDCache::handle_cache_rejoin_strong(const MMDSCacheRejoin::const_ref &stron
        if (lock == &in->filelock)
          in->loner_cap = -1;
        lock->get_xlock(mdr, mdr->get_client());
-       mdr->xlocks.insert(lock);
-       mdr->locks.insert(lock);
+       mdr->locks.emplace(lock, MutationImpl::LockOp::XLOCK);
       }
     }
   }
@@ -4891,8 +4879,7 @@ void MDCache::handle_cache_rejoin_strong(const MMDSCacheRejoin::const_ref &stron
        if (lock == &in->filelock)
          in->loner_cap = -1;
        lock->get_wrlock(true);
-       mdr->wrlocks.insert(lock);
-       mdr->locks.insert(lock);
+       mdr->locks.emplace(lock, MutationImpl::LockOp::WRLOCK);
       }
     }
   }
@@ -9344,26 +9331,26 @@ void MDCache::request_drop_foreign_locks(MDRequestRef& mdr)
    * implicitly. Note that we don't call the finishers -- there shouldn't
    * be any on a remote lock and the request finish wakes up all
    * the waiters anyway! */
-  set<SimpleLock*>::iterator p = mdr->xlocks.begin();
-  while (p != mdr->xlocks.end()) {
-    if ((*p)->get_parent()->is_auth()) 
-      ++p;
-    else {
-      dout(10) << "request_drop_foreign_locks forgetting lock " << **p
-              << " on " << *(*p)->get_parent() << dendl;
-      (*p)->put_xlock();
-      mdr->locks.erase(*p);
-      mdr->xlocks.erase(p++);
-    }
-  }
 
-  map<SimpleLock*, mds_rank_t>::iterator q = mdr->remote_wrlocks.begin();
-  while (q != mdr->remote_wrlocks.end()) {
-    dout(10) << "request_drop_foreign_locks forgetting remote_wrlock " << *q->first
-            << " on mds." << q->second
-            << " on " << *(q->first)->get_parent() << dendl;
-    mdr->locks.erase(q->first);
-    mdr->remote_wrlocks.erase(q++);
+  for (auto it = mdr->locks.begin(); it != mdr->locks.end(); ) {
+    SimpleLock *lock = it->lock;
+    if (it->is_xlock() && !lock->get_parent()->is_auth()) {
+      dout(10) << "request_drop_foreign_locks forgetting lock " << *lock
+              << " on " << lock->get_parent() << dendl;
+      lock->put_xlock();
+      mdr->locks.erase(it++);
+    } else if (it->is_remote_wrlock()) {
+      dout(10) << "request_drop_foreign_locks forgetting remote_wrlock " << *lock
+              << " on mds." << it->wrlock_target << " on " << *lock->get_parent() << dendl;
+      if (it->is_wrlock()) {
+       it->clear_remote_wrlock();
+       ++it;
+      } else {
+       mdr->locks.erase(it++);
+      }
+    } else {
+      ++it;
+    }
   }
 
   mdr->more()->slaves.clear(); /* we no longer have requests out to them, and
@@ -9400,10 +9387,7 @@ void MDCache::request_cleanup(MDRequestRef& mdr)
   mdr->drop_local_auth_pins();
 
   // drop stickydirs
-  for (set<CInode*>::iterator p = mdr->stickydirs.begin();
-       p != mdr->stickydirs.end();
-       ++p) 
-    (*p)->put_stickydirs();
+  mdr->put_stickydirs();
 
   mds->locker->kick_cap_releases(mdr);
 
@@ -11393,12 +11377,12 @@ void MDCache::dispatch_fragment_dir(MDRequestRef& mdr)
   dout(10) << "dispatch_fragment_dir " << basedirfrag << " bits " << info.bits
           << " on " << *diri << dendl;
   if (!mdr->aborted) {
-    set<SimpleLock*> rdlocks, wrlocks, xlocks;
-    wrlocks.insert(&diri->dirfragtreelock);
+    MutationImpl::LockOpVec lov;
+    lov.add_wrlock(&diri->dirfragtreelock);
     // prevent a racing gather on any other scatterlocks too
-    wrlocks.insert(&diri->nestlock);
-    wrlocks.insert(&diri->filelock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true))
+    lov.add_wrlock(&diri->nestlock);
+    lov.add_wrlock(&diri->filelock);
+    if (!mds->locker->acquire_locks(mdr, lov, NULL, true))
       if (!mdr->aborted)
        return;
   }
@@ -12238,15 +12222,15 @@ void MDCache::enqueue_scrub(
 
 void MDCache::enqueue_scrub_work(MDRequestRef& mdr)
 {
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  MutationImpl::LockOpVec lov;
+  CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, lov, true);
   if (NULL == in)
     return;
 
   // TODO: Remove this restriction
   ceph_assert(in->is_auth());
 
-  bool locked = mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks);
+  bool locked = mds->locker->acquire_locks(mdr, lov);
   if (!locked)
     return;
 
@@ -12376,12 +12360,12 @@ void MDCache::repair_dirfrag_stats_work(MDRequestRef& mdr)
 
   mdr->auth_pin(dir);
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
   CInode *diri = dir->inode;
-  rdlocks.insert(&diri->dirfragtreelock);
-  wrlocks.insert(&diri->nestlock);
-  wrlocks.insert(&diri->filelock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->dirfragtreelock);
+  lov.add_wrlock(&diri->nestlock);
+  lov.add_wrlock(&diri->filelock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!dir->is_complete()) {
@@ -12473,16 +12457,16 @@ void MDCache::repair_inode_stats_work(MDRequestRef& mdr)
     return;
   }
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
   std::list<frag_t> frags;
 
   if (mdr->ls) // already marked filelock/nestlock dirty ?
     goto do_rdlocks;
 
-  rdlocks.insert(&diri->dirfragtreelock);
-  wrlocks.insert(&diri->nestlock);
-  wrlocks.insert(&diri->filelock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->dirfragtreelock);
+  lov.add_wrlock(&diri->nestlock);
+  lov.add_wrlock(&diri->filelock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   // Fetch all dirfrags and mark filelock/nestlock dirty. This will tirgger
@@ -12512,11 +12496,11 @@ void MDCache::repair_inode_stats_work(MDRequestRef& mdr)
 
 do_rdlocks:
   // force the scatter-gather process
-  rdlocks.insert(&diri->dirfragtreelock);
-  rdlocks.insert(&diri->nestlock);
-  rdlocks.insert(&diri->filelock);
-  wrlocks.clear();
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.clear();
+  lov.add_rdlock(&diri->dirfragtreelock);
+  lov.add_rdlock(&diri->nestlock);
+  lov.add_rdlock(&diri->filelock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   diri->state_clear(CInode::STATE_REPAIRSTATS);
@@ -12564,12 +12548,12 @@ void MDCache::upgrade_inode_snaprealm_work(MDRequestRef& mdr)
     return;
   }
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  mds->locker->include_snap_rdlocks(rdlocks, in);
-  rdlocks.erase(&in->snaplock);
-  xlocks.insert(&in->snaplock);
+  MutationImpl::LockOpVec lov;
+  mds->locker->include_snap_rdlocks(in, lov);
+  lov.erase_rdlock(&in->snaplock);
+  lov.add_xlock(&in->snaplock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   // project_snaprealm() upgrades snaprealm format
@@ -12619,14 +12603,14 @@ public:
 
 void MDCache::flush_dentry_work(MDRequestRef& mdr)
 {
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  MutationImpl::LockOpVec lov;
+  CInode *in = mds->server->rdlock_path_pin_ref(mdr, 0, lov, true);
   if (NULL == in)
     return;
 
   // TODO: Is this necessary? Fix it if so
   ceph_assert(in->is_auth());
-  bool locked = mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks);
+  bool locked = mds->locker->acquire_locks(mdr, lov);
   if (!locked)
     return;
   in->flush(new C_FinishIOMDR(mds, mdr));
index 3c6473def58fcbffedc362c8fbb069864a798766..6e9b6c66331738163f2f213e5c5e4ffa61c35ad6 100644 (file)
@@ -732,18 +732,22 @@ public:
 };
 
 
-void Migrator::get_export_lock_set(CDir *dir, set<SimpleLock*>& locks)
+void Migrator::get_export_lock_set(CDir *dir, MutationImpl::LockOpVec& lov)
 {
   // path
   vector<CDentry*> trace;
   cache->make_trace(trace, dir->inode);
-  for (vector<CDentry*>::iterator it = trace.begin();
-       it != trace.end();
-       ++it)
-    locks.insert(&(*it)->lock);
+
+  set<CDir*> wouldbe_bounds;
+  cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
+
+  lov.reserve(trace.size() + wouldbe_bounds.size() + 8);
+
+  for (auto& dn : trace)
+    lov.add_rdlock(&dn->lock);
 
   // prevent scatter gather race
-  locks.insert(&dir->get_inode()->dirfragtreelock);
+  lov.add_rdlock(&dir->get_inode()->dirfragtreelock);
 
   // bound dftlocks:
   // NOTE: We need to take an rdlock on bounding dirfrags during
@@ -754,10 +758,11 @@ void Migrator::get_export_lock_set(CDir *dir, set<SimpleLock*>& locks)
   //  redivvy it up.  And it's needed for the scatterlocks to work
   //  properly: when the auth is in a sync/lock state it keeps each
   //  dirfrag's portion in the local (auth OR replica) dirfrag.
-  set<CDir*> wouldbe_bounds;
-  cache->get_wouldbe_subtree_bounds(dir, wouldbe_bounds);
-  for (set<CDir*>::iterator p = wouldbe_bounds.begin(); p != wouldbe_bounds.end(); ++p)
-    locks.insert(&(*p)->get_inode()->dirfragtreelock);
+  for (auto& dir : wouldbe_bounds)
+    lov.add_rdlock(&dir->get_inode()->dirfragtreelock);
+
+  // above code may add duplicated locks
+  lov.sort_and_merge();
 }
 
 
@@ -1057,10 +1062,8 @@ void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
   }
 
   // locks?
-  set<SimpleLock*> rdlocks;
-  set<SimpleLock*> xlocks;
-  set<SimpleLock*> wrlocks;
-  get_export_lock_set(dir, rdlocks);
+  MutationImpl::LockOpVec lov;
+  get_export_lock_set(dir, lov);
   // If auth MDS of the subtree root inode is neither the exporter MDS
   // nor the importer MDS and it gathers subtree root's fragstat/neststat
   // while the subtree is exporting. It's possible that the exporter MDS
@@ -1068,14 +1071,14 @@ void Migrator::dispatch_export_dir(MDRequestRef& mdr, int count)
   // are not auth MDS of the subtree root at the time they receive the
   // lock messages. So the auth MDS of the subtree root inode may get no
   // or duplicated fragstat/neststat for the subtree root dirfrag.
-  wrlocks.insert(&dir->get_inode()->filelock);
-  wrlocks.insert(&dir->get_inode()->nestlock);
+  lov.add_wrlock(&dir->get_inode()->filelock);
+  lov.add_wrlock(&dir->get_inode()->nestlock);
   if (dir->get_inode()->is_auth()) {
     dir->get_inode()->filelock.set_scatter_wanted();
     dir->get_inode()->nestlock.set_scatter_wanted();
   }
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks, NULL, NULL, true)) {
+  if (!mds->locker->acquire_locks(mdr, lov, NULL, true)) {
     if (mdr->aborted)
       export_try_cancel(dir);
     return;
@@ -1348,10 +1351,10 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
   CInode *diri = dir->get_inode();
 
   // ok, try to grab all my locks.
-  set<SimpleLock*> rdlocks;
-  get_export_lock_set(dir, rdlocks);
+  MutationImpl::LockOpVec lov;
+  get_export_lock_set(dir, lov);
   if ((diri->is_auth() && diri->is_frozen()) ||
-      !mds->locker->can_rdlock_set(rdlocks) ||
+      !mds->locker->can_rdlock_set(lov) ||
       !diri->filelock.can_wrlock(-1) ||
       !diri->nestlock.can_wrlock(-1)) {
     dout(7) << "export_dir couldn't acquire all needed locks, failing. "
@@ -1363,7 +1366,7 @@ void Migrator::export_frozen(CDir *dir, uint64_t tid)
   it->second.mut = new MutationImpl();
   if (diri->is_auth())
     it->second.mut->auth_pin(diri);
-  mds->locker->rdlock_take_set(rdlocks, it->second.mut);
+  mds->locker->rdlock_take_set(lov, it->second.mut);
   mds->locker->wrlock_force(&diri->filelock, it->second.mut);
   mds->locker->wrlock_force(&diri->nestlock, it->second.mut);
 
index 0dfb78b17a7ed6a32fc9042195339a6be42af75e..de1638d776a5484dc1b0b9adfde6ade3b289cd3e 100644 (file)
@@ -324,7 +324,7 @@ public:
   void adjust_export_after_rename(CInode* diri, CDir *olddir);
   void child_export_finish(std::shared_ptr<export_base_t>& parent, bool success);
 
-  void get_export_lock_set(CDir *dir, set<SimpleLock*>& locks);
+  void get_export_lock_set(CDir *dir, MutationImpl::LockOpVec& lov);
   void get_export_client_set(CDir *dir, set<client_t> &client_set);
   void get_export_client_set(CInode *in, set<client_t> &client_set);
 
index 6c4660ec7fa81e209ae3ff02d7afb11961f2b9c6..becb040bb5d385de95890c44a40125f443004695 100644 (file)
@@ -35,18 +35,27 @@ void MutationImpl::unpin(MDSCacheObject *o)
 
 void MutationImpl::set_stickydirs(CInode *in)
 {
-  if (stickydirs.count(in) == 0) {
+  if (!stickydiri || stickydiri != in) {
     in->get_stickydirs();
-    stickydirs.insert(in);
+    if (stickydiri)
+      stickydiri->put_stickydirs();
+    stickydiri = in;
+  }
+}
+
+void MutationImpl::put_stickydirs()
+{
+  if (stickydiri) {
+    stickydiri->put_stickydirs();
+    stickydiri = nullptr;
+
   }
 }
 
 void MutationImpl::drop_pins()
 {
-  for (set<MDSCacheObject*>::iterator it = pins.begin();
-       it != pins.end();
-       ++it) 
-    (*it)->put(MDSCacheObject::PIN_REQUEST);
+  for (auto& o : pins)
+    o->put(MDSCacheObject::PIN_REQUEST);
   pins.clear();
 }
 
@@ -65,6 +74,50 @@ void MutationImpl::finish_locking(SimpleLock *lock)
   locking_target_mds = -1;
 }
 
+void MutationImpl::LockOpVec::erase_rdlock(SimpleLock* lock)
+{
+  for (int i = size() - 1; i >= 0; --i) {
+    auto& op = (*this)[i];
+    if (op.lock == lock && op.is_rdlock()) {
+      erase(begin() + i);
+      return;
+    }
+  }
+}
+
+void MutationImpl::LockOpVec::sort_and_merge()
+{
+  std::sort(begin(), end(), SimpleLock::ptr_lt());
+  // merge ops on the same lock
+  for (auto i = end() - 1; i > begin(); ) {
+    auto j = i;
+    while (--j >= begin()) {
+      if (i->lock != j->lock)
+       break;
+    }
+    if (i - j == 1) {
+      i = j;
+      continue;
+    }
+
+    // merge
+    ++j;
+    for (auto k = i; k > j; --k) {
+      if (k->is_remote_wrlock()) {
+       ceph_assert(!j->is_remote_wrlock());
+       j->wrlock_target = k->wrlock_target;
+      }
+      j->flags |= k->flags;
+    }
+    if (j->is_xlock()) {
+      // xlock overwrites other types
+      ceph_assert(!j->is_remote_wrlock());
+      j->flags = MutationImpl::LockOp::XLOCK;
+    }
+    erase(j + 1, i + 1);
+    i = j - 1;
+  }
+}
 
 // auth pins
 bool MutationImpl::is_auth_pinned(MDSCacheObject *object) const
@@ -89,11 +142,9 @@ void MutationImpl::auth_unpin(MDSCacheObject *object)
 
 void MutationImpl::drop_local_auth_pins()
 {
-  for (set<MDSCacheObject*>::iterator it = auth_pins.begin();
-       it != auth_pins.end();
-       ++it) {
-    ceph_assert((*it)->is_auth());
-    (*it)->auth_unpin(this);
+  for (const auto& p : auth_pins) {
+    ceph_assert(p->is_auth());
+    p->auth_unpin(this);
   }
   auth_pins.clear();
 }
index 04464562335760df6a2c151a7d05262eed2461da..25e29366a002ad6a8853ad10f0a0142c3801cb6b 100644 (file)
@@ -54,18 +54,80 @@ public:
   // -- my pins and locks --
   // cache pins (so things don't expire)
   set< MDSCacheObject* > pins;
-  set<CInode*> stickydirs;
+  CInode* stickydiri = nullptr;
 
   // auth pins
   map<MDSCacheObject*, mds_rank_t> remote_auth_pins;
-  set< MDSCacheObject* > auth_pins;
+  set<MDSCacheObject*> auth_pins;
   
   // held locks
-  set< SimpleLock* > rdlocks;  // always local.
-  set< SimpleLock* > wrlocks;  // always local.
-  map< SimpleLock*, mds_rank_t > remote_wrlocks;
-  set< SimpleLock* > xlocks;   // local or remote.
-  set< SimpleLock*, SimpleLock::ptr_lt > locks;  // full ordering
+  struct LockOp {
+    enum {
+      RDLOCK           = 1,
+      WRLOCK           = 2,
+      XLOCK            = 4,
+      REMOTE_WRLOCK    = 8,
+    };
+    SimpleLock* lock;
+    mutable unsigned flags;
+    mutable mds_rank_t wrlock_target;
+    operator SimpleLock*() const {
+      return lock;
+    }
+    LockOp(SimpleLock *l, unsigned f=0, mds_rank_t t=MDS_RANK_NONE) :
+      lock(l), flags(f), wrlock_target(t) {}
+    bool is_rdlock() const { return !!(flags & RDLOCK); }
+    bool is_xlock() const { return !!(flags & XLOCK); }
+    bool is_wrlock() const { return !!(flags & WRLOCK); }
+    void clear_wrlock() const { flags &= ~WRLOCK; }
+    bool is_remote_wrlock() const { return !!(flags & REMOTE_WRLOCK); }
+    void clear_remote_wrlock() const {
+      flags &= ~REMOTE_WRLOCK;
+      wrlock_target = MDS_RANK_NONE;
+    }
+  };
+
+  struct LockOpVec : public vector<LockOp> {
+    void add_rdlock(SimpleLock *lock) {
+      emplace_back(lock, LockOp::RDLOCK);
+    }
+    void erase_rdlock(SimpleLock *lock);
+    void add_xlock(SimpleLock *lock) {
+      emplace_back(lock, LockOp::XLOCK);
+    }
+    void add_wrlock(SimpleLock *lock) {
+      emplace_back(lock, LockOp::WRLOCK);
+    }
+    void add_remote_wrlock(SimpleLock *lock, mds_rank_t rank) {
+      ceph_assert(rank != MDS_RANK_NONE);
+      emplace_back(lock, LockOp::REMOTE_WRLOCK, rank);
+    }
+    void sort_and_merge();
+
+    LockOpVec() {
+      reserve(32);
+    }
+  };
+  typedef set<LockOp, SimpleLock::ptr_lt> lock_set;
+  typedef lock_set::iterator lock_iterator;
+  lock_set locks;  // full ordering
+
+  bool is_rdlocked(SimpleLock *lock) const {
+    auto it = locks.find(lock);
+    return it != locks.end() && it->is_rdlock();
+  }
+  bool is_xlocked(SimpleLock *lock) const {
+    auto it = locks.find(lock);
+    return it != locks.end() && it->is_xlock();
+  }
+  bool is_wrlocked(SimpleLock *lock) const {
+    auto it = locks.find(lock);
+    return it != locks.end() && it->is_wrlock();
+  }
+  bool is_remote_wrlocked(SimpleLock *lock) const {
+    auto it = locks.find(lock);
+    return it != locks.end() && it->is_remote_wrlock();
+  }
 
   // lock we are currently trying to acquire.  if we give up for some reason,
   // be sure to eval() this.
@@ -98,10 +160,6 @@ public:
     ceph_assert(locking == NULL);
     ceph_assert(pins.empty());
     ceph_assert(auth_pins.empty());
-    ceph_assert(xlocks.empty());
-    ceph_assert(rdlocks.empty());
-    ceph_assert(wrlocks.empty());
-    ceph_assert(remote_wrlocks.empty());
   }
 
   bool is_master() const { return slave_to_mds == MDS_RANK_NONE; }
@@ -132,6 +190,7 @@ public:
   void pin(MDSCacheObject *o);
   void unpin(MDSCacheObject *o);
   void set_stickydirs(CInode *in);
+  void put_stickydirs();
   void drop_pins();
 
   void start_locking(SimpleLock *lock, int target=-1);
index 72efb1752cd004a746164798b241087096fd9c7f..ae49000e57fe68174d32d91b4386817542ce70b6 100644 (file)
@@ -2191,8 +2191,7 @@ void Server::handle_slave_request_reply(const MMDSSlaveRequest::const_ref &m)
       mdr->more()->slaves.insert(from);
       lock->decode_locked_state(m->get_lock_data());
       dout(10) << "got remote xlock on " << *lock << " on " << *lock->get_parent() << dendl;
-      mdr->xlocks.insert(lock);
-      mdr->locks.insert(lock);
+      mdr->locks.emplace_hint(mdr->locks.end(), lock, MutationImpl::LockOp::XLOCK);
       mdr->finish_locking(lock);
       lock->get_xlock(mdr, mdr->get_client());
 
@@ -2210,8 +2209,11 @@ void Server::handle_slave_request_reply(const MMDSSlaveRequest::const_ref &m)
                                               m->get_object_info());
       mdr->more()->slaves.insert(from);
       dout(10) << "got remote wrlock on " << *lock << " on " << *lock->get_parent() << dendl;
-      mdr->remote_wrlocks[lock] = from;
-      mdr->locks.insert(lock);
+      auto it = mdr->locks.emplace_hint(mdr->locks.end(),
+                                       lock, MutationImpl::LockOp::REMOTE_WRLOCK, from);
+      ceph_assert(it->is_remote_wrlock());
+      ceph_assert(it->wrlock_target == from);
+
       mdr->finish_locking(lock);
 
       ceph_assert(mdr->more()->waiting_on_slave.count(from));
@@ -2277,23 +2279,27 @@ void Server::dispatch_slave_request(MDRequestRef& mdr)
                 << *lock << " on " << *lock->get_parent() << dendl;
       } else {
        // use acquire_locks so that we get auth_pinning.
-       set<SimpleLock*> rdlocks;
-       set<SimpleLock*> wrlocks = mdr->wrlocks;
-       set<SimpleLock*> xlocks = mdr->xlocks;
+       MutationImpl::LockOpVec lov;
+       for (const auto& p : mdr->locks) {
+         if (p.is_xlock())
+           lov.add_xlock(p.lock);
+         else if (p.is_wrlock())
+           lov.add_wrlock(p.lock);
+       }
 
        int replycode = 0;
        switch (op) {
        case MMDSSlaveRequest::OP_XLOCK:
-         xlocks.insert(lock);
+         lov.add_xlock(lock);
          replycode = MMDSSlaveRequest::OP_XLOCKACK;
          break;
        case MMDSSlaveRequest::OP_WRLOCK:
-         wrlocks.insert(lock);
+         lov.add_wrlock(lock);
          replycode = MMDSSlaveRequest::OP_WRLOCKACK;
          break;
        }
        
-       if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+       if (!mds->locker->acquire_locks(mdr, lov))
          return;
        
        // ack
@@ -2316,13 +2322,15 @@ void Server::dispatch_slave_request(MDRequestRef& mdr)
       SimpleLock *lock = mds->locker->get_lock(mdr->slave_request->get_lock_type(),
                                               mdr->slave_request->get_object_info());
       ceph_assert(lock);
+      auto it = mdr->locks.find(lock);
+      ceph_assert(it != mdr->locks.end());
       bool need_issue = false;
       switch (op) {
       case MMDSSlaveRequest::OP_UNXLOCK:
-       mds->locker->xlock_finish(lock, mdr.get(), &need_issue);
+       mds->locker->xlock_finish(it, mdr.get(), &need_issue);
        break;
       case MMDSSlaveRequest::OP_UNWRLOCK:
-       mds->locker->wrlock_finish(lock, mdr.get(), &need_issue);
+       mds->locker->wrlock_finish(it, mdr.get(), &need_issue);
        break;
       }
       if (need_issue)
@@ -2466,13 +2474,11 @@ void Server::handle_slave_auth_pin(MDRequestRef& mdr)
   auto reply = MMDSSlaveRequest::create(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
   
   // return list of my auth_pins (if any)
-  for (set<MDSCacheObject*>::iterator p = mdr->auth_pins.begin();
-       p != mdr->auth_pins.end();
-       ++p) {
+  for (const auto &p : mdr->auth_pins) {
     MDSCacheObjectInfo info;
-    (*p)->set_object_info(info);
+    p->set_object_info(info);
     reply->get_authpins().push_back(info);
-    if (*p == (MDSCacheObject*)auth_pin_freeze)
+    if (p == (MDSCacheObject*)auth_pin_freeze)
       auth_pin_freeze->set_object_info(reply->get_authpin_freeze());
   }
 
@@ -2517,7 +2523,7 @@ void Server::handle_slave_auth_pin_ack(MDRequestRef& mdr, const MMDSSlaveRequest
   }
 
   // removed auth pins?
-  map<MDSCacheObject*, mds_rank_t>::iterator p = mdr->remote_auth_pins.begin();
+  auto p = mdr->remote_auth_pins.begin();
   while (p != mdr->remote_auth_pins.end()) {
     MDSCacheObject* object = p->first;
     if (p->second == from && pinned.count(object) == 0) {
@@ -2914,7 +2920,7 @@ CDir *Server::traverse_to_auth_dir(MDRequestRef& mdr, vector<CDentry*> &trace, f
 /* If this returns null, the request has been handled
  * as appropriate: forwarded on, or the client's been replied to */
 CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
-                                   set<SimpleLock*> &rdlocks,
+                                   MutationImpl::LockOpVec& lov,
                                    bool want_auth,
                                    bool no_want_auth, /* for readdir, who doesn't want auth _even_if_ it's
                                                          a snapped dir */
@@ -2996,11 +3002,11 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
   }
 
   for (int i=0; i<(int)mdr->dn[n].size(); i++) 
-    rdlocks.insert(&mdr->dn[n][i]->lock);
+    lov.add_rdlock(&mdr->dn[n][i]->lock);
   if (layout)
-    mds->locker->include_snap_rdlocks_wlayout(rdlocks, ref, layout);
+    mds->locker->include_snap_rdlocks_wlayout(ref, lov, layout);
   else
-    mds->locker->include_snap_rdlocks(rdlocks, ref);
+    mds->locker->include_snap_rdlocks(ref, lov);
 
   // set and pin ref
   mdr->pin(ref);
@@ -3015,7 +3021,7 @@ CInode* Server::rdlock_path_pin_ref(MDRequestRef& mdr, int n,
  * get rdlocks on traversed dentries, xlock on new dentry.
  */
 CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
-                                         set<SimpleLock*>& rdlocks, set<SimpleLock*>& wrlocks, set<SimpleLock*>& xlocks,
+                                         MutationImpl::LockOpVec& lov,
                                          bool okexist, bool mustexist, bool alwaysxlock,
                                          file_layout_t **layout)
 {
@@ -3083,17 +3089,17 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
   // -- lock --
   // NOTE: rename takes the same set of locks for srcdn
   for (int i=0; i<(int)mdr->dn[n].size(); i++) 
-    rdlocks.insert(&mdr->dn[n][i]->lock);
+    lov.add_rdlock(&mdr->dn[n][i]->lock);
   if (alwaysxlock || dnl->is_null())
-    xlocks.insert(&dn->lock);                 // new dn, xlock
+    lov.add_xlock(&dn->lock);                 // new dn, xlock
   else
-    rdlocks.insert(&dn->lock);  // existing dn, rdlock
-  wrlocks.insert(&dn->get_dir()->inode->filelock); // also, wrlock on dir mtime
-  wrlocks.insert(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
+    lov.add_rdlock(&dn->lock);  // existing dn, rdlock
+  lov.add_wrlock(&dn->get_dir()->inode->filelock); // also, wrlock on dir mtime
+  lov.add_wrlock(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
   if (layout)
-    mds->locker->include_snap_rdlocks_wlayout(rdlocks, dn->get_dir()->inode, layout);
+    mds->locker->include_snap_rdlocks_wlayout(dn->get_dir()->inode, lov, layout);
   else
-    mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode);
+    mds->locker->include_snap_rdlocks(dn->get_dir()->inode, lov);
 
   return dn;
 }
@@ -3153,7 +3159,6 @@ CDir* Server::try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr)
 void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
 
   if (req->get_filepath().depth() == 0 && is_lookup) {
     // refpath can't be empty for lookup but it can for
@@ -3167,7 +3172,8 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
   if (mask & CEPH_STAT_RSTAT)
     want_auth = true; // set want_auth for CEPH_STAT_RSTAT mask
 
-  CInode *ref = rdlock_path_pin_ref(mdr, 0, rdlocks, want_auth, false, NULL, 
+  MutationImpl::LockOpVec lov;
+  CInode *ref = rdlock_path_pin_ref(mdr, 0, lov, want_auth, false, NULL,
                                    !is_lookup);
   if (!ref) return;
 
@@ -3187,28 +3193,28 @@ void Server::handle_client_getattr(MDRequestRef& mdr, bool is_lookup)
     issued = cap->issued();
 
   if ((mask & CEPH_CAP_LINK_SHARED) && !(issued & CEPH_CAP_LINK_EXCL))
-    rdlocks.insert(&ref->linklock);
+    lov.add_rdlock(&ref->linklock);
   if ((mask & CEPH_CAP_AUTH_SHARED) && !(issued & CEPH_CAP_AUTH_EXCL))
-    rdlocks.insert(&ref->authlock);
+    lov.add_rdlock(&ref->authlock);
   if ((mask & CEPH_CAP_XATTR_SHARED) && !(issued & CEPH_CAP_XATTR_EXCL))
-    rdlocks.insert(&ref->xattrlock);
+    lov.add_rdlock(&ref->xattrlock);
   if ((mask & CEPH_CAP_FILE_SHARED) && !(issued & CEPH_CAP_FILE_EXCL)) {
     // Don't wait on unstable filelock if client is allowed to read file size.
     // This can reduce the response time of getattr in the case that multiple
     // clients do stat(2) and there are writers.
     // The downside of this optimization is that mds may not issue Fs caps along
     // with getattr reply. Client may need to send more getattr requests.
-    if (mdr->rdlocks.count(&ref->filelock)) {
-      rdlocks.insert(&ref->filelock);
+    if (mdr->is_rdlocked(&ref->filelock)) {
+      lov.add_rdlock(&ref->filelock);
     } else if (ref->filelock.is_stable() ||
               ref->filelock.get_num_wrlocks() > 0 ||
               !ref->filelock.can_read(mdr->get_client())) {
-      rdlocks.insert(&ref->filelock);
+      lov.add_rdlock(&ref->filelock);
       mdr->done_locking = false;
     }
   }
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, ref, MAY_READ))
@@ -3275,10 +3281,10 @@ void Server::handle_client_lookup_ino(MDRequestRef& mdr,
   CDentry *dn = in->get_projected_parent_dn();
   CInode *diri = dn ? dn->get_dir()->inode : NULL;
 
-  set<SimpleLock*> rdlocks;
+  MutationImpl::LockOpVec lov;
   if (dn && (want_parent || want_dentry)) {
     mdr->pin(dn);
-    rdlocks.insert(&dn->lock);
+    lov.add_rdlock(&dn->lock);
   }
 
   unsigned mask = req->head.args.lookupino.mask;
@@ -3289,16 +3295,15 @@ void Server::handle_client_lookup_ino(MDRequestRef& mdr,
       issued = cap->issued();
     // permission bits, ACL/security xattrs
     if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0)
-      rdlocks.insert(&in->authlock);
+      lov.add_rdlock(&in->authlock);
     if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0)
-      rdlocks.insert(&in->xattrlock);
+      lov.add_rdlock(&in->xattrlock);
 
     mdr->getattr_caps = mask;
   }
 
-  if (!rdlocks.empty()) {
-    set<SimpleLock*> wrlocks, xlocks;
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!lov.empty()) {
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     if (diri != NULL) {
@@ -3386,9 +3391,9 @@ void Server::_lookup_snap_ino(MDRequestRef& mdr)
       return;
     }
 
-    set<SimpleLock*> rdlocks, wrlocks, xlocks;
-    rdlocks.insert(&diri->dirfragtreelock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    MutationImpl::LockOpVec lov;
+    lov.add_rdlock(&diri->dirfragtreelock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     frag_t frag = diri->dirfragtree[hash];
@@ -3457,15 +3462,15 @@ void Server::handle_client_open(MDRequestRef& mdr)
     return;
   }
   
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, need_auth);
+  MutationImpl::LockOpVec lov;
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, need_auth);
   if (!cur)
     return;
 
   if (cur->is_frozen() || cur->state_test(CInode::STATE_EXPORTINGCAPS)) {
     ceph_assert(!need_auth);
     mdr->done_locking = false;
-    CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+    CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
     if (!cur)
       return;
   }
@@ -3525,9 +3530,9 @@ void Server::handle_client_open(MDRequestRef& mdr)
       issued = cap->issued();
     // permission bits, ACL/security xattrs
     if ((mask & CEPH_CAP_AUTH_SHARED) && (issued & CEPH_CAP_AUTH_EXCL) == 0)
-      rdlocks.insert(&cur->authlock);
+      lov.add_rdlock(&cur->authlock);
     if ((mask & CEPH_CAP_XATTR_SHARED) && (issued & CEPH_CAP_XATTR_EXCL) == 0)
-      rdlocks.insert(&cur->xattrlock);
+      lov.add_rdlock(&cur->xattrlock);
 
     mdr->getattr_caps = mask;
   }
@@ -3536,8 +3541,8 @@ void Server::handle_client_open(MDRequestRef& mdr)
   if ((flags & CEPH_O_TRUNC) && !mdr->has_completed) {
     ceph_assert(cur->is_auth());
 
-    xlocks.insert(&cur->filelock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    lov.add_xlock(&cur->filelock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     if (!check_access(mdr, cur, MAY_WRITE))
@@ -3562,10 +3567,10 @@ void Server::handle_client_open(MDRequestRef& mdr)
   //  this makes us wait for writers to flushsnaps, ensuring we get accurate metadata,
   //  and that data itself is flushed so that we can read the snapped data off disk.
   if (mdr->snapid != CEPH_NOSNAP && !cur->is_dir()) {
-    rdlocks.insert(&cur->filelock);
+    lov.add_rdlock(&cur->filelock);
   }
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   mask = MAY_READ;
@@ -3696,9 +3701,9 @@ void Server::handle_client_openc(MDRequestRef& mdr)
     }
   }
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  file_layout_t *dir_layout = NULL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks,
+  MutationImpl::LockOpVec lov;
+  file_layout_t *dir_layout = nullptr;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov,
                                          !excl, false, false, &dir_layout);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -3756,8 +3761,8 @@ void Server::handle_client_openc(MDRequestRef& mdr)
   // created null dn.
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
-  rdlocks.insert(&diri->authlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->authlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, access))
@@ -3844,8 +3849,8 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   client_t client = req->get_source().num();
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *diri = rdlock_path_pin_ref(mdr, 0, rdlocks, false, true);
+  MutationImpl::LockOpVec lov;
+  CInode *diri = rdlock_path_pin_ref(mdr, 0, lov, false, true);
   if (!diri) return;
 
   // it's a directory, right?
@@ -3856,10 +3861,10 @@ void Server::handle_client_readdir(MDRequestRef& mdr)
     return;
   }
 
-  rdlocks.insert(&diri->filelock);
-  rdlocks.insert(&diri->dirfragtreelock);
+  lov.add_rdlock(&diri->filelock);
+  lov.add_rdlock(&diri->dirfragtreelock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_READ))
@@ -4135,18 +4140,18 @@ public:
 void Server::handle_client_file_setlock(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
   // get the inode to operate on, and set up any locks needed for that
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur)
     return;
 
-  xlocks.insert(&cur->flocklock);
+  lov.add_xlock(&cur->flocklock);
   /* acquire_locks will return true if it gets the locks. If it fails,
      it will redeliver this request at a later date, so drop the request.
    */
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
+  if (!mds->locker->acquire_locks(mdr, lov)) {
     dout(10) << "handle_client_file_setlock could not get locks!" << dendl;
     return;
   }
@@ -4238,18 +4243,18 @@ void Server::handle_client_file_setlock(MDRequestRef& mdr)
 void Server::handle_client_file_readlock(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
   // get the inode to operate on, and set up any locks needed for that
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur)
     return;
 
   /* acquire_locks will return true if it gets the locks. If it fails,
      it will redeliver this request at a later date, so drop the request.
   */
-  rdlocks.insert(&cur->flocklock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) {
+  lov.add_rdlock(&cur->flocklock);
+  if (!mds->locker->acquire_locks(mdr, lov)) {
     dout(10) << "handle_client_file_readlock could not get locks!" << dendl;
     return;
   }
@@ -4291,8 +4296,8 @@ void Server::handle_client_file_readlock(MDRequestRef& mdr)
 void Server::handle_client_setattr(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  MutationImpl::LockOpVec lov;
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur) return;
 
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -4309,13 +4314,13 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
 
   // xlock inode
   if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID|CEPH_SETATTR_BTIME|CEPH_SETATTR_KILL_SGUID))
-    xlocks.insert(&cur->authlock);
+    lov.add_xlock(&cur->authlock);
   if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
-    xlocks.insert(&cur->filelock);
+    lov.add_xlock(&cur->filelock);
   if (mask & CEPH_SETATTR_CTIME)
-    wrlocks.insert(&cur->versionlock);
+    lov.add_wrlock(&cur->versionlock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if ((mask & CEPH_SETATTR_UID) && (cur->inode.uid != req->head.args.setattr.uid))
@@ -4416,7 +4421,7 @@ void Server::handle_client_setattr(MDRequestRef& mdr)
                                                                   truncating_smaller, changed_ranges));
 
   // flush immediately if there are readers/writers waiting
-  if (xlocks.count(&cur->filelock) &&
+  if (mdr->is_xlocked(&cur->filelock) &&
       (cur->get_caps_wanted() & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
     mds->mdlog->flush();
 }
@@ -4486,8 +4491,8 @@ void Server::do_open_truncate(MDRequestRef& mdr, int cmode)
 void Server::handle_client_setlayout(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+  MutationImpl::LockOpVec lov;
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur) return;
 
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -4543,8 +4548,8 @@ void Server::handle_client_setlayout(MDRequestRef& mdr)
     return;
   }
 
-  xlocks.insert(&cur->filelock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_xlock(&cur->filelock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, cur, access))
@@ -4573,9 +4578,9 @@ void Server::handle_client_setlayout(MDRequestRef& mdr)
 void Server::handle_client_setdirlayout(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  file_layout_t *dir_layout = NULL;
-  CInode *cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+  MutationImpl::LockOpVec lov;
+  file_layout_t *dir_layout = nullptr;
+  CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
   if (!cur) return;
 
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -4588,8 +4593,8 @@ void Server::handle_client_setdirlayout(MDRequestRef& mdr)
     return;
   }
 
-  xlocks.insert(&cur->policylock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_xlock(&cur->policylock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   // validate layout
@@ -4857,9 +4862,7 @@ int Server::check_layout_vxattr(MDRequestRef& mdr,
 
 void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
                               file_layout_t *dir_layout,
-                              set<SimpleLock*> rdlocks,
-                              set<SimpleLock*> wrlocks,
-                              set<SimpleLock*> xlocks)
+                              MutationImpl::LockOpVec& lov)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   string name(req->get_path2());
@@ -4896,8 +4899,8 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
       return;
 
-    xlocks.insert(&cur->policylock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    lov.add_xlock(&cur->policylock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     auto &pi = cur->project_inode();
@@ -4919,8 +4922,8 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
     if (check_layout_vxattr(mdr, rest, value, &layout) < 0)
       return;
 
-    xlocks.insert(&cur->filelock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    lov.add_xlock(&cur->filelock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     auto &pi = cur->project_inode();
@@ -4943,13 +4946,13 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
       return;
     }
 
-    xlocks.insert(&cur->policylock);
+    lov.add_xlock(&cur->policylock);
     if (quota.is_enable() && !cur->get_projected_srnode()) {
-      xlocks.insert(&cur->snaplock);
+      lov.add_xlock(&cur->snaplock);
       new_realm = true;
     }
 
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     auto &pi = cur->project_inode(false, new_realm);
@@ -4983,8 +4986,8 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
       return;
     }
 
-    xlocks.insert(&cur->policylock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    lov.add_xlock(&cur->policylock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     auto &pi = cur->project_inode();
@@ -5017,9 +5020,7 @@ void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
 
 void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
                                  file_layout_t *dir_layout,
-                                 set<SimpleLock*> rdlocks,
-                                 set<SimpleLock*> wrlocks,
-                                 set<SimpleLock*> xlocks)
+                                 MutationImpl::LockOpVec& lov)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   string name(req->get_path2());
@@ -5042,8 +5043,8 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
       return;
     }
 
-    xlocks.insert(&cur->policylock);
-    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    lov.add_xlock(&cur->policylock);
+    if (!mds->locker->acquire_locks(mdr, lov))
       return;
 
     auto &pi = cur->project_inode();
@@ -5067,7 +5068,7 @@ void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
     // null/none value (empty string, means default layout).  Is equivalent
     // to a setxattr with empty string: pass through the empty payload of
     // the rmxattr request to do this.
-    handle_set_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+    handle_set_vxattr(mdr, cur, dir_layout, lov);
     return;
   }
 
@@ -5098,14 +5099,14 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   string name(req->get_path2());
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
   CInode *cur;
 
   file_layout_t *dir_layout = NULL;
   if (name.compare(0, 15, "ceph.dir.layout") == 0)
-    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+    cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
   else
-    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+    cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur)
     return;
 
@@ -5118,12 +5119,12 @@ void Server::handle_client_setxattr(MDRequestRef& mdr)
 
   // magic ceph.* namespace?
   if (name.compare(0, 5, "ceph.") == 0) {
-    handle_set_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+    handle_set_vxattr(mdr, cur, dir_layout, lov);
     return;
   }
 
-  xlocks.insert(&cur->xattrlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_xlock(&cur->xattrlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, cur, MAY_WRITE))
@@ -5195,13 +5196,14 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   std::string name(req->get_path2());
-  std::set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  file_layout_t *dir_layout = NULL;
+
+  MutationImpl::LockOpVec lov;
+  file_layout_t *dir_layout = nullptr;
   CInode *cur;
   if (name == "ceph.dir.layout")
-    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true, false, &dir_layout);
+    cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
   else
-    cur = rdlock_path_pin_ref(mdr, 0, rdlocks, true);
+    cur = rdlock_path_pin_ref(mdr, 0, lov, true);
   if (!cur)
     return;
 
@@ -5211,12 +5213,12 @@ void Server::handle_client_removexattr(MDRequestRef& mdr)
   }
 
   if (name.compare(0, 5, "ceph.") == 0) {
-    handle_remove_vxattr(mdr, cur, dir_layout, rdlocks, wrlocks, xlocks);
+    handle_remove_vxattr(mdr, cur, dir_layout, lov);
     return;
   }
 
-  xlocks.insert(&cur->xattrlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_xlock(&cur->xattrlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   auto pxattrs = cur->get_projected_xattrs();
@@ -5306,9 +5308,9 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
   client_t client = mdr->get_client();
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  file_layout_t *dir_layout = NULL;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false,
+  MutationImpl::LockOpVec lov;
+  file_layout_t *dir_layout = nullptr;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false,
                                         &dir_layout);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -5316,8 +5318,8 @@ void Server::handle_client_mknod(MDRequestRef& mdr)
     return;
   }
   CInode *diri = dn->get_dir()->get_inode();
-  rdlocks.insert(&diri->authlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->authlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_WRITE))
@@ -5405,8 +5407,8 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
     return;
   }
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+  MutationImpl::LockOpVec lov;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -5414,8 +5416,8 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
   }
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
-  rdlocks.insert(&diri->authlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->authlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   // mkdir check access
@@ -5487,8 +5489,8 @@ void Server::handle_client_mkdir(MDRequestRef& mdr)
 void Server::handle_client_symlink(MDRequestRef& mdr)
 {
   const MClientRequest::const_ref &req = mdr->client_request;
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+  MutationImpl::LockOpVec lov;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
   if (!dn) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -5496,8 +5498,8 @@ void Server::handle_client_symlink(MDRequestRef& mdr)
   }
   CDir *dir = dn->get_dir();
   CInode *diri = dir->get_inode();
-  rdlocks.insert(&diri->authlock);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  lov.add_rdlock(&diri->authlock);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_WRITE))
@@ -5548,11 +5550,11 @@ void Server::handle_client_link(MDRequestRef& mdr)
          << " to " << req->get_filepath2()
          << dendl;
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, false, false, false);
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, false);
   if (!dn) return;
-  CInode *targeti = rdlock_path_pin_ref(mdr, 1, rdlocks, false);
+  CInode *targeti = rdlock_path_pin_ref(mdr, 1, lov, false);
   if (!targeti) return;
   if (mdr->snapid != CEPH_NOSNAP) {
     respond_to_request(mdr, -EROFS);
@@ -5574,11 +5576,11 @@ void Server::handle_client_link(MDRequestRef& mdr)
     }
   }
 
-  xlocks.insert(&targeti->linklock);
-  xlocks.insert(&targeti->snaplock);
-  rdlocks.erase(&targeti->snaplock);
+  lov.erase_rdlock(&targeti->snaplock);
+  lov.add_xlock(&targeti->snaplock);
+  lov.add_xlock(&targeti->linklock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if ((!mdr->has_more() || mdr->more()->witnessed.empty())) {
@@ -6274,26 +6276,26 @@ void Server::handle_client_unlink(MDRequestRef& mdr)
   }
 
   // lock
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  for (int i=0; i<(int)trace.size()-1; i++) {
-    rdlocks.insert(&trace[i]->lock);
-  }
-  xlocks.insert(&dn->lock);
-  wrlocks.insert(&diri->filelock);
-  wrlocks.insert(&diri->nestlock);
-  xlocks.insert(&in->linklock);
+  MutationImpl::LockOpVec lov;
+
+  for (int i=0; i<(int)trace.size()-1; i++)
+    lov.add_rdlock(&trace[i]->lock);
+  lov.add_xlock(&dn->lock);
+  lov.add_wrlock(&diri->filelock);
+  lov.add_wrlock(&diri->nestlock);
+  lov.add_xlock(&in->linklock);
   if (straydn) {
-    wrlocks.insert(&straydn->get_dir()->inode->filelock);
-    wrlocks.insert(&straydn->get_dir()->inode->nestlock);
-    xlocks.insert(&straydn->lock);
+    lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+    lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+    lov.add_xlock(&straydn->lock);
   }
 
-  mds->locker->include_snap_rdlocks(rdlocks, diri);
-  xlocks.insert(&in->snaplock);
+  mds->locker->include_snap_rdlocks(diri, lov);
+  lov.add_xlock(&in->snaplock);
   if (in->is_dir())
-    rdlocks.insert(&in->filelock);   // to verify it's empty
+    lov.add_rdlock(&in->filelock);   // to verify it's empty
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (in->is_dir() &&
@@ -6978,9 +6980,9 @@ void Server::handle_client_rename(MDRequestRef& mdr)
   vector<CDentry*>& srctrace = mdr->dn[1];
   vector<CDentry*>& desttrace = mdr->dn[0];
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
-  CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, rdlocks, wrlocks, xlocks, true, false, true);
+  CDentry *destdn = rdlock_path_xlock_dentry(mdr, 0, lov, true, false, true);
   if (!destdn) return;
   dout(10) << " destdn " << *destdn << dendl;
   if (mdr->snapid != CEPH_NOSNAP) {
@@ -7072,7 +7074,7 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     while (destbase != srcbase) {
       CDentry *pdn = destbase->get_projected_parent_dn();
       desttrace.insert(desttrace.begin(), pdn);
-      rdlocks.insert(&pdn->lock);
+      lov.add_rdlock(&pdn->lock);
       dout(10) << "rename prepending desttrace with " << *pdn << dendl;
       destbase = pdn->get_dir()->get_inode();
     }
@@ -7148,30 +7150,29 @@ void Server::handle_client_rename(MDRequestRef& mdr)
 
 
   // -- locks --
-  map<SimpleLock*, mds_rank_t> remote_wrlocks;
 
   // srctrace items.  this mirrors locks taken in rdlock_path_xlock_dentry
   for (int i=0; i<(int)srctrace.size(); i++) 
-    rdlocks.insert(&srctrace[i]->lock);
-  xlocks.insert(&srcdn->lock);
+    lov.add_rdlock(&srctrace[i]->lock);
+  lov.add_xlock(&srcdn->lock);
   mds_rank_t srcdirauth = srcdir->authority().first;
   if (srcdirauth != mds->get_nodeid()) {
     dout(10) << " will remote_wrlock srcdir scatterlocks on mds." << srcdirauth << dendl;
-    remote_wrlocks[&srcdir->inode->filelock] = srcdirauth;
-    remote_wrlocks[&srcdir->inode->nestlock] = srcdirauth;
+    lov.add_remote_wrlock(&srcdir->inode->filelock, srcdirauth);
+    lov.add_remote_wrlock(&srcdir->inode->nestlock, srcdirauth);
     if (srci->is_dir())
-      rdlocks.insert(&srci->dirfragtreelock);
+      lov.add_rdlock(&srci->dirfragtreelock);
   } else {
-    wrlocks.insert(&srcdir->inode->filelock);
-    wrlocks.insert(&srcdir->inode->nestlock);
+    lov.add_wrlock(&srcdir->inode->filelock);
+    lov.add_wrlock(&srcdir->inode->nestlock);
   }
-  mds->locker->include_snap_rdlocks(rdlocks, srcdir->inode);
+  mds->locker->include_snap_rdlocks(srcdir->inode, lov);
 
   // straydn?
   if (straydn) {
-    wrlocks.insert(&straydn->get_dir()->inode->filelock);
-    wrlocks.insert(&straydn->get_dir()->inode->nestlock);
-    xlocks.insert(&straydn->lock);
+    lov.add_wrlock(&straydn->get_dir()->inode->filelock);
+    lov.add_wrlock(&straydn->get_dir()->inode->nestlock);
+    lov.add_xlock(&straydn->lock);
   }
 
   // xlock versionlock on dentries if there are witnesses.
@@ -7182,36 +7183,35 @@ void Server::handle_client_rename(MDRequestRef& mdr)
     // this ensures the srcdn and destdn can be traversed to by the witnesses.
     for (int i= 0; i<(int)srctrace.size(); i++) {
       if (srctrace[i]->is_auth() && srctrace[i]->is_projected())
-         xlocks.insert(&srctrace[i]->versionlock);
+         lov.add_xlock(&srctrace[i]->versionlock);
     }
     for (int i=0; i<(int)desttrace.size(); i++) {
       if (desttrace[i]->is_auth() && desttrace[i]->is_projected())
-         xlocks.insert(&desttrace[i]->versionlock);
+         lov.add_xlock(&desttrace[i]->versionlock);
     }
     // xlock srci and oldin's primary dentries, so witnesses can call
     // open_remote_ino() with 'want_locked=true' when the srcdn or destdn
     // is traversed.
     if (srcdnl->is_remote())
-      xlocks.insert(&srci->get_projected_parent_dn()->lock);
+      lov.add_xlock(&srci->get_projected_parent_dn()->lock);
     if (destdnl->is_remote())
-      xlocks.insert(&oldin->get_projected_parent_dn()->lock);
+      lov.add_xlock(&oldin->get_projected_parent_dn()->lock);
   }
 
   // we need to update srci's ctime.  xlock its least contended lock to do that...
-  xlocks.insert(&srci->linklock);
-  xlocks.insert(&srci->snaplock);
+  lov.add_xlock(&srci->linklock);
+  lov.add_xlock(&srci->snaplock);
 
   if (oldin) {
     // xlock oldin (for nlink--)
-    xlocks.insert(&oldin->linklock);
-    xlocks.insert(&oldin->snaplock);
+    lov.add_xlock(&oldin->linklock);
+    lov.add_xlock(&oldin->snaplock);
     if (oldin->is_dir())
-       rdlocks.insert(&oldin->filelock);   // to verify it's empty
+       lov.add_rdlock(&oldin->filelock);   // to verify it's empty
   }
 
   CInode *auth_pin_freeze = !srcdn->is_auth() && srcdnl->is_primary() ? srci : NULL;
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks,
-                                 &remote_wrlocks, auth_pin_freeze))
+  if (!mds->locker->acquire_locks(mdr, lov, auth_pin_freeze))
     return;
 
   if (linkmerge)
@@ -8050,12 +8050,16 @@ void Server::_rename_apply(MDRequestRef& mdr, CDentry *srcdn, CDentry *destdn, C
       /* hack: add an auth pin for each xlock we hold. These were
        * remote xlocks previously but now they're local and
        * we're going to try and unpin when we xlock_finish. */
-      for (set<SimpleLock *>::iterator i = mdr->xlocks.begin();
-         i !=  mdr->xlocks.end();
-         ++i)
-       if ((*i)->get_parent() == destdnl->get_inode() &&
-           !(*i)->is_locallock())
-         mds->locker->xlock_import(*i);
+
+      for (auto i = mdr->locks.lower_bound(&destdnl->get_inode()->versionlock);
+          i !=  mdr->locks.end();
+          ++i) {
+       SimpleLock *lock = i->lock;
+       if (lock->get_parent() != destdnl->get_inode())
+         break;
+       if (i->is_xlock() && !lock->is_locallock())
+         mds->locker->xlock_import(lock);
+      }
       
       // hack: fix auth bit
       in->state_set(CInode::STATE_AUTH);
@@ -8466,14 +8470,17 @@ void Server::_commit_slave_rename(MDRequestRef& mdr, int r,
     if (mdr->more()->is_inode_exporter) {
       // drop our pins
       // we exported, clear out any xlocks that we moved to another MDS
-      set<SimpleLock*>::iterator i = mdr->xlocks.begin();
-      while (i != mdr->xlocks.end()) {
-        SimpleLock *lock = *i++;
 
+      for (auto i = mdr->locks.lower_bound(&in->versionlock);
+          i !=  mdr->locks.end(); ) {
+       SimpleLock *lock = i->lock;
+       if (lock->get_parent() != in)
+         break;
        // we only care about xlocks on the exported inode
-       if (lock->get_parent() == in &&
-           !lock->is_locallock())
-         mds->locker->xlock_export(lock, mdr.get());
+       if (i->is_xlock() && !lock->is_locallock())
+         mds->locker->xlock_export(i++, mdr.get());
+       else
+         ++i;
       }
 
       map<client_t,Capability::Import> peer_imported;
@@ -9093,9 +9100,9 @@ void Server::handle_client_lssnap(MDRequestRef& mdr)
   dout(10) << "lssnap on " << *diri << dendl;
 
   // lock snap
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  mds->locker->include_snap_rdlocks(rdlocks, diri);
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  MutationImpl::LockOpVec lov;
+  mds->locker->include_snap_rdlocks(diri, lov);
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_READ))
@@ -9234,13 +9241,13 @@ void Server::handle_client_mksnap(MDRequestRef& mdr)
   dout(10) << "mksnap " << snapname << " on " << *diri << dendl;
 
   // lock snap
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
-  mds->locker->include_snap_rdlocks(rdlocks, diri);
-  rdlocks.erase(&diri->snaplock);
-  xlocks.insert(&diri->snaplock);
+  mds->locker->include_snap_rdlocks(diri, lov);
+  lov.erase_rdlock(&diri->snaplock);
+  lov.add_xlock(&diri->snaplock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
@@ -9393,12 +9400,12 @@ void Server::handle_client_rmsnap(MDRequestRef& mdr)
   snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
   dout(10) << " snapname " << snapname << " is " << snapid << dendl;
 
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-  mds->locker->include_snap_rdlocks(rdlocks, diri);
-  rdlocks.erase(&diri->snaplock);
-  xlocks.insert(&diri->snaplock);
+  MutationImpl::LockOpVec lov;
+  mds->locker->include_snap_rdlocks(diri, lov);
+  lov.erase_rdlock(&diri->snaplock);
+  lov.add_xlock(&diri->snaplock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
@@ -9540,13 +9547,13 @@ void Server::handle_client_renamesnap(MDRequestRef& mdr)
   dout(10) << " snapname " << srcname << " is " << snapid << dendl;
 
   // lock snap
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  MutationImpl::LockOpVec lov;
 
-  mds->locker->include_snap_rdlocks(rdlocks, diri);
-  rdlocks.erase(&diri->snaplock);
-  xlocks.insert(&diri->snaplock);
+  mds->locker->include_snap_rdlocks(diri, lov);
+  lov.erase_rdlock(&diri->snaplock);
+  lov.add_xlock(&diri->snaplock);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+  if (!mds->locker->acquire_locks(mdr, lov))
     return;
 
   if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
index f319e3426df45c82da10fc756bd440eb7328a72d..c53f7271d185f46b45ea087015235b53e1846fd8 100644 (file)
@@ -191,16 +191,14 @@ public:
   void journal_allocated_inos(MDRequestRef& mdr, EMetaBlob *blob);
   void apply_allocated_inos(MDRequestRef& mdr, Session *session);
 
-  CInode* rdlock_path_pin_ref(MDRequestRef& mdr, int n, set<SimpleLock*>& rdlocks, bool want_auth,
-                             bool no_want_auth=false,
-                             file_layout_t **layout=NULL,
+  CInode* rdlock_path_pin_ref(MDRequestRef& mdr, int n, MutationImpl::LockOpVec& lov,
+                             bool want_auth, bool no_want_auth=false,
+                             file_layout_t **layout=nullptr,
                              bool no_lookup=false);
   CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
-                                    set<SimpleLock*>& rdlocks,
-                                    set<SimpleLock*>& wrlocks,
-                                   set<SimpleLock*>& xlocks, bool okexist,
-                                   bool mustexist, bool alwaysxlock,
-                                   file_layout_t **layout=NULL);
+                                   MutationImpl::LockOpVec& lov,
+                                   bool okexist, bool mustexist, bool alwaysxlock,
+                                   file_layout_t **layout=nullptr);
 
   CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr);
 
@@ -229,14 +227,10 @@ public:
                           file_layout_t *layout);
   void handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
                         file_layout_t *dir_layout,
-                        set<SimpleLock*> rdlocks,
-                        set<SimpleLock*> wrlocks,
-                        set<SimpleLock*> xlocks);
+                        MutationImpl::LockOpVec& lov);
   void handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
                            file_layout_t *dir_layout,
-                           set<SimpleLock*> rdlocks,
-                           set<SimpleLock*> wrlocks,
-                           set<SimpleLock*> xlocks);
+                           MutationImpl::LockOpVec& lov);
   void handle_client_setxattr(MDRequestRef& mdr);
   void handle_client_removexattr(MDRequestRef& mdr);