]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: acquire_locks in callers, not rdlock_path_* helpers
authorSage Weil <sage@newdream.net>
Sun, 12 Apr 2009 21:12:13 +0000 (14:12 -0700)
committerSage Weil <sage@newdream.net>
Mon, 13 Apr 2009 17:24:42 +0000 (10:24 -0700)
This way we only call acquire_locks from one place, and avoid weird lock
reordering issues.

src/mds/Server.cc
src/mds/Server.h

index 44c3b0092361cccb206f47471e2110194b328bc6..46e4ed5d759e65ffcad166ecba59b1bbe5ef33dc 100644 (file)
@@ -1472,6 +1472,7 @@ CDir *Server::traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, file
 
 
 CInode* Server::rdlock_path_pin_ref(MDRequest *mdr,
+                                   set<SimpleLock*> &rdlocks,
                                    bool want_auth, bool rdlock_dft)
 {
   dout(10) << "rdlock_path_pin_ref " << *mdr << dendl;
@@ -1526,23 +1527,17 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr,
     mdr->auth_pin(ref);
   }
 
-  // lock the path
-  set<SimpleLock*> rdlocks, empty;
-
   for (int i=0; i<(int)trace.size(); i++) 
     rdlocks.insert(&trace[i]->lock);
   if (rdlock_dft)
     rdlocks.insert(&ref->dirfragtreelock);
   mds->locker->include_snap_rdlocks(rdlocks, ref);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, empty, empty))
-    return 0;
-  
   // set and pin ref
   mdr->pin(ref);
   mdr->ref = ref;
   mdr->trace.swap(trace);
-  
+
   return ref;
 }
 
@@ -1553,7 +1548,9 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr,
  * create null dentry in place (or use existing if okexist).
  * get rdlocks on traversed dentries, xlock on new dentry.
  */
-CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist)
+CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr,
+                                         set<SimpleLock*>& rdlocks, set<SimpleLock*>& wrlocks, set<SimpleLock*>& xlocks,
+                                         bool okexist, bool mustexist)
 {
   MClientRequest *req = mdr->client_request;
 
@@ -1619,8 +1616,6 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus
   }
 
   // -- lock --
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-
   for (int i=0; i<(int)trace.size(); i++) 
     rdlocks.insert(&trace[i]->lock);
   if (dn->get_linkage(client, mdr)->is_null())
@@ -1631,9 +1626,6 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus
   wrlocks.insert(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
   mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode);
 
-  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
-    return 0;
-
   // save the locked trace.
   mdr->trace.swap(trace);
 
@@ -1694,17 +1686,10 @@ CDir* Server::try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr)
 void Server::handle_client_stat(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  CInode *ref = rdlock_path_pin_ref(mdr, false);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *ref = rdlock_path_pin_ref(mdr, rdlocks, false);
   if (!ref) return;
 
-  // which inode locks do I want?
-  /* note: this works because we include existing locks in our lists,
-     and because all new locks are on inodes and sort to the right of
-     the dentry rdlocks previous acquired by rdlock_path_pin_ref(). */
-  set<SimpleLock*> rdlocks = mdr->rdlocks;
-  set<SimpleLock*> wrlocks = mdr->wrlocks;
-  set<SimpleLock*> xlocks = mdr->xlocks;
-
   /*
    * if client currently holds the EXCL cap on a field, do not rdlock
    * it; client's stat() will result in valid info if _either_ EXCL
@@ -1818,7 +1803,8 @@ public:
 void Server::handle_client_setattr(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  CInode *cur = rdlock_path_pin_ref(mdr, true);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true);
   if (!cur) return;
 
   if (mdr->ref_snapid != CEPH_NOSNAP) {
@@ -1833,16 +1819,11 @@ void Server::handle_client_setattr(MDRequest *mdr)
   __u32 mask = req->head.args.setattr.mask;
 
   // xlock inode
-  set<SimpleLock*> rdlocks = mdr->rdlocks;
-  set<SimpleLock*> wrlocks = mdr->wrlocks;
-  set<SimpleLock*> xlocks = mdr->xlocks;
-
   if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID))
     xlocks.insert(&cur->authlock);
   if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE))
     xlocks.insert(&cur->filelock);
 
-  mds->locker->include_snap_rdlocks(rdlocks, cur);
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
@@ -1910,7 +1891,8 @@ void Server::handle_client_setattr(MDRequest *mdr)
 void Server::handle_client_setlayout(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  CInode *cur = rdlock_path_pin_ref(mdr, true);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true);
   if (!cur) return;
 
   if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) {
@@ -1928,14 +1910,7 @@ void Server::handle_client_setlayout(MDRequest *mdr)
     return;
   }
 
-
-  // write
-  set<SimpleLock*> rdlocks = mdr->rdlocks;
-  set<SimpleLock*> wrlocks = mdr->wrlocks;
-  set<SimpleLock*> xlocks = mdr->xlocks;
   xlocks.insert(&cur->filelock);
-  mds->locker->include_snap_rdlocks(rdlocks, cur);
-
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
@@ -1983,7 +1958,8 @@ public:
 void Server::handle_client_setxattr(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  CInode *cur = rdlock_path_pin_ref(mdr, true);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true);
   if (!cur) return;
 
   if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) {
@@ -1991,13 +1967,7 @@ void Server::handle_client_setxattr(MDRequest *mdr)
     return;
   }
 
-  // write
-  set<SimpleLock*> rdlocks = mdr->rdlocks;
-  set<SimpleLock*> wrlocks = mdr->wrlocks;
-  set<SimpleLock*> xlocks = mdr->xlocks;
   xlocks.insert(&cur->xattrlock);
-  mds->locker->include_snap_rdlocks(rdlocks, cur);
-
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
@@ -2043,7 +2013,8 @@ void Server::handle_client_setxattr(MDRequest *mdr)
 void Server::handle_client_removexattr(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  CInode *cur = rdlock_path_pin_ref(mdr, true);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true);
   if (!cur) return;
 
   if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) {
@@ -2051,13 +2022,7 @@ void Server::handle_client_removexattr(MDRequest *mdr)
     return;
   }
 
-  // write
-  set<SimpleLock*> rdlocks = mdr->rdlocks;
-  set<SimpleLock*> wrlocks = mdr->wrlocks;
-  set<SimpleLock*> xlocks = mdr->xlocks;
   xlocks.insert(&cur->xattrlock);
-  mds->locker->include_snap_rdlocks(rdlocks, cur);
-
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
 
@@ -2099,7 +2064,8 @@ void Server::handle_client_readdir(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
   int client = req->get_orig_source().num();
-  CInode *diri = rdlock_path_pin_ref(mdr, false, true);  // rdlock dirfragtreelock!
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *diri = rdlock_path_pin_ref(mdr, rdlocks, false, true);  // rdlock dirfragtreelock!
   if (!diri) return;
 
   // it's a directory, right?
@@ -2110,6 +2076,9 @@ void Server::handle_client_readdir(MDRequest *mdr)
     return;
   }
 
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
+
   // which frag?
   frag_t fg = (__u32)req->head.args.readdir.frag;
 
@@ -2309,9 +2278,11 @@ public:
 void Server::handle_client_mknod(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false);
   if (!dn) return;
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
 
   snapid_t follows = dn->get_dir()->inode->find_snaprealm()->get_newest_seq();
   mdr->now = g_clock.real_now();
@@ -2352,9 +2323,11 @@ void Server::handle_client_mknod(MDRequest *mdr)
 void Server::handle_client_mkdir(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false);
   if (!dn) return;
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
 
   // new inode
   SnapRealm *realm = dn->get_dir()->inode->find_snaprealm();
@@ -2414,9 +2387,11 @@ void Server::handle_client_mkdir(MDRequest *mdr)
 void Server::handle_client_symlink(MDRequest *mdr)
 {
   MClientRequest *req = mdr->client_request;
-  
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false);
   if (!dn) return;
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
 
   mdr->now = g_clock.real_now();
   snapid_t follows = dn->get_dir()->inode->find_snaprealm()->get_newest_seq();
@@ -2463,13 +2438,11 @@ void Server::handle_client_link(MDRequest *mdr)
          << " to " << req->get_filepath2()
          << dendl;
 
-  // traverse to dest dir, make sure it's ours.
-  const filepath &linkpath = req->get_filepath();
-  const string &dname = linkpath.last_dentry();
-  vector<CDentry*> linktrace;
-  CDir *dir = traverse_to_auth_dir(mdr, linktrace, linkpath);
-  if (!dir) return;
-  dout(7) << "handle_client_link link " << dname << " in " << *dir << dendl;
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false);
+  if (!dn) return;
+  CDir *dir = dn->get_dir();
+  dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl;
   
   // traverse to link target
   filepath targetpath = req->get_filepath2();
@@ -2490,23 +2463,11 @@ void Server::handle_client_link(MDRequest *mdr)
     return;
   }
   
-  // get/make null link dentry
-  CDentry *dn = prepare_null_dentry(mdr, dir, dname, false);
-  if (!dn) return;
-
-  // create lock lists
-  set<SimpleLock*> rdlocks, wrlocks, xlocks;
-
-  for (int i=0; i<(int)linktrace.size(); i++)
-    rdlocks.insert(&linktrace[i]->lock);
-  xlocks.insert(&dn->lock);
-  wrlocks.insert(&dn->get_dir()->inode->filelock);
-  wrlocks.insert(&dn->get_dir()->inode->nestlock);
+  // add locks for source inode
   for (int i=0; i<(int)targettrace.size(); i++)
     rdlocks.insert(&targettrace[i]->lock);
   xlocks.insert(&targeti->linklock);
   mds->locker->include_snap_rdlocks(rdlocks, targeti);
-  mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode);
 
   if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
     return;
@@ -4735,8 +4696,12 @@ void Server::handle_client_open(MDRequest *mdr)
 
   dout(7) << "open on " << req->get_filepath() << dendl;
   
-  CInode *cur = rdlock_path_pin_ref(mdr, need_auth);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, need_auth);
   if (!cur) return;
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
+
 
   // can only open a dir with mode FILE_MODE_PIN, at least for now.
   if (cur->inode.is_dir()) cmode = CEPH_FILE_MODE_PIN;
@@ -4772,10 +4737,6 @@ void Server::handle_client_open(MDRequest *mdr)
        mdr->session->have_completed_request(req->get_reqid().tid))) {
     assert(cur->is_auth());
 
-    // xlock file size
-    set<SimpleLock*> rdlocks = mdr->rdlocks;
-    set<SimpleLock*> wrlocks = mdr->wrlocks;
-    set<SimpleLock*> xlocks = mdr->xlocks;
     wrlocks.insert(&cur->filelock);
     if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
       return;
@@ -4793,7 +4754,11 @@ void Server::handle_client_open(MDRequest *mdr)
       handle_client_opent(mdr, cmode);
       return;
     }
-  } 
+  } else {
+    if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+      return;
+  }
+
 
   // sync filelock if snapped.
   //  this makes us wait for writers to flushsnaps, ensuring we get accurate metadata,
@@ -4947,8 +4912,12 @@ void Server::handle_client_openc(MDRequest *mdr)
   dout(7) << "open w/ O_CREAT on " << req->get_filepath() << dendl;
   
   bool excl = (req->head.args.open.flags & O_EXCL);
-  CDentry *dn = rdlock_path_xlock_dentry(mdr, !excl, false);
+  set<SimpleLock*> rdlocks, wrlocks, xlocks;
+  CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, !excl, false);
   if (!dn) return;
+  if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+    return;
+
   CDentry::linkage_t *dnl = dn->get_projected_linkage();
 
   if (!dnl->is_null()) {
index 2b37630d2f421d6151dc6f02f7a01c5d48216d34..b278ce8fabe7206b830a0440cf7db5542a98f128 100644 (file)
@@ -109,8 +109,9 @@ public:
   void journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob);
   void apply_allocated_inos(MDRequest *mdr);
 
-  CInode* rdlock_path_pin_ref(MDRequest *mdr, bool want_auth, bool rdlock_dft=false);
-  CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist);
+  CInode* rdlock_path_pin_ref(MDRequest *mdr, set<SimpleLock*>& rdlocks, bool want_auth, bool rdlock_dft=false);
+  CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, set<SimpleLock*>& rdlocks, set<SimpleLock*>& wrlocks, 
+                                   set<SimpleLock*>& xlocks, bool okexist, bool mustexist);
 
   CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr);