const Capability& operator=(const Capability& other) = delete;
int pending() const {
- return is_valid() ? _pending : (_pending & CEPH_CAP_PIN);
+ return _pending;
}
int issued() const {
- return is_valid() ? _issued : (_issued & CEPH_CAP_PIN);
+ return _issued;
}
-
- ceph_seq_t issue(unsigned c) {
- revalidate();
+ int revoking() const {
+ return _issued & ~_pending;
+ }
+ ceph_seq_t issue(unsigned c, bool reval=false) {
+ if (reval)
+ revalidate();
if (_pending & ~c) {
// revoking (and maybe adding) bits. note caps prior to this revocation
inc_last_seq();
return last_sent;
}
- ceph_seq_t issue_norevoke(unsigned c) {
- revalidate();
+ ceph_seq_t issue_norevoke(unsigned c, bool reval=false) {
+ if (reval)
+ revalidate();
_pending |= c;
_issued |= c;
- //check_rdcaps_list();
+ clear_new();
+
inc_last_seq();
return last_sent;
}
bool is_notable() const { return state & STATE_NOTABLE; }
bool is_stale() const;
+ bool is_valid() const;
bool is_new() const { return state & STATE_NEW; }
void mark_new() { state |= STATE_NEW; }
void clear_new() { state &= ~STATE_NEW; }
void inc_last_seq() { last_sent++; }
ceph_seq_t get_last_seq() const {
- if (!is_valid() && (_pending & ~CEPH_CAP_PIN))
- return last_sent + 1;
return last_sent;
}
ceph_seq_t get_last_issue() const { return last_issue; }
return Export(cap_id, wanted(), issued(), pending(), client_follows, get_last_seq(), mseq+1, last_issue_stamp);
}
void merge(const Export& other, bool auth_cap) {
- if (!is_stale()) {
- // issued + pending
- int newpending = other.pending | pending();
- if (other.issued & ~newpending)
- issue(other.issued | newpending);
- else
- issue(newpending);
- last_issue_stamp = other.last_issue_stamp;
- } else {
- issue(CEPH_CAP_PIN);
- }
+ // issued + pending
+ int newpending = other.pending | pending();
+ if (other.issued & ~newpending)
+ issue(other.issued | newpending);
+ else
+ issue(newpending);
+ last_issue_stamp = other.last_issue_stamp;
client_follows = other.client_follows;
mseq = other.mseq;
}
void merge(int otherwanted, int otherissued) {
- if (!is_stale()) {
- // issued + pending
- int newpending = pending();
- if (otherissued & ~newpending)
- issue(otherissued | newpending);
- else
- issue(newpending);
- } else {
- issue(CEPH_CAP_PIN);
- }
+ // issued + pending
+ int newpending = pending();
+ if (otherissued & ~newpending)
+ issue(otherissued | newpending);
+ else
+ issue(newpending);
// wanted
set_wanted(wanted() | otherwanted);
}
void revoke() {
- if (pending() & ~CEPH_CAP_PIN)
- issue(CEPH_CAP_PIN);
- confirm_receipt(last_sent, CEPH_CAP_PIN);
+ if (revoking())
+ confirm_receipt(last_sent, pending());
}
// serializers
}
}
- bool is_valid() const;
void revalidate();
void mark_notable();
issue_caps(*p);
}
+class C_Locker_RevokeStaleCap : public LockerContext {
+ CInode *in;
+ client_t client;
+public:
+ C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) :
+ LockerContext(l), in(i), client(c) {
+ in->get(CInode::PIN_PTRWAITER);
+ }
+ void finish(int r) override {
+ locker->revoke_stale_cap(in, client);
+ in->put(CInode::PIN_PTRWAITER);
+ }
+};
+
int Locker::issue_caps(CInode *in, Capability *only_cap)
{
// allowed caps are determined by the lock mode.
it = in->client_caps.begin();
for (; it != in->client_caps.end(); ++it) {
Capability *cap = it->second;
- if (cap->is_stale())
- continue;
// do not issue _new_ bits when size|mtime is projected
int allowed;
if (!(pending & ~allowed)) {
// skip if suppress or new, and not revocation
- if (cap->is_new() || cap->is_suppress()) {
- dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl;
+ if (cap->is_new() || cap->is_suppress() || cap->is_stale()) {
+ dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl;
+ continue;
+ }
+ } else {
+ ceph_assert(!cap->is_new());
+ if (cap->is_stale()) {
+ dout(20) << " revoke stale cap from client." << it->first << dendl;
+ ceph_assert(!cap->is_valid());
+ cap->issue(allowed & pending, false);
+ mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first));
continue;
}
}
// are there caps that the client _wants_ and can have, but aren't pending?
// or do we need to revoke?
- if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps
- (pending & ~allowed)) { // need to revoke ~allowed caps.
+ if ((pending & ~allowed) || // need to revoke ~allowed caps.
+ ((wanted & allowed) & ~pending) || // missing wanted+allowed caps
+ !cap->is_valid()) { // after stale->resume circle
// issue
nissued++;
int before = pending;
long seq;
if (pending & ~allowed)
- seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new.
+ seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new.
else
- seq = cap->issue((wanted|likes) & allowed);
+ seq = cap->issue((wanted|likes) & allowed, true);
int after = cap->pending();
- if (cap->is_new()) {
- // haven't send caps to client yet
- if (before & ~after)
- cap->confirm_receipt(seq, after);
- } else {
- dout(7) << " sending MClientCaps to client." << it->first
- << " seq " << cap->get_last_seq()
- << " new pending " << ccap_string(after) << " was " << ccap_string(before)
- << dendl;
-
- int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
- if (op == CEPH_CAP_OP_REVOKE) {
- revoking_caps.push_back(&cap->item_revoking_caps);
- revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
- cap->set_last_revoke_stamp(ceph_clock_now());
- cap->reset_num_revoke_warnings();
- }
-
- MClientCaps *m = new MClientCaps(op, in->ino(),
- in->find_snaprealm()->inode->ino(),
- cap->get_cap_id(), cap->get_last_seq(),
- after, wanted, 0,
- cap->get_mseq(),
- mds->get_osd_epoch_barrier());
- in->encode_cap_message(m, cap);
+ dout(7) << " sending MClientCaps to client." << it->first
+ << " seq " << seq << " new pending " << ccap_string(after)
+ << " was " << ccap_string(before) << dendl;
- mds->send_message_client_counted(m, it->first);
+ int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT;
+ if (op == CEPH_CAP_OP_REVOKE) {
+ revoking_caps.push_back(&cap->item_revoking_caps);
+ revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps);
+ cap->set_last_revoke_stamp(ceph_clock_now());
+ cap->reset_num_revoke_warnings();
}
+
+ MClientCaps *m = new MClientCaps(op, in->ino(),
+ in->find_snaprealm()->inode->ino(),
+ cap->get_cap_id(), cap->get_last_seq(),
+ after, wanted, 0,
+ cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
+ in->encode_cap_message(m, cap);
+
+ mds->send_message_client_counted(m, cap->get_session());
}
if (only_cap)
check_inode_max_size(in);
}
-void Locker::revoke_stale_caps(Session *session)
+
+void Locker::revoke_stale_cap(CInode *in, client_t client)
+{
+ dout(7) << __func__ << " client." << client << " on " << *in << dendl;
+ Capability *cap = in->get_client_cap(client);
+ if (!cap)
+ return;
+
+ if (cap->revoking() & CEPH_CAP_ANY_WR) {
+ std::stringstream ss;
+ mds->evict_client(client.v, false, g_conf->mds_session_blacklist_on_timeout, ss, nullptr);
+ return;
+ }
+
+ cap->revoke();
+
+ if (in->is_auth() && in->inode.client_ranges.count(cap->get_client()))
+ in->state_set(CInode::STATE_NEEDSRECOVER);
+
+ if (in->state_test(CInode::STATE_EXPORTINGCAPS))
+ return;
+
+ if (!in->filelock.is_stable())
+ eval_gather(&in->filelock);
+ if (!in->linklock.is_stable())
+ eval_gather(&in->linklock);
+ if (!in->authlock.is_stable())
+ eval_gather(&in->authlock);
+ if (!in->xattrlock.is_stable())
+ eval_gather(&in->xattrlock);
+
+ if (in->is_auth())
+ try_eval(in, CEPH_CAP_LOCKS);
+ else
+ request_inode_file_caps(in);
+}
+
+bool Locker::revoke_stale_caps(Session *session)
{
dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl;
+ // invalidate all caps
+ session->inc_cap_gen();
+
std::vector<CInode*> to_eval;
for (auto p = session->caps.begin(); !p.end(); ) {
break;
}
- int issued = cap->issued();
- if (!(issued & ~CEPH_CAP_PIN))
+ int revoking = cap->revoking();
+ if (!revoking)
continue;
+ if (revoking & CEPH_CAP_ANY_WR)
+ return false;
+
+ int issued = cap->issued();
CInode *in = cap->get_inode();
dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl;
cap->revoke();
to_eval.push_back(in);
}
- // invalidate the rest
- session->inc_cap_gen();
-
for (auto in : to_eval) {
if (in->state_test(CInode::STATE_EXPORTINGCAPS))
continue;
else
request_inode_file_caps(in);
}
+
+ return true;
}
void Locker::resume_stale_caps(Session *session)
remove_client_cap(in, cap);
}
-void Locker::remove_client_cap(CInode *in, Capability *cap)
+void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill)
{
client_t client = cap->get_client();
// clean out any pending snapflush state
if (in->is_auth()) {
// make sure we clear out the client byte range
if (in->get_projected_inode()->client_ranges.count(client) &&
- !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray
- check_inode_max_size(in);
+ !(in->inode.nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray
+ if (kill)
+ in->state_set(CInode::STATE_NEEDSRECOVER);
+ else
+ check_inode_max_size(in);
+ }
} else {
request_inode_file_caps(in);
}