ceph_assert(lock_cache->invalidating);
+ /* The lock cache is only removed from the cap's list when the cap is removed
+ * or the lock cache is finally deleted.
+ */
+ lock_cache->item_cap_lock_cache.remove_myself();
lock_cache->detach_locks();
CInode *diri = lock_cache->get_dir_inode();
mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
}
-int Locker::get_cap_bit_for_lock_cache(int op)
-{
- switch(op) {
- case CEPH_MDS_OP_CREATE:
- return CEPH_CAP_DIR_CREATE;
- case CEPH_MDS_OP_UNLINK:
- return CEPH_CAP_DIR_UNLINK;
- default:
- ceph_assert(0 == "unsupported operation");
- return 0;
- }
-}
-
void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
dout(15) << __func__ << ": " << *lock_cache << dendl;
- ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
- if (lock_cache->invalidating) {
- ceph_assert(!lock_cache->client_cap);
- } else {
+ if (!lock_cache->invalidating) {
lock_cache->invalidating = true;
lock_cache->detach_dirfrags();
}
Capability *cap = lock_cache->client_cap;
if (cap) {
- int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+ int cap_bit = lock_cache->get_cap_bit();
cap->clear_lock_cache_allowed(cap_bit);
- if (cap->issued() & cap_bit)
+ if (cap->issued() & cap_bit) {
issue_caps(lock_cache->get_dir_inode(), cap);
- else
+ } else {
cap = nullptr;
+ }
+ } else {
+ /* cap is removed but the lock_cache may not yet be 0 ref */
+ lock_cache->item_cap_lock_cache.remove_myself();
}
-
if (!cap) {
- lock_cache->item_cap_lock_cache.remove_myself();
- put_lock_cache(lock_cache);
+ /* the cap is removed or we lost relevant op rights */
+ if (lock_cache->cap_ref) {
+ put_lock_cache(lock_cache);
+ lock_cache->cap_ref = false;
+ }
+ /* N.B.: the lock cache may still be associated with the cap even when
+ * invalidated so a new lock cache is not created.
+ */
}
}
for (auto p = cap->lock_caches.begin(); !p.end(); ) {
MDLockCache *lock_cache = *p;
++p;
- if (!lock_cache->invalidating)
- continue;
- int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+ int cap_bit = lock_cache->get_cap_bit();
if (!(cap->issued() & cap_bit)) {
- lock_cache->item_cap_lock_cache.remove_myself();
- put_lock_cache(lock_cache);
+ dout(20) << __func__ << ": lost " << ccap_string(cap_bit) << " on " << *lock_cache << dendl;
+ invalidate_lock_cache(lock_cache);
}
}
}
client_t client = mdr->get_client();
int opcode = mdr->client_request->get_op();
- dout(10) << "create_lock_cache for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;
+ dout(10) << __func__ << ": for client." << client << "/" << ceph_mds_op_name(opcode)<< " on " << *diri << dendl;
if (!diri->is_auth()) {
dout(10) << " dir inode is not auth, noop" << dendl;
}
}
+ int cap_bit = MDLockCache::get_cap_bit_for_lock_cache(opcode);
+ if (!(cap->issued() & cap_bit)) {
+ dout(10) << " client cap lacks rights for lock cache: " << ccap_string(cap_bit) << dendl;
+ return;
+ }
+
set<MDSCacheObject*> ancestors;
for (CInode *in = diri; ; ) {
CDentry *pdn = in->get_projected_parent_dn();
auto lock_cache = new MDLockCache(cap, opcode);
if (dir_layout)
lock_cache->set_dir_layout(*dir_layout);
- cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));
+ cap->set_lock_cache_allowed(lock_cache->get_cap_bit());
for (auto dir : dfv) {
// prevent subtree migration
}
lock_cache->attach_locks();
- lock_cache->ref++;
+ dout(20) << __func__ << ": created " << *lock_cache << dendl;
+
mdr->lock_cache = lock_cache;
+ lock_cache->ref++; /* for mdr */
}
bool Locker::find_and_attach_lock_cache(const MDRequestRef& mdr, CInode *diri)
int opcode = mdr->client_request->get_op();
for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
MDLockCache *lock_cache = *p;
- if (lock_cache->opcode == opcode) {
- dout(10) << "found lock cache for " << ceph_mds_op_name(opcode) << " on " << *diri << dendl;
+ if (lock_cache->opcode == opcode && lock_cache->attachable()) {
+ dout(10) << "found lock cache " << *lock_cache << " on " << *diri << dendl;
mdr->lock_cache = lock_cache;
- mdr->lock_cache->ref++;
+ mdr->lock_cache->ref++; /* for mdr */
return true;
}
}
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
+ dout(20) << __func__ << ": " << *in;
+ if (only_cap) {
+ *_dout << " for " << only_cap->get_client();
+ }
+ *_dout << dendl;
+
// count conflicts with
int nissued = 0;
int all_allowed = -1, loner_allowed = -1, xlocker_allowed = -1;