]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: bracket LOCK|AUTH -> PREXLOCK transition with start/finish_locking
authorSage Weil <sage@newdream.net>
Thu, 1 Sep 2011 23:32:24 +0000 (16:32 -0700)
committerSage Weil <sage@newdream.net>
Wed, 7 Sep 2011 15:44:59 +0000 (08:44 -0700)
Unlike other lock transitions, we are moving to an _unstable_ state and
then taking our (x)lock.  That means that if we don't actually finish for
some reason (lock order changes, request is canceled, whatever) we leave
things in an unstable state--in this case, PREXLOCK, where nothing else
will touch it.

Call cancel_lock in drop_locks (or in acquire_locks when the order changes)
to clean up after an aborted lock attempt.

Original but reproduced (though not easily) by multimds collection task
fsstress_thrash_subtrees.yaml.

Fixes: #1425
Signed-off-by: Sage Weil <sage@newdream.net>
src/mds/Locker.cc
src/mds/Locker.h

index 66fea23493c7e4d376a1340dbe91e8b36dbe6c1d..ca380e593d5bac2a5168feecbf91c5de21a519f9 100644 (file)
@@ -402,8 +402,11 @@ bool Locker::acquire_locks(MDRequest *mdr,
       if (need_issue)
        issue_set.insert((CInode*)stray->get_parent());
     }
-      
+
     // lock
+    if (mdr->locking && *p != mdr->locking) {
+      cancel_locking(mdr, &issue_set);
+    }
     if (xlocks.count(*p)) {
       if (!xlock_start(*p, mdr)) 
        goto out;
@@ -491,6 +494,24 @@ void Locker::_drop_non_rdlocks(Mutation *mut, set<CInode*> *pneed_issue)
   }
 }
 
+void Locker::cancel_locking(Mutation *mut, set<CInode*> *pneed_issue)
+{
+  SimpleLock *lock = mut->locking;
+  assert(lock);
+  dout(10) << "cancel_locking " << *lock << " on " << *mut << dendl;
+
+  if (lock->get_type() != CEPH_LOCK_DN) {
+    bool need_issue = false;
+    if (lock->get_state() == LOCK_PREXLOCK)
+      _finish_xlock(lock, &need_issue);
+    if (lock->is_stable())
+      eval(lock, &need_issue);
+    if (need_issue)
+      pneed_issue->insert((CInode *)lock->get_parent());
+  }
+  mut->finish_locking(lock);
+}
+
 void Locker::drop_locks(Mutation *mut, set<CInode*> *pneed_issue)
 {
   // leftover locks
@@ -498,6 +519,8 @@ void Locker::drop_locks(Mutation *mut, set<CInode*> *pneed_issue)
   if (!pneed_issue)
     pneed_issue = &my_need_issue;
 
+  if (mut->locking)
+    cancel_locking(mut, pneed_issue);
   _drop_non_rdlocks(mut, pneed_issue);
   _drop_rdlocks(mut, pneed_issue);
 
@@ -1304,6 +1327,7 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequest *mut)
        lock->get_xlock(mut, client);
        mut->xlocks.insert(lock);
        mut->locks.insert(lock);
+       mut->finish_locking(lock);
        return true;
       }
       
@@ -1311,10 +1335,12 @@ bool Locker::xlock_start(SimpleLock *lock, MDRequest *mut)
                                  lock->get_xlock_by_client() == client))
        break;
 
-      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE)
+      if (lock->get_state() == LOCK_LOCK || lock->get_state() == LOCK_XLOCKDONE) {
+       mut->start_locking(lock);
        simple_xlock(lock);
-      else
+      } else {
        simple_lock(lock);
+      }
     }
     
     lock->add_waiter(SimpleLock::WAIT_WR|SimpleLock::WAIT_STABLE, new C_MDS_RetryRequest(mdcache, mut));
index 35de701cdef8e97dcfaa0cb5f8a7a9136b8dd87b..a1cf59e3185ddc30b83bc6bb09e9a1ff03561d94 100644 (file)
@@ -90,6 +90,7 @@ public:
                     set<SimpleLock*> &xlocks,
                     map<SimpleLock*,int> *remote_wrlocks=NULL);
 
+  void cancel_locking(Mutation *mut, set<CInode*> *pneed_issue);
   void drop_locks(Mutation *mut, set<CInode*> *pneed_issue=0);
   void set_xlocks_done(Mutation *mut);
   void drop_non_rdlocks(Mutation *mut, set<CInode*> *pneed_issue=0);