* - they also define the lock ordering by the MDS
* - a few of these are internal to the mds
*/
-#define CEPH_LOCK_DVERSION 1
-#define CEPH_LOCK_DN 2
-#define CEPH_LOCK_IVERSION 16 /* mds internal */
-#define CEPH_LOCK_ISNAP 32
-#define CEPH_LOCK_IFILE 64
-#define CEPH_LOCK_IAUTH 128
-#define CEPH_LOCK_ILINK 256
-#define CEPH_LOCK_IDFT 512 /* dir frag tree */
-#define CEPH_LOCK_INEST 1024 /* mds internal */
-#define CEPH_LOCK_IXATTR 2048
-#define CEPH_LOCK_IFLOCK 4096 /* advisory file locks */
-#define CEPH_LOCK_INO 8192 /* immutable inode bits; not a lock */
-#define CEPH_LOCK_IPOLICY 16384 /* policy lock on dirs. MDS internal */
+#define CEPH_LOCK_DN (1 << 0)
+#define CEPH_LOCK_DVERSION (1 << 1)
+#define CEPH_LOCK_ISNAP (1 << 4) /* snapshot lock. MDS internal */
+#define CEPH_LOCK_IPOLICY (1 << 5) /* policy lock on dirs. MDS internal */
+#define CEPH_LOCK_IFILE (1 << 6)
+#define CEPH_LOCK_INEST (1 << 7) /* mds internal */
+#define CEPH_LOCK_IDFT (1 << 8) /* dir frag tree */
+#define CEPH_LOCK_IAUTH (1 << 9)
+#define CEPH_LOCK_ILINK (1 << 10)
+#define CEPH_LOCK_IXATTR (1 << 11)
+#define CEPH_LOCK_IFLOCK (1 << 12) /* advisory file locks */
+#define CEPH_LOCK_IVERSION (1 << 13) /* mds internal */
+
+#define CEPH_LOCK_IFIRST CEPH_LOCK_ISNAP
+
/* client_session ops */
enum {
}
}
-
-
-
-void Locker::include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov)
+bool Locker::try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
+ int n, bool want_layout)
{
+ dout(10) << __func__ << " " << *mdr << " " << *in << dendl;
// rdlock ancestor snaps
- CInode *t = in;
- while (t->get_projected_parent_dn()) {
- t = t->get_projected_parent_dn()->get_dir()->get_inode();
- lov.add_rdlock(&t->snaplock);
- }
- lov.add_rdlock(&in->snaplock);
-}
+ bool found_locked = false;
+ bool found_layout = false;
+
+ if (want_layout)
+ ceph_assert(n == 0);
+
+ client_t client = mdr->get_client();
-void Locker::include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
- file_layout_t **layout)
-{
- //rdlock ancestor snaps
CInode *t = in;
- lov.add_rdlock(&in->snaplock);
- lov.add_rdlock(&in->policylock);
- bool found_layout = false;
- while (t) {
- lov.add_rdlock(&t->snaplock);
- if (!found_layout) {
- lov.add_rdlock(&t->policylock);
+ while (true) {
+ if (!found_locked && mdr->is_rdlocked(&t->snaplock))
+ found_locked = true;
+
+ if (!found_locked) {
+ if (!t->snaplock.can_rdlock(client)) {
+ t->snaplock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
+ goto failed;
+ }
+ t->snaplock.get_rdlock();
+ mdr->locks.emplace(&t->snaplock, MutationImpl::LockOp::RDLOCK);
+ dout(20) << " got rdlock on " << t->snaplock << " " << *t << dendl;
+ }
+ if (want_layout && !found_layout) {
+ if (!mdr->is_rdlocked(&t->policylock)) {
+ if (!t->policylock.can_rdlock(client)) {
+ t->policylock.add_waiter(SimpleLock::WAIT_RD, new C_MDS_RetryRequest(mdcache, mdr));
+ goto failed;
+ }
+ t->policylock.get_rdlock();
+ mdr->locks.emplace(&t->policylock, MutationImpl::LockOp::RDLOCK);
+ dout(20) << " got rdlock on " << t->policylock << " " << *t << dendl;
+ }
if (t->get_projected_inode()->has_layout()) {
- *layout = &t->get_projected_inode()->layout;
- found_layout = true;
+ mdr->dir_layout = t->get_projected_inode()->layout;
+ found_layout = true;
}
}
- if (t->get_projected_parent_dn() &&
- t->get_projected_parent_dn()->get_dir())
- t = t->get_projected_parent_dn()->get_dir()->get_inode();
- else t = NULL;
+ CDentry* pdn = t->get_projected_parent_dn();
+ if (!pdn)
+ break;
+ t = pdn->get_dir()->get_inode();
}
+
+ return true;
+
+failed:
+ dout(10) << __func__ << " failed" << dendl;
+
+ drop_locks(mdr.get(), nullptr);
+ mdr->drop_local_auth_pins();
+ return false;
}
struct MarkEventOnDestruct {
CInode *auth_pin_freeze,
bool auth_pin_nonblocking)
{
- if (mdr->done_locking &&
- !mdr->is_slave()) { // not on slaves! master requests locks piecemeal.
- dout(10) << "acquire_locks " << *mdr << " - done locking" << dendl;
- return true; // at least we had better be!
- }
dout(10) << "acquire_locks " << *mdr << dendl;
MarkEventOnDestruct marker(mdr, "failed to acquire_locks");
lov.add_xlock(&dn->versionlock, i + 1);
}
}
- if (lock->get_type() > CEPH_LOCK_IVERSION) {
+ if (lock->get_type() >= CEPH_LOCK_IFIRST && lock->get_type() != CEPH_LOCK_IVERSION) {
// inode version lock?
CInode *in = static_cast<CInode*>(object);
if (!in->is_auth())
}
}
- mdr->done_locking = true;
mdr->set_mds_stamp(ceph_clock_now());
result = true;
marker.message = "acquired locks";
if (pneed_issue == &my_need_issue)
issue_caps_set(*pneed_issue);
- mut->done_locking = false;
+ mut->locking_state = 0;
}
void Locker::drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue)
if (lock->get_type() != CEPH_LOCK_DN &&
lock->get_type() != CEPH_LOCK_ISNAP &&
+ lock->get_type() != CEPH_LOCK_IPOLICY &&
p->is_freezing()) {
dout(7) << "try_eval " << *lock << " freezing, waiting on " << *p << dendl;
p->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_Eval(this, p, lock->get_type()));
ceph_assert(!lock->is_stable());
if (lock->get_type() != CEPH_LOCK_DN &&
lock->get_type() != CEPH_LOCK_ISNAP &&
+ lock->get_type() != CEPH_LOCK_IPOLICY &&
lock->get_num_rdlocks() == 0 &&
lock->get_num_wrlocks() == 0 &&
!lock->is_leased() &&
if (lock->get_parent()->is_freezing_or_frozen()) {
// dentry/snap lock in unreadable state can block path traverse
if ((lock->get_type() != CEPH_LOCK_DN &&
- lock->get_type() != CEPH_LOCK_ISNAP) ||
+ lock->get_type() != CEPH_LOCK_ISNAP &&
+ lock->get_type() != CEPH_LOCK_IPOLICY) ||
lock->get_state() == LOCK_SYNC ||
lock->get_parent()->is_frozen())
return;
void nudge_log(SimpleLock *lock);
- void include_snap_rdlocks(CInode *in, MutationImpl::LockOpVec& lov);
- void include_snap_rdlocks_wlayout(CInode *in, MutationImpl::LockOpVec& lov,
- file_layout_t **layout);
-
bool acquire_locks(MDRequestRef& mdr,
MutationImpl::LockOpVec& lov,
CInode *auth_pin_freeze=NULL,
bool auth_pin_nonblocking=false);
+ bool try_rdlock_snap_layout(CInode *in, MDRequestRef& mdr,
+ int n=0, bool want_layout=false);
+
void notify_freeze_waiter(MDSCacheObject *o);
void cancel_locking(MutationImpl *mut, std::set<CInode*> *pneed_issue);
void drop_locks(MutationImpl *mut, std::set<CInode*> *pneed_issue=0);
bool last_xlocked = (flags & MDS_TRAVERSE_LAST_XLOCKED);
bool want_dentry = (flags & MDS_TRAVERSE_WANT_DENTRY);
bool want_auth = (flags & MDS_TRAVERSE_WANT_AUTH);
+ bool rdlock_snap = (flags & (MDS_TRAVERSE_RDLOCK_SNAP | MDS_TRAVERSE_RDLOCK_SNAP2));
if (forward)
ceph_assert(mdr); // forward requires a request
return 1;
}
+ if (rdlock_snap) {
+ int n = (flags & MDS_TRAVERSE_RDLOCK_SNAP2) ? 1 : 0;
+ if ((n == 0 && !(mdr->locking_state & MutationImpl::SNAP_LOCKED)) ||
+ (n == 1 && !(mdr->locking_state & MutationImpl::SNAP2_LOCKED))) {
+ bool want_layout = (flags & MDS_TRAVERSE_WANT_DIRLAYOUT);
+ if (!mds->locker->try_rdlock_snap_layout(cur, mdr, n, want_layout))
+ return 1;
+ }
+ }
+
// start trace
if (pdnvec)
pdnvec->clear();
if (pin)
*pin = cur;
+ MutationImpl::LockOpVec lov;
+
unsigned depth = 0;
while (depth < path.depth()) {
dout(12) << "traverse: path seg depth " << depth << " '" << path[depth]
snapid = realm->resolve_snapname(path[depth], cur->ino());
dout(10) << "traverse: snap " << path[depth] << " -> " << snapid << dendl;
if (!snapid) {
- CInode *t = cur;
- while (t) {
- // if snaplock isn't readable, it's possible that other mds is creating
- // snapshot, but snap update message hasn't been received.
- if (!t->snaplock.can_read(client)) {
- dout(10) << " non-readable snaplock on " << *t << dendl;
- t->snaplock.add_waiter(SimpleLock::WAIT_RD, cf.build());
- return 1;
- }
- CDentry *pdn = t->get_projected_parent_dn();
- t = pdn ? pdn->get_dir()->get_inode() : NULL;
- }
+ if (pdnvec)
+ pdnvec->clear(); // do not confuse likes of rdlock_path_pin_ref();
return -ENOENT;
}
mdr->snapid = snapid;
return 1;
}
+ if (rdlock_snap && !(want_dentry && depth == path.depth() - 1)) {
+ lov.clear();
+ lov.add_rdlock(&cur->snaplock);
+ if (!mds->locker->acquire_locks(mdr, lov)) {
+ dout(10) << "traverse: failed to rdlock " << cur->snaplock << " " << *cur << dendl;
+ return 1;
+ }
+ }
+
// add to trace, continue.
touch_inode(cur);
if (pin)
dout(10) << "path_traverse finish on snapid " << snapid << dendl;
if (mdr)
ceph_assert(mdr->snapid == snapid);
+
+ if (flags & MDS_TRAVERSE_RDLOCK_SNAP)
+ mdr->locking_state |= MutationImpl::SNAP_LOCKED;
+ else if (flags & MDS_TRAVERSE_RDLOCK_SNAP2)
+ mdr->locking_state |= MutationImpl::SNAP2_LOCKED;
+
return 0;
}
// rollback slave requests is tricky. just let the request proceed.
if (mdr->has_more() &&
(!mdr->more()->witnessed.empty() || !mdr->more()->waiting_on_slave.empty())) {
- if (!mdr->done_locking) {
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
ceph_assert(mdr->more()->witnessed.empty());
mdr->aborted = true;
dout(10) << "request_kill " << *mdr << " -- waiting for slave reply, delaying" << dendl;
}
MutationImpl::LockOpVec lov;
- mds->locker->include_snap_rdlocks(in, lov);
- lov.erase_rdlock(&in->snaplock);
lov.add_xlock(&in->snaplock);
-
if (!mds->locker->acquire_locks(mdr, lov))
return;
static const int MDS_TRAVERSE_LAST_XLOCKED = (1 << 1);
static const int MDS_TRAVERSE_WANT_DENTRY = (1 << 2);
static const int MDS_TRAVERSE_WANT_AUTH = (1 << 3);
-
+static const int MDS_TRAVERSE_RDLOCK_SNAP = (1 << 4);
+static const int MDS_TRAVERSE_RDLOCK_SNAP2 = (1 << 5);
+static const int MDS_TRAVERSE_WANT_DIRLAYOUT = (1 << 6);
// flags for predirty_journal_parents()
static const int PREDIRTY_PRIMARY = 1; // primary dn, adjust nested accounting
map<CInode*, map<client_t,Capability::Export> >& peer_exports,
list<ScatterLock*>& updated_scatterlocks)
{
+ CInode *in;
+ bool added = false;
DECODE_START(1, blp);
dout(15) << __func__ << " on " << *dn << dendl;
decode(ino, blp);
decode(last, blp);
- bool added = false;
- CInode *in = cache->get_inode(ino, last);
+ in = cache->get_inode(ino, last);
if (!in) {
in = new CInode(mds->mdcache, true, 1, last);
added = true;
// caps
decode_import_inode_caps(in, true, blp, peer_exports);
+ DECODE_FINISH(blp);
+
// link before state -- or not! -sage
if (dn->get_linkage()->get_inode() != in) {
ceph_assert(!dn->get_linkage()->get_inode());
in->snaplock.get_state() != LOCK_SYNC)
mds->locker->try_eval(&in->snaplock, NULL);
- DECODE_FINISH(blp);
+ if (in->policylock.is_stable() &&
+ in->policylock.get_state() != LOCK_SYNC)
+ mds->locker->try_eval(&in->policylock, NULL);
}
void Migrator::decode_import_inode_caps(CInode *in, bool auth_cap,
// if this flag is set, do not attempt to acquire further locks.
// (useful for wrlock, which may be a moving auth target)
- bool done_locking = false;
+ enum {
+ SNAP_LOCKED = 1,
+ SNAP2_LOCKED = 2,
+ PATH_LOCKED = 4,
+ ALL_LOCKED = 8,
+ };
+ int locking_state = 0;
+
bool committing = false;
bool aborted = false;
bool killed = false;
// -- i am a client (master) request
cref_t<MClientRequest> client_request; // client request (if any)
+ file_layout_t dir_layout;
// store up to two sets of dn vectors, inode pointers, for request path1 and path2.
vector<CDentry*> dn[2];
- CDentry *straydn;
CInode *in[2];
+ CDentry *straydn;
snapid_t snapid;
CInode *tracei;
if (straydn->get_name() == straydname)
return straydn;
- ceph_assert(!mdr->done_locking);
mdr->unpin(straydn);
}
bool want_auth,
bool no_want_auth, /* for readdir, who doesn't want auth _even_if_ it's
a snapped dir */
- file_layout_t **layout,
+ bool want_layout,
bool no_lookup) // true if we cannot return a null dentry lease
{
const filepath& refpath = n ? mdr->get_filepath2() : mdr->get_filepath();
dout(10) << "rdlock_path_pin_ref " << *mdr << " " << refpath << dendl;
- if (mdr->done_locking)
+ if (mdr->locking_state & MutationImpl::PATH_LOCKED)
return mdr->in[n];
if (!no_want_auth && refpath.is_last_snap())
// traverse
CF_MDS_MDRContextFactory cf(mdcache, mdr);
- int flags = 0;
+ int flags = n == 0 ? MDS_TRAVERSE_RDLOCK_SNAP : MDS_TRAVERSE_RDLOCK_SNAP2;
if (want_auth)
flags |= MDS_TRAVERSE_WANT_AUTH;
+ if (want_layout)
+ flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[n], &mdr->in[n]);
if (r > 0)
return nullptr; // delayed
for (int i=0; i<(int)mdr->dn[n].size(); i++)
lov.add_rdlock(&mdr->dn[n][i]->lock);
- if (layout)
- mds->locker->include_snap_rdlocks_wlayout(ref, lov, layout);
- else
- mds->locker->include_snap_rdlocks(ref, lov);
// set and pin ref
mdr->pin(ref);
CDentry* Server::rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
MutationImpl::LockOpVec& lov,
bool okexist, bool alwaysxlock,
- file_layout_t **layout)
+ bool want_layout)
{
const filepath& refpath = n ? mdr->get_filepath2() : mdr->get_filepath();
dout(10) << "rdlock_path_xlock_dentry " << *mdr << " " << refpath << dendl;
- if (mdr->done_locking)
+ if (mdr->locking_state & MutationImpl::PATH_LOCKED)
return mdr->dn[n].back();
CF_MDS_MDRContextFactory cf(mdcache, mdr);
int flags = MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_WANT_AUTH;
+ if (n == 0)
+ flags |= MDS_TRAVERSE_RDLOCK_SNAP;
+ else
+ flags |= MDS_TRAVERSE_RDLOCK_SNAP2;
+ if (want_layout)
+ flags |= MDS_TRAVERSE_WANT_DIRLAYOUT;
int r = mdcache->path_traverse(mdr, cf, refpath, flags, &mdr->dn[n]);
if (r > 0)
return nullptr; // delayed
lov.add_rdlock(&dn->lock); // existing dn, rdlock
lov.add_wrlock(&dn->get_dir()->inode->filelock); // also, wrlock on dir mtime
lov.add_wrlock(&dn->get_dir()->inode->nestlock); // also, wrlock on dir mtime
- if (layout)
- mds->locker->include_snap_rdlocks_wlayout(dn->get_dir()->inode, lov, layout);
- else
- mds->locker->include_snap_rdlocks(dn->get_dir()->inode, lov);
return dn;
}
ref->filelock.get_num_wrlocks() > 0 ||
!ref->filelock.can_read(mdr->get_client())) {
lov.add_rdlock(&ref->filelock);
- mdr->done_locking = false;
+ mdr->locking_state &= ~MutationImpl::ALL_LOCKED;
}
}
if (cur->is_frozen() || cur->state_test(CInode::STATE_EXPORTINGCAPS)) {
ceph_assert(!need_auth);
- mdr->done_locking = false;
+ mdr->locking_state &= ~MutationImpl::PATH_LOCKED;
CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
return;
}
MutationImpl::LockOpVec lov;
- file_layout_t *dir_layout = nullptr;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov,
- !excl, false, &dir_layout);
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, !excl, false, true);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
respond_to_request(mdr, -EROFS);
}
// set layout
file_layout_t layout;
- if (dir_layout)
- layout = *dir_layout;
+ if (mdr->dir_layout != file_layout_t())
+ layout = mdr->dir_layout;
else
layout = mdcache->default_file_layout;
{
const cref_t<MClientRequest> &req = mdr->client_request;
MutationImpl::LockOpVec lov;
- file_layout_t *dir_layout = nullptr;
- CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
+ CInode *cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, true);
if (!cur) return;
if (mdr->snapid != CEPH_NOSNAP) {
file_layout_t layout;
if (old_pi->has_layout())
layout = old_pi->layout;
- else if (dir_layout)
- layout = *dir_layout;
+ else if (mdr->dir_layout != file_layout_t())
+ layout = mdr->dir_layout;
else
layout = mdcache->default_file_layout;
}
void Server::handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
- file_layout_t *dir_layout,
MutationImpl::LockOpVec& lov)
{
const cref_t<MClientRequest> &req = mdr->client_request;
file_layout_t layout;
if (cur->get_projected_inode()->has_layout())
layout = cur->get_projected_inode()->layout;
- else if (dir_layout)
- layout = *dir_layout;
+ else if (mdr->dir_layout != file_layout_t())
+ layout = mdr->dir_layout;
else
layout = mdcache->default_file_layout;
}
void Server::handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
- file_layout_t *dir_layout,
MutationImpl::LockOpVec& lov)
{
const cref_t<MClientRequest> &req = mdr->client_request;
// null/none value (empty string, means default layout). Is equivalent
// to a setxattr with empty string: pass through the empty payload of
// the rmxattr request to do this.
- handle_set_vxattr(mdr, cur, dir_layout, lov);
+ handle_set_vxattr(mdr, cur, lov);
return;
}
MutationImpl::LockOpVec lov;
CInode *cur;
- file_layout_t *dir_layout = NULL;
if (name.compare(0, 15, "ceph.dir.layout") == 0)
- cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, true);
else
cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
// magic ceph.* namespace?
if (name.compare(0, 5, "ceph.") == 0) {
- handle_set_vxattr(mdr, cur, dir_layout, lov);
+ handle_set_vxattr(mdr, cur, lov);
return;
}
std::string name(req->get_path2());
MutationImpl::LockOpVec lov;
- file_layout_t *dir_layout = nullptr;
CInode *cur;
if (name == "ceph.dir.layout")
- cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, &dir_layout);
+ cur = rdlock_path_pin_ref(mdr, 0, lov, true, false, true);
else
cur = rdlock_path_pin_ref(mdr, 0, lov, true);
if (!cur)
}
if (name.compare(0, 5, "ceph.") == 0) {
- handle_remove_vxattr(mdr, cur, dir_layout, lov);
+ handle_remove_vxattr(mdr, cur, lov);
return;
}
const cref_t<MClientRequest> &req = mdr->client_request;
client_t client = mdr->get_client();
MutationImpl::LockOpVec lov;
- file_layout_t *dir_layout = nullptr;
- CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false,
- &dir_layout);
+ CDentry *dn = rdlock_path_xlock_dentry(mdr, 0, lov, false, false, true);
if (!dn) return;
if (mdr->snapid != CEPH_NOSNAP) {
respond_to_request(mdr, -EROFS);
// set layout
file_layout_t layout;
- if (dir_layout && S_ISREG(mode))
- layout = *dir_layout;
+ if (S_ISREG(mode) && mdr->dir_layout != file_layout_t())
+ layout = mdr->dir_layout;
else
layout = mdcache->default_file_layout;
lov.add_xlock(&straydn->lock);
}
- mds->locker->include_snap_rdlocks(diri, lov);
+ // FIXME
+ // mds->locker->include_snap_rdlocks(diri, lov);
lov.add_xlock(&in->snaplock);
if (in->is_dir())
lov.add_rdlock(&in->filelock); // to verify it's empty
lov.add_wrlock(&srcdir->inode->filelock);
lov.add_wrlock(&srcdir->inode->nestlock);
}
- mds->locker->include_snap_rdlocks(srcdir->inode, lov);
+ // FIXME
+ // mds->locker->include_snap_rdlocks(srcdir->inode, lov);
// straydn?
if (straydn) {
dout(10) << "lssnap on " << *diri << dendl;
// lock snap
- MutationImpl::LockOpVec lov;
- mds->locker->include_snap_rdlocks(diri, lov);
- if (!mds->locker->acquire_locks(mdr, lov))
+ if (!mds->locker->try_rdlock_snap_layout(diri, mdr))
return;
if (!check_access(mdr, diri, MAY_READ))
dout(10) << "mksnap " << snapname << " on " << *diri << dendl;
// lock snap
- MutationImpl::LockOpVec lov;
-
- mds->locker->include_snap_rdlocks(diri, lov);
- lov.erase_rdlock(&diri->snaplock);
- lov.add_xlock(&diri->snaplock);
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
+ lov.add_xlock(&diri->snaplock);
+ if (!mds->locker->acquire_locks(mdr, lov))
+ return;
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
+ if (CDentry *pdn = diri->get_projected_parent_dn(); pdn) {
+ if (!mds->locker->try_rdlock_snap_layout(pdn->get_dir()->get_inode(), mdr))
+ return;
+ }
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
return;
snapid_t snapid = diri->snaprealm->resolve_snapname(snapname, diri->ino());
dout(10) << " snapname " << snapname << " is " << snapid << dendl;
- MutationImpl::LockOpVec lov;
- mds->locker->include_snap_rdlocks(diri, lov);
- lov.erase_rdlock(&diri->snaplock);
- lov.add_xlock(&diri->snaplock);
-
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
+ lov.add_xlock(&diri->snaplock);
+ if (!mds->locker->acquire_locks(mdr, lov))
+ return;
+ if (CDentry *pdn = diri->get_projected_parent_dn(); pdn) {
+ if (!mds->locker->try_rdlock_snap_layout(pdn->get_dir()->get_inode(), mdr))
+ return;
+ }
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
return;
dout(10) << " snapname " << srcname << " is " << snapid << dendl;
// lock snap
- MutationImpl::LockOpVec lov;
-
- mds->locker->include_snap_rdlocks(diri, lov);
- lov.erase_rdlock(&diri->snaplock);
- lov.add_xlock(&diri->snaplock);
-
- if (!mds->locker->acquire_locks(mdr, lov))
- return;
+ if (!(mdr->locking_state & MutationImpl::ALL_LOCKED)) {
+ MutationImpl::LockOpVec lov;
+ lov.add_xlock(&diri->snaplock);
+ if (!mds->locker->acquire_locks(mdr, lov))
+ return;
+ if (CDentry *pdn = diri->get_projected_parent_dn(); pdn) {
+ if (!mds->locker->try_rdlock_snap_layout(pdn->get_dir()->get_inode(), mdr))
+ return;
+ }
+ mdr->locking_state |= MutationImpl::ALL_LOCKED;
+ }
if (!check_access(mdr, diri, MAY_WRITE|MAY_SNAPSHOT))
return;
CInode* rdlock_path_pin_ref(MDRequestRef& mdr, int n, MutationImpl::LockOpVec& lov,
bool want_auth, bool no_want_auth=false,
- file_layout_t **layout=nullptr,
- bool no_lookup=false);
+ bool want_layout=false, bool no_lookup=false);
CDentry* rdlock_path_xlock_dentry(MDRequestRef& mdr, int n,
MutationImpl::LockOpVec& lov,
bool okexist, bool alwaysxlock,
- file_layout_t **layout=nullptr);
+ bool want_layout=false);
CDir* try_open_auth_dirfrag(CInode *diri, frag_t fg, MDRequestRef& mdr);
string value,
file_layout_t *layout);
void handle_set_vxattr(MDRequestRef& mdr, CInode *cur,
- file_layout_t *dir_layout,
MutationImpl::LockOpVec& lov);
void handle_remove_vxattr(MDRequestRef& mdr, CInode *cur,
- file_layout_t *dir_layout,
MutationImpl::LockOpVec& lov);
void handle_client_setxattr(MDRequestRef& mdr);
void handle_client_removexattr(MDRequestRef& mdr);
case CEPH_LOCK_INEST: return "inest";
case CEPH_LOCK_IXATTR: return "ixattr";
case CEPH_LOCK_ISNAP: return "isnap";
- case CEPH_LOCK_INO: return "ino";
case CEPH_LOCK_IFLOCK: return "iflock";
case CEPH_LOCK_IPOLICY: return "ipolicy";
- default: ceph_abort(); return std::string_view();
+ default: return "unknown";
}
}