}
if (!object->is_auth()) {
- ceph_assert(!mdr->lock_cache);
+ if (mdr->lock_cache) { // debug
+ ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
+ CDentry *dn = mdr->dn[0].back();
+ ceph_assert(dn->get_projected_linkage()->is_remote());
+ }
+
if (object->is_ambiguous_auth()) {
// wait
dout(10) << " ambiguous auth, waiting to authpin " << *object << dendl;
+ mdr->disable_lock_cache();
drop_locks(mdr.get());
mdr->drop_local_auth_pins();
marker.message = "waiting for single auth, object is being migrated";
} else {
ceph_assert(0 == "unknown type of lock parent");
}
- ceph_assert(dir->get_inode() == mdr->lock_cache->get_dir_inode());
- /* forcibly auth pin if lock cache is used */
- continue;
+ if (dir->get_inode() == mdr->lock_cache->get_dir_inode()) {
+ // forcibly auth pin if there is lock cache on parent dir
+ continue;
+ }
+
+ { // debug
+ ceph_assert(mdr->lock_cache->opcode == CEPH_MDS_OP_UNLINK);
+ CDentry *dn = mdr->dn[0].back();
+ ceph_assert(dn->get_projected_linkage()->is_remote());
+ }
}
// wait
+ mdr->disable_lock_cache();
drop_locks(mdr.get());
mdr->drop_local_auth_pins();
if (auth_pin_nonblocking) {
}
}
-void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri)
+void Locker::create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout)
{
if (mdr->lock_cache)
return;
}
auto lock_cache = new MDLockCache(cap, opcode);
+ if (dir_layout)
+ lock_cache->set_dir_layout(*dir_layout);
cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));
for (auto dir : dfv) {
void drop_locks_for_fragment_unfreeze(MutationImpl *mut);
int get_cap_bit_for_lock_cache(int op);
- void create_lock_cache(MDRequestRef& mdr, CInode *diri);
+ void create_lock_cache(MDRequestRef& mdr, CInode *diri, file_layout_t *dir_layout=nullptr);
bool find_and_attach_lock_cache(MDRequestRef& mdr, CInode *diri);
void invalidate_lock_caches(CDir *dir);
void invalidate_lock_caches(SimpleLock *lock);
return 1;
}
- if (rdlock_snap) {
+ if (flags & MDS_TRAVERSE_CHECK_LOCKCACHE)
+ mds->locker->find_and_attach_lock_cache(mdr, cur);
+
+ if (mdr && mdr->lock_cache) {
+ if (flags & MDS_TRAVERSE_WANT_DIRLAYOUT)
+ mdr->dir_layout = mdr->lock_cache->get_dir_layout();
+ } else if (rdlock_snap) {
int n = (flags & MDS_TRAVERSE_RDLOCK_SNAP2) ? 1 : 0;
if ((n == 0 && !(mdr->locking_state & MutationImpl::SNAP_LOCKED)) ||
(n == 1 && !(mdr->locking_state & MutationImpl::SNAP2_LOCKED))) {
MutationImpl::LockOpVec lov;
- unsigned depth = 0;
- while (depth < path.depth()) {
+ for (unsigned depth = 0; depth < path.depth(); ) {
dout(12) << "traverse: path seg depth " << depth << " '" << path[depth]
<< "' snapid " << snapid << dendl;
if (rdlock_path) {
lov.clear();
if (xlock_dentry && depth == path.depth() - 1) {
- lov.add_wrlock(&cur->filelock);
- lov.add_wrlock(&cur->nestlock);
- if (rdlock_authlock)
- lov.add_rdlock(&cur->authlock);
-
+ if (depth > 0 || !mdr->lock_cache) {
+ lov.add_wrlock(&cur->filelock);
+ lov.add_wrlock(&cur->nestlock);
+ if (rdlock_authlock)
+ lov.add_rdlock(&cur->authlock);
+ }
lov.add_xlock(&dn->lock);
} else {
+ // force client to flush async dir operation if necessary
+ if (cur->filelock.is_cached())
+ lov.add_wrlock(&cur->filelock);
lov.add_rdlock(&dn->lock);
}
if (!mds->locker->acquire_locks(mdr, lov)) {
if (rdlock_path) {
lov.clear();
if (xlock_dentry) {
- lov.add_wrlock(&cur->filelock);
- lov.add_wrlock(&cur->nestlock);
- if (rdlock_authlock)
- lov.add_rdlock(&cur->authlock);
+ if (depth > 0 || !mdr->lock_cache) {
+ lov.add_wrlock(&cur->filelock);
+ lov.add_wrlock(&cur->nestlock);
+ if (rdlock_authlock)
+ lov.add_rdlock(&cur->authlock);
+ }
lov.add_xlock(&dn->lock);
} else {
+ // force client to flush async dir operation if necessary
+ if (cur->filelock.is_cached())
+ lov.add_wrlock(&cur->filelock);
lov.add_rdlock(&dn->lock);
}
if (!mds->locker->acquire_locks(mdr, lov)) {
static const int MDS_TRAVERSE_RDLOCK_PATH = (1 << 7);
static const int MDS_TRAVERSE_XLOCK_DENTRY = (1 << 8);
static const int MDS_TRAVERSE_RDLOCK_AUTHLOCK = (1 << 9);
+static const int MDS_TRAVERSE_CHECK_LOCKCACHE = (1 << 10);
// flags for predirty_journal_parents()
lock_set locks; // full ordering
MDLockCache* lock_cache = nullptr;
+ bool lock_cache_disabled = false;
+
+ void disable_lock_cache() {
+ lock_cache_disabled = true;
+ }
lock_iterator emplace_lock(SimpleLock *l, unsigned f=0, mds_rank_t t=MDS_RANK_NONE) {
last_locked = l;
CInode *diri;
Capability *client_cap;
int opcode;
+ file_layout_t dir_layout;
elist<MDLockCache*>::item item_cap_lock_cache;
}
CInode *get_dir_inode() { return diri; }
+ void set_dir_layout(file_layout_t& layout) {
+ dir_layout = layout;
+ }
+ const file_layout_t& get_dir_layout() const {
+ return dir_layout;
+ }
+
void attach_locks();
void attach_dirfrags(std::vector<CDir*>&& dfv);
void detach_all();
mds_rank_t from = mds_rank_t(ack->get_source().num());
if (ack->is_req_blocked()) {
+ mdr->disable_lock_cache();
// slave auth pin is blocked, drop locks to avoid deadlock
mds->locker->drop_locks(mdr.get(), nullptr);
return;
int flags = MDS_TRAVERSE_RDLOCK_SNAP | MDS_TRAVERSE_RDLOCK_PATH |
MDS_TRAVERSE_WANT_DENTRY | MDS_TRAVERSE_XLOCK_DENTRY |
MDS_TRAVERSE_WANT_AUTH;
+ if (refpath.depth() == 1 && !mdr->lock_cache_disabled)
+ flags |= MDS_TRAVERSE_CHECK_LOCKCACHE;
if (create)
flags |= MDS_TRAVERSE_RDLOCK_AUTHLOCK;
if (want_layout)
if (!check_fragment_space(mdr, dir))
return;
+ if (mdr->dn[0].size() == 1)
+ mds->locker->create_lock_cache(mdr, diri, &mdr->dir_layout);
+
// create inode.
CInode *in = prepare_new_inode(mdr, dn->get_dir(), inodeno_t(req->head.ino),
req->head.args.open.mode | S_IFREG, &layout);
if ((mode & S_IFMT) == 0)
mode |= S_IFREG;
+ mdr->disable_lock_cache();
CDentry *dn = rdlock_path_xlock_dentry(mdr, true, false, S_ISREG(mode));
if (!dn)
return;
{
const cref_t<MClientRequest> &req = mdr->client_request;
+ mdr->disable_lock_cache();
CDentry *dn = rdlock_path_xlock_dentry(mdr, true);
if (!dn)
return;
void Server::handle_client_symlink(MDRequestRef& mdr)
{
+ mdr->disable_lock_cache();
CDentry *dn = rdlock_path_xlock_dentry(mdr, true);
if (!dn)
return;
// rmdir or unlink?
bool rmdir = (req->get_op() == CEPH_MDS_OP_RMDIR);
+ if (rmdir)
+ mdr->disable_lock_cache();
CDentry *dn = rdlock_path_xlock_dentry(mdr, false, true);
if (!dn)
return;
return; // we're waiting for a witness.
}
+ if (!rmdir && dnl->is_primary() && mdr->dn[0].size() == 1)
+ mds->locker->create_lock_cache(mdr, diri);
+
// ok!
if (dnl->is_remote() && !dnl->get_inode()->is_auth())
_link_remote(mdr, false, dn, dnl->get_inode());