inc_last_seq();
return last_sent;
}
- void confirm_receipt(ceph_seq_t seq, unsigned caps) {
- bool was_revoking = (_issued & ~_pending);
+ int confirm_receipt(ceph_seq_t seq, unsigned caps) {
+ int was_revoking = (_issued & ~_pending);
if (seq == last_sent) {
_revokes.clear();
_issued = caps;
item_client_revoking_caps.remove_myself();
maybe_clear_notable();
}
- //check_rdcaps_list();
+ return was_revoking & ~_issued; // return revoked
}
// we may get a release racing with revocations, which means our revokes will be ignored
// by the client. clean them out of our _revokes history so we don't wait on them.
set_wanted(wanted() | otherwanted);
}
- void revoke() {
+ int revoke() {
if (revoking())
- confirm_receipt(last_sent, pending());
+ return confirm_receipt(last_sent, pending());
+ return 0;
}
// serializers
xlist<Capability*>::item item_client_revoking_caps;
elist<MDLockCache*> lock_caches;
+ int get_lock_cache_allowed() const { return lock_cache_allowed; }
+ void set_lock_cache_allowed(int c) { lock_cache_allowed |= c; }
+ void clear_lock_cache_allowed(int c) { lock_cache_allowed &= ~c; }
+
private:
void calc_issued() {
_issued = _pending;
int suppress = 0;
unsigned state = 0;
+
+ int lock_cache_allowed = 0;
};
WRITE_CLASS_ENCODER(Capability::Export)
mds->queue_waiter(new C_MDL_DropCache(this, lock_cache));
}
+int Locker::get_cap_bit_for_lock_cache(int op)
+{
+ switch(op) {
+ case CEPH_MDS_OP_CREATE:
+ return CEPH_CAP_DIR_CREATE;
+ case CEPH_MDS_OP_UNLINK:
+ return CEPH_CAP_DIR_UNLINK;
+ default:
+ ceph_assert(0 == "unsupported operation");
+ return 0;
+ }
+}
+
void Locker::invalidate_lock_cache(MDLockCache *lock_cache)
{
ceph_assert(lock_cache->item_cap_lock_cache.is_on_list());
- ceph_assert(!lock_cache->invalidating);
- lock_cache->invalidating = true;
- lock_cache->detach_all();
- // XXX check issued caps
- lock_cache->item_cap_lock_cache.remove_myself();
- put_lock_cache(lock_cache);
+ if (lock_cache->invalidating) {
+ ceph_assert(!lock_cache->client_cap);
+ } else {
+ lock_cache->invalidating = true;
+ lock_cache->detach_all();
+ }
+
+ Capability *cap = lock_cache->client_cap;
+ if (cap) {
+ int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+ cap->clear_lock_cache_allowed(cap_bit);
+ if (cap->issued() & cap_bit)
+ issue_caps(lock_cache->get_dir_inode(), cap);
+ else
+ cap = nullptr;
+ }
+
+ if (!cap) {
+ lock_cache->item_cap_lock_cache.remove_myself();
+ put_lock_cache(lock_cache);
+ }
+}
+
+void Locker::eval_lock_caches(Capability *cap)
+{
+ for (auto p = cap->lock_caches.begin(); !p.end(); ) {
+ MDLockCache *lock_cache = *p;
+ ++p;
+ if (!lock_cache->invalidating)
+ continue;
+ int cap_bit = get_cap_bit_for_lock_cache(lock_cache->opcode);
+ if (!(cap->issued() & cap_bit)) {
+ lock_cache->item_cap_lock_cache.remove_myself();
+ put_lock_cache(lock_cache);
+ }
+ }
}
// ask lock caches to release auth pins
return;
}
+ for (auto p = cap->lock_caches.begin(); !p.end(); ++p) {
+ if ((*p)->opcode == opcode) {
+ dout(10) << " lock cache already exists for " << ceph_mds_op_name(opcode) << ", noop" << dendl;
+ return;
+ }
+ }
+
set<MDSCacheObject*> ancestors;
for (CInode *in = diri; ; ) {
CDentry *pdn = in->get_projected_parent_dn();
}
auto lock_cache = new MDLockCache(cap, opcode);
+ cap->set_lock_cache_allowed(get_cap_bit_for_lock_cache(opcode));
for (auto dir : dfv) {
// prevent subtree migration
// add in any xlocker-only caps (for locks this client is the xlocker for)
allowed |= xlocker_allowed & in->get_xlocker_mask(it->first);
+ if (in->is_dir()) {
+ allowed &= ~CEPH_CAP_ANY_DIR_OPS;
+ if (allowed & CEPH_CAP_FILE_EXCL)
+ allowed |= cap->get_lock_cache_allowed();
+ }
if ((in->inode.inline_data.version != CEPH_INLINE_NONE &&
cap->is_noinline()) ||
int issued = cap->issued();
CInode *in = cap->get_inode();
dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
- cap->revoke();
+ int revoked = cap->revoke();
+ if (revoked & CEPH_CAP_ANY_DIR_OPS)
+ eval_lock_caches(cap);
if (in->is_auth() &&
in->inode.client_ranges.count(cap->get_client()))
bool Locker::should_defer_client_cap_frozen(CInode *in)
{
+ if (in->is_frozen())
+ return true;
+
/*
- * This policy needs to be AT LEAST as permissive as allowing a client request
- * to go forward, or else a client request can release something, the release
- * gets deferred, but the request gets processed and deadlocks because when the
- * caps can't get revoked.
+ * This policy needs to be AT LEAST as permissive as allowing a client
+ * request to go forward, or else a client request can release something,
+ * the release gets deferred, but the request gets processed and deadlocks
+ * because when the caps can't get revoked.
*
- * Currently, a request wait if anything locked is freezing (can't
- * auth_pin), which would avoid any deadlock with cap release. Thus @in
- * _MUST_ be in the lock/auth_pin set.
- *
- * auth_pins==0 implies no unstable lock and not auth pinnned by
- * client request, otherwise continue even it's freezing.
+ * No auth_pin implies that there is no unstable lock and @in is not auth
+ * pinnned by client request. If parent dirfrag is auth pinned by a lock
+ * cache, later request from lock cache owner may forcibly auth pin the @in.
*/
- return (in->is_freezing() && in->get_num_auth_pins() == 0) || in->is_frozen();
+ if (in->is_freezing() && in->get_num_auth_pins() == 0) {
+ CDir* dir = in->get_parent_dir();
+ if (!dir || !dir->is_auth_pinned_by_lock_cache())
+ return true;
+ }
+ return false;
}
void Locker::handle_client_caps(const cref_t<MClientCaps> &m)
caps &= cap->issued();
}
- cap->confirm_receipt(m->get_seq(), caps);
+ int revoked = cap->confirm_receipt(m->get_seq(), caps);
dout(10) << " follows " << follows
<< " retains " << ccap_string(m->get_caps())
<< " dirty " << ccap_string(dirty)
<< " on " << *in << dendl;
+ if (revoked & CEPH_CAP_ANY_DIR_OPS)
+ eval_lock_caches(cap);
// missing/skipped snapflush?
// The client MAY send a snapflush if it is issued WR/EXCL caps, but
dout(10) << " confirming not issued caps " << ccap_string(caps & ~cap->issued()) << dendl;
caps &= cap->issued();
}
- cap->confirm_receipt(seq, caps);
+ int revoked = cap->confirm_receipt(seq, caps);
+ if (revoked & CEPH_CAP_ANY_DIR_OPS)
+ eval_lock_caches(cap);
if (!in->client_need_snapflush.empty() &&
(cap->issued() & CEPH_CAP_ANY_FILE_WR) == 0) {
while (!cap->lock_caches.empty()) {
MDLockCache* lock_cache = cap->lock_caches.front();
+ lock_cache->client_cap = nullptr;
invalidate_lock_cache(lock_cache);
}