From df14710318bc4f84e3345c4edf5cbcddef68e894 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sun, 12 Apr 2009 14:12:13 -0700 Subject: [PATCH] mds: acquire_locks in callers, not rdlock_path_* helpers This way we only call acquire_locks from one place, and avoid weird lock reordering issues. --- src/mds/Server.cc | 137 ++++++++++++++++++---------------------------- src/mds/Server.h | 5 +- 2 files changed, 56 insertions(+), 86 deletions(-) diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 44c3b0092361c..46e4ed5d759e6 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -1472,6 +1472,7 @@ CDir *Server::traverse_to_auth_dir(MDRequest *mdr, vector &trace, file CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, + set &rdlocks, bool want_auth, bool rdlock_dft) { dout(10) << "rdlock_path_pin_ref " << *mdr << dendl; @@ -1526,23 +1527,17 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, mdr->auth_pin(ref); } - // lock the path - set rdlocks, empty; - for (int i=0; i<(int)trace.size(); i++) rdlocks.insert(&trace[i]->lock); if (rdlock_dft) rdlocks.insert(&ref->dirfragtreelock); mds->locker->include_snap_rdlocks(rdlocks, ref); - if (!mds->locker->acquire_locks(mdr, rdlocks, empty, empty)) - return 0; - // set and pin ref mdr->pin(ref); mdr->ref = ref; mdr->trace.swap(trace); - + return ref; } @@ -1553,7 +1548,9 @@ CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, * create null dentry in place (or use existing if okexist). * get rdlocks on traversed dentries, xlock on new dentry. */ -CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist) +CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, + set& rdlocks, set& wrlocks, set& xlocks, + bool okexist, bool mustexist) { MClientRequest *req = mdr->client_request; @@ -1619,8 +1616,6 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus } // -- lock -- - set rdlocks, wrlocks, xlocks; - for (int i=0; i<(int)trace.size(); i++) rdlocks.insert(&trace[i]->lock); if (dn->get_linkage(client, mdr)->is_null()) @@ -1631,9 +1626,6 @@ CDentry* Server::rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mus wrlocks.insert(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode); - if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) - return 0; - // save the locked trace. mdr->trace.swap(trace); @@ -1694,17 +1686,10 @@ CDir* Server::try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr) void Server::handle_client_stat(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - CInode *ref = rdlock_path_pin_ref(mdr, false); + set rdlocks, wrlocks, xlocks; + CInode *ref = rdlock_path_pin_ref(mdr, rdlocks, false); if (!ref) return; - // which inode locks do I want? - /* note: this works because we include existing locks in our lists, - and because all new locks are on inodes and sort to the right of - the dentry rdlocks previous acquired by rdlock_path_pin_ref(). */ - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; - /* * if client currently holds the EXCL cap on a field, do not rdlock * it; client's stat() will result in valid info if _either_ EXCL @@ -1818,7 +1803,8 @@ public: void Server::handle_client_setattr(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - CInode *cur = rdlock_path_pin_ref(mdr, true); + set rdlocks, wrlocks, xlocks; + CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true); if (!cur) return; if (mdr->ref_snapid != CEPH_NOSNAP) { @@ -1833,16 +1819,11 @@ void Server::handle_client_setattr(MDRequest *mdr) __u32 mask = req->head.args.setattr.mask; // xlock inode - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; - if (mask & (CEPH_SETATTR_MODE|CEPH_SETATTR_UID|CEPH_SETATTR_GID)) xlocks.insert(&cur->authlock); if (mask & (CEPH_SETATTR_MTIME|CEPH_SETATTR_ATIME|CEPH_SETATTR_SIZE)) xlocks.insert(&cur->filelock); - mds->locker->include_snap_rdlocks(rdlocks, cur); if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -1910,7 +1891,8 @@ void Server::handle_client_setattr(MDRequest *mdr) void Server::handle_client_setlayout(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - CInode *cur = rdlock_path_pin_ref(mdr, true); + set rdlocks, wrlocks, xlocks; + CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true); if (!cur) return; if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) { @@ -1928,14 +1910,7 @@ void Server::handle_client_setlayout(MDRequest *mdr) return; } - - // write - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; xlocks.insert(&cur->filelock); - mds->locker->include_snap_rdlocks(rdlocks, cur); - if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -1983,7 +1958,8 @@ public: void Server::handle_client_setxattr(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - CInode *cur = rdlock_path_pin_ref(mdr, true); + set rdlocks, wrlocks, xlocks; + CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true); if (!cur) return; if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) { @@ -1991,13 +1967,7 @@ void Server::handle_client_setxattr(MDRequest *mdr) return; } - // write - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; xlocks.insert(&cur->xattrlock); - mds->locker->include_snap_rdlocks(rdlocks, cur); - if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -2043,7 +2013,8 @@ void Server::handle_client_setxattr(MDRequest *mdr) void Server::handle_client_removexattr(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - CInode *cur = rdlock_path_pin_ref(mdr, true); + set rdlocks, wrlocks, xlocks; + CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, true); if (!cur) return; if (mdr->ref_snapid != CEPH_NOSNAP || cur->is_root()) { @@ -2051,13 +2022,7 @@ void Server::handle_client_removexattr(MDRequest *mdr) return; } - // write - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; xlocks.insert(&cur->xattrlock); - mds->locker->include_snap_rdlocks(rdlocks, cur); - if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -2099,7 +2064,8 @@ void Server::handle_client_readdir(MDRequest *mdr) { MClientRequest *req = mdr->client_request; int client = req->get_orig_source().num(); - CInode *diri = rdlock_path_pin_ref(mdr, false, true); // rdlock dirfragtreelock! + set rdlocks, wrlocks, xlocks; + CInode *diri = rdlock_path_pin_ref(mdr, rdlocks, false, true); // rdlock dirfragtreelock! if (!diri) return; // it's a directory, right? @@ -2110,6 +2076,9 @@ void Server::handle_client_readdir(MDRequest *mdr) return; } + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + // which frag? frag_t fg = (__u32)req->head.args.readdir.frag; @@ -2309,9 +2278,11 @@ public: void Server::handle_client_mknod(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - - CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false); + set rdlocks, wrlocks, xlocks; + CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false); if (!dn) return; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; snapid_t follows = dn->get_dir()->inode->find_snaprealm()->get_newest_seq(); mdr->now = g_clock.real_now(); @@ -2352,9 +2323,11 @@ void Server::handle_client_mknod(MDRequest *mdr) void Server::handle_client_mkdir(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - - CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false); + set rdlocks, wrlocks, xlocks; + CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false); if (!dn) return; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; // new inode SnapRealm *realm = dn->get_dir()->inode->find_snaprealm(); @@ -2414,9 +2387,11 @@ void Server::handle_client_mkdir(MDRequest *mdr) void Server::handle_client_symlink(MDRequest *mdr) { MClientRequest *req = mdr->client_request; - - CDentry *dn = rdlock_path_xlock_dentry(mdr, false, false); + set rdlocks, wrlocks, xlocks; + CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false); if (!dn) return; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; mdr->now = g_clock.real_now(); snapid_t follows = dn->get_dir()->inode->find_snaprealm()->get_newest_seq(); @@ -2463,13 +2438,11 @@ void Server::handle_client_link(MDRequest *mdr) << " to " << req->get_filepath2() << dendl; - // traverse to dest dir, make sure it's ours. - const filepath &linkpath = req->get_filepath(); - const string &dname = linkpath.last_dentry(); - vector linktrace; - CDir *dir = traverse_to_auth_dir(mdr, linktrace, linkpath); - if (!dir) return; - dout(7) << "handle_client_link link " << dname << " in " << *dir << dendl; + set rdlocks, wrlocks, xlocks; + CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, false, false); + if (!dn) return; + CDir *dir = dn->get_dir(); + dout(7) << "handle_client_link link " << dn->get_name() << " in " << *dir << dendl; // traverse to link target filepath targetpath = req->get_filepath2(); @@ -2490,23 +2463,11 @@ void Server::handle_client_link(MDRequest *mdr) return; } - // get/make null link dentry - CDentry *dn = prepare_null_dentry(mdr, dir, dname, false); - if (!dn) return; - - // create lock lists - set rdlocks, wrlocks, xlocks; - - for (int i=0; i<(int)linktrace.size(); i++) - rdlocks.insert(&linktrace[i]->lock); - xlocks.insert(&dn->lock); - wrlocks.insert(&dn->get_dir()->inode->filelock); - wrlocks.insert(&dn->get_dir()->inode->nestlock); + // add locks for source inode for (int i=0; i<(int)targettrace.size(); i++) rdlocks.insert(&targettrace[i]->lock); xlocks.insert(&targeti->linklock); mds->locker->include_snap_rdlocks(rdlocks, targeti); - mds->locker->include_snap_rdlocks(rdlocks, dn->get_dir()->inode); if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -4735,8 +4696,12 @@ void Server::handle_client_open(MDRequest *mdr) dout(7) << "open on " << req->get_filepath() << dendl; - CInode *cur = rdlock_path_pin_ref(mdr, need_auth); + set rdlocks, wrlocks, xlocks; + CInode *cur = rdlock_path_pin_ref(mdr, rdlocks, need_auth); if (!cur) return; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + // can only open a dir with mode FILE_MODE_PIN, at least for now. if (cur->inode.is_dir()) cmode = CEPH_FILE_MODE_PIN; @@ -4772,10 +4737,6 @@ void Server::handle_client_open(MDRequest *mdr) mdr->session->have_completed_request(req->get_reqid().tid))) { assert(cur->is_auth()); - // xlock file size - set rdlocks = mdr->rdlocks; - set wrlocks = mdr->wrlocks; - set xlocks = mdr->xlocks; wrlocks.insert(&cur->filelock); if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) return; @@ -4793,7 +4754,11 @@ void Server::handle_client_open(MDRequest *mdr) handle_client_opent(mdr, cmode); return; } - } + } else { + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + } + // sync filelock if snapped. // this makes us wait for writers to flushsnaps, ensuring we get accurate metadata, @@ -4947,8 +4912,12 @@ void Server::handle_client_openc(MDRequest *mdr) dout(7) << "open w/ O_CREAT on " << req->get_filepath() << dendl; bool excl = (req->head.args.open.flags & O_EXCL); - CDentry *dn = rdlock_path_xlock_dentry(mdr, !excl, false); + set rdlocks, wrlocks, xlocks; + CDentry *dn = rdlock_path_xlock_dentry(mdr, rdlocks, wrlocks, xlocks, !excl, false); if (!dn) return; + if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks)) + return; + CDentry::linkage_t *dnl = dn->get_projected_linkage(); if (!dnl->is_null()) { diff --git a/src/mds/Server.h b/src/mds/Server.h index 2b37630d2f421..b278ce8fabe72 100644 --- a/src/mds/Server.h +++ b/src/mds/Server.h @@ -109,8 +109,9 @@ public: void journal_allocated_inos(MDRequest *mdr, EMetaBlob *blob); void apply_allocated_inos(MDRequest *mdr); - CInode* rdlock_path_pin_ref(MDRequest *mdr, bool want_auth, bool rdlock_dft=false); - CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist); + CInode* rdlock_path_pin_ref(MDRequest *mdr, set& rdlocks, bool want_auth, bool rdlock_dft=false); + CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, set& rdlocks, set& wrlocks, + set& xlocks, bool okexist, bool mustexist); CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequest *mdr); -- 2.39.5