]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: prevent new lock cache cons when invalidating an existing one
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 5 Mar 2024 17:27:00 +0000 (12:27 -0500)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 20 Mar 2024 14:56:57 +0000 (10:56 -0400)
The previous scheme invalidated a lock cache and then immediately removed it
from its Capability list. The lock cache would eventually be deleted but a new
one could be constructed shortly after. The main reason for this is that simply
invalidating the lock cache does not drive a state change in the local locks
preventing new writers. This is mostly important for acquiring the quiescelock.

This commit also corrects a bug where a MDLockCache would be created for a
given opcode type (like create) when the capability does not have the issued
cap (CEPH_CAP_DIR_CREATE). The bug would not cause any negative side-effects
but would hold locks unnecessarily when only MDS ops (and not the client
executing ops asynchronously) are acquiring the locks.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/mds/Locker.cc
src/mds/Mutation.cc
src/mds/Mutation.h

index 6a72f2d3e4dc2731a87c27e491d233e253d186d3..d0b5ac67c685fc8ba9bf74ece95375729d94c519 100644 (file)
@@ -903,6 +903,10 @@ void Locker::put_lock_cache(MDLockCache* lock_cache)
 
   ceph_assert(lock_cache->invalidating);
 
+  /* The lock cache is only removed from the cap's list when the cap is removed
+   * or the lock cache is finally deleted.
+   */
+  lock_cache->item_cap_lock_cache.remove_myself();
   lock_cache->detach_locks();
 
   CInode *diri = lock_cache->get_dir_inode();
@@ -915,44 +919,37 @@ void Locker::put_lock_cache(MDLockCache* lock_cache)
   mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
 }
 
-int Locker::get_cap_bit_for_lock_cache(int op)
-{
-  switch(op) {
-    case CEPH_MDS_OP_CREATE:
-      return CEPH_CAP_DIR_CREATE;
-    case CEPH_MDS_OP_UNLINK:
-      return CEPH_CAP_DIR_UNLINK;
-    default:
-      ceph_assert(0 == "unsupported operation");
-      return 0;
-  }
-}
-
 void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
 {
   dout(15) << __func__ << ": " << *lock_cache << dendl;
 
-  ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
-  if (lock_cache->invalidating) {
-    ceph_assert(!lock_cache->client_cap);
-  } else {
+  if (!lock_cache->invalidating) {
     lock_cache->invalidating = true;
     lock_cache->detach_dirfrags();
   }
 
   Capability *cap = lock_cache->client_cap;
   if (cap) {
-    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+    int cap_bit = lock_cache->get_cap_bit();
     cap->clear_lock_cache_allowed(cap_bit);
-    if (cap->issued() & cap_bit)
+    if (cap->issued() & cap_bit) {
       issue_caps(lock_cache->get_dir_inode(), cap);
-    else
+    } else {
       cap = nullptr;
+    }
+  } else {
+    /* cap is removed but the lock_cache may not yet be 0 ref */
+    lock_cache->item_cap_lock_cache.remove_myself();
   }
-
   if (!cap) {
-    lock_cache->item_cap_lock_cache.remove_myself();
-    put_lock_cache(lock_cache);
+    /* the cap is removed or we lost relevant op rights */
+    if (lock_cache->cap_ref) {
+      put_lock_cache(lock_cache);
+      lock_cache->cap_ref = false;
+    }
+    /* N.B.: the lock cache may still be associated with the cap even when
+     * invalidated so a new lock cache is not created.
+     */
   }
 }
 
@@ -961,12 +958,10 @@ void Locker::eval_lock_caches(Capability *cap)
   for (auto p = cap->lock_caches.begin(); !p.end(); ) {
     MDLockCache *lock_cache = *p;
     ++p;
-    if (!lock_cache->invalidating)
-      continue;
-    int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+    int cap_bit = lock_cache->get_cap_bit();
     if (!(cap->issued() & cap_bit)) {
-      lock_cache->item_cap_lock_cache.remove_myself();
-      put_lock_cache(lock_cache);
+      dout(20) << __func__ << ": lost " << ccap_string(cap_bit) << " on " << *lock_cache << dendl;
+      invalidate_lock_cache(lock_cache);
     }
   }
 }
@@ -999,7 +994,7 @@ void Locker::create_lock_cache(const MDRequestRef& mdr, CInode *diri, file_layou
 
   client_t client = mdr->get_client();
   int opcode = mdr->client_request->get_op();
-  dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;
+  dout(10) << __func__ << ": for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;
 
   if (!diri->is_auth()) {
     dout(10) << " dir inode is not auth, noop" << dendl;
@@ -1024,6 +1019,12 @@ void Locker::create_lock_cache(const MDRequestRef& mdr, CInode *diri, file_layou
     }
   }
 
+  int cap_bit = MDLockCache::get_cap_bit_for_lock_cache(opcode);
+  if (!(cap->issued() & cap_bit)) {
+    dout(10) << " client cap lacks rights for lock cache: " << ccap_string(cap_bit) << dendl;
+    return;
+  }
+
   set<MDSCacheObject*> ancestors;
   for (CInode *in = diri; ; ) {
     CDentry *pdn = in->get_projected_parent_dn();
@@ -1081,7 +1082,7 @@ void Locker::create_lock_cache(const MDRequestRef& mdr, CInode *diri, file_layou
   auto lock_cache = new MDLockCache(cap, opcode);
   if (dir_layout)
     lock_cache->set_dir_layout(*dir_layout);
-  cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));
+  cap->set_lock_cache_allowed(lock_cache->get_cap_bit());
 
   for (auto dir : dfv) {
     // prevent subtree migration
@@ -1135,8 +1136,10 @@ void Locker::create_lock_cache(const MDRequestRef& mdr, CInode *diri, file_layou
   }
   lock_cache->attach_locks();
 
-  lock_cache->ref++;
+  dout(20) << __func__ << ": created " << *lock_cache << dendl;
+
   mdr->lock_cache = lock_cache;
+  lock_cache->ref++; /* for mdr */
 }
 
 bool Locker::find_and_attach_lock_cache(const MDRequestRef& mdr, CInode *diri)
@@ -1151,10 +1154,10 @@ bool Locker::find_and_attach_lock_cache(const MDRequestRef& mdr, CInode *diri)
   int opcode = mdr->client_request->get_op();
   for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
     MDLockCache *lock_cache = *p;
-    if (lock_cache->opcode == opcode) {
-      dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
+    if (lock_cache->opcode == opcode && lock_cache->attachable()) {
+      dout(10) << "found lock cache " << *lock_cache << " on " << *diri << dendl;
       mdr->lock_cache = lock_cache;
-      mdr->lock_cache->ref++;
+      mdr->lock_cache->ref++; /* for mdr */
       return true;
     }
   }
@@ -2454,6 +2457,12 @@ int Locker::get_allowed_caps(CInode *in, Capability *cap,
 
 int Locker::issue_caps(CInode *in, Capability *only_cap)
 {
+  dout(20) << __func__ << ": " << *in;
+  if (only_cap) {
+    *_dout << " for " << only_cap->get_client();
+  }
+  *_dout << dendl;
+
   // count conflicts with
   int nissued = 0;
   int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;
index 6ff87183914b89ff8eb49d77adf5b4392af04f08..e94a988703f05d2c3825511d4a631f50d549924c 100644 (file)
@@ -639,3 +639,16 @@ void MDLockCache::print(std::ostream& out) const {
   }
   out << ")";
 }
+
+int MDLockCache::get_cap_bit_for_lock_cache(int opcode)
+{
+  switch(opcode) {
+    case CEPH_MDS_OP_CREATE:
+      return CEPH_CAP_DIR_CREATE;
+    case CEPH_MDS_OP_UNLINK:
+      return CEPH_CAP_DIR_UNLINK;
+    default:
+      ceph_abort("unsupported opcode");
+      return 0;
+  }
+}
index 7f985651e37d9a1c09610f3c9f815881bd1bfb56..d7d74de82dfd263fd603f83be501569da7a21ae5 100644 (file)
@@ -543,10 +543,20 @@ struct MDLockCache : public MutationImpl {
   void attach_dirfrags(std::vector<CDir*>&& dfv);
   void detach_locks();
   void detach_dirfrags();
+  /* Is the lock cache still attached to a capability and does that capability
+   * still have issued the rights (create/unlink) associated with the cap?
+   */
+  bool attachable() const {
+    return client_cap && cap_ref;
+  }
+  static int get_cap_bit_for_lock_cache(int opcode);
+  int get_cap_bit() const {
+    return get_cap_bit_for_lock_cache(opcode);
+  }
 
   CInode *diri;
   Capability *client_cap;
-  int opcode;
+  const int opcode;
   file_layout_t dir_layout;
 
   elist<MDLockCache*>::item item_cap_lock_cache;
@@ -560,6 +570,7 @@ struct MDLockCache : public MutationImpl {
 
   int ref = 1;
   bool invalidating = false;
+  bool cap_ref = true; /* does the cap still have issued&cap_bit ? */
 };
 
 typedef boost::intrusive_ptr<MutationImpl> MutationRef;