From dcae1ea2d30398f7b6493a74b482e964a21fcfeb Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 27 Feb 2019 20:51:38 +0800 Subject: [PATCH] mds: change how mds revoke stale caps - Only revokes conflicting caps from stale client. - If stale client holds conflicting CEPH_CAP_ANY_WR, blacklist and kill it. Fixes: https://tracker.ceph.com/issues/38326 Signed-off-by: "Yan, Zheng" --- src/mds/CInode.cc | 10 +-- src/mds/Capability.cc | 18 ++--- src/mds/Capability.h | 64 ++++++++---------- src/mds/Locker.cc | 150 +++++++++++++++++++++++++++++------------- src/mds/Locker.h | 5 +- src/mds/MDSRank.h | 4 ++ src/mds/Server.cc | 14 ++-- 7 files changed, 160 insertions(+), 105 deletions(-) diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 249f7f0b0300..12a871563c1e 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -3506,24 +3506,24 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, int likes = get_caps_liked(); int allowed = get_caps_allowed_for_client(session, cap, file_i); issue = (cap->wanted() | likes) & allowed; - cap->issue_norevoke(issue); + cap->issue_norevoke(issue, true); issue = cap->pending(); dout(10) << "encode_inodestat issuing " << ccap_string(issue) << " seq " << cap->get_last_seq() << dendl; } else if (cap && cap->is_new() && !dir_realm) { // alway issue new caps to client, otherwise the caps get lost ceph_assert(cap->is_stale()); - issue = cap->pending() | CEPH_CAP_PIN; - cap->issue_norevoke(issue); + ceph_assert(!cap->pending()); + issue = CEPH_CAP_PIN; + cap->issue_norevoke(issue, true); dout(10) << "encode_inodestat issuing " << ccap_string(issue) << " seq " << cap->get_last_seq() - << "(stale|new caps)" << dendl; + << "(stale&new caps)" << dendl; } if (issue) { cap->set_last_issue(); cap->set_last_issue_stamp(ceph_clock_now()); - cap->clear_new(); ecap.caps = issue; ecap.wanted = cap->wanted(); ecap.cap_id = cap->get_cap_id(); diff --git a/src/mds/Capability.cc b/src/mds/Capability.cc index 842afd7fed5b..df17f3e825ea 100644 --- a/src/mds/Capability.cc +++ b/src/mds/Capability.cc @@ -159,6 +159,8 @@ Capability::Capability(CInode *i, Session *s, uint64_t id) : if (session) { session->touch_cap_bottom(this); cap_gen = session->get_cap_gen(); + if (session->is_stale()) + --cap_gen; // not valid auto& conn = session->get_connection(); if (conn) { @@ -189,20 +191,8 @@ bool Capability::is_valid() const void Capability::revalidate() { - if (is_valid()) - return; - - if (_pending & ~CEPH_CAP_PIN) - inc_last_seq(); - - bool was_revoking = _issued & ~_pending; - _pending = _issued = CEPH_CAP_PIN; - _revokes.clear(); - - cap_gen = session->get_cap_gen(); - - if (was_revoking) - maybe_clear_notable(); + if (!is_valid()) + cap_gen = session->get_cap_gen(); } void Capability::mark_notable() diff --git a/src/mds/Capability.h b/src/mds/Capability.h index fba7d259ce09..cb76be60d23e 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -130,14 +130,17 @@ public: const Capability& operator=(const Capability& other) = delete; int pending() const { - return is_valid() ? _pending : (_pending & CEPH_CAP_PIN); + return _pending; } int issued() const { - return is_valid() ? _issued : (_issued & CEPH_CAP_PIN); + return _issued; } - - ceph_seq_t issue(unsigned c) { - revalidate(); + int revoking() const { + return _issued & ~_pending; + } + ceph_seq_t issue(unsigned c, bool reval=false) { + if (reval) + revalidate(); if (_pending & ~c) { // revoking (and maybe adding) bits. note caps prior to this revocation @@ -162,12 +165,14 @@ public: inc_last_seq(); return last_sent; } - ceph_seq_t issue_norevoke(unsigned c) { - revalidate(); + ceph_seq_t issue_norevoke(unsigned c, bool reval=false) { + if (reval) + revalidate(); _pending |= c; _issued |= c; - //check_rdcaps_list(); + clear_new(); + inc_last_seq(); return last_sent; } @@ -245,6 +250,7 @@ public: bool is_notable() const { return state & STATE_NOTABLE; } bool is_stale() const; + bool is_valid() const; bool is_new() const { return state & STATE_NEW; } void mark_new() { state |= STATE_NEW; } void clear_new() { state &= ~STATE_NEW; } @@ -284,8 +290,6 @@ public: void inc_last_seq() { last_sent++; } ceph_seq_t get_last_seq() const { - if (!is_valid() && (_pending & ~CEPH_CAP_PIN)) - return last_sent + 1; return last_sent; } ceph_seq_t get_last_issue() const { return last_issue; } @@ -300,17 +304,13 @@ public: return Export(cap_id, wanted(), issued(), pending(), client_follows, get_last_seq(), mseq+1, last_issue_stamp, state); } void merge(const Export& other, bool auth_cap) { - if (!is_stale()) { - // issued + pending - int newpending = other.pending | pending(); - if (other.issued & ~newpending) - issue(other.issued | newpending); - else - issue(newpending); - last_issue_stamp = other.last_issue_stamp; - } else { - issue(CEPH_CAP_PIN); - } + // issued + pending + int newpending = other.pending | pending(); + if (other.issued & ~newpending) + issue(other.issued | newpending); + else + issue(newpending); + last_issue_stamp = other.last_issue_stamp; client_follows = other.client_follows; @@ -324,25 +324,20 @@ public: mseq = other.mseq; } void merge(int otherwanted, int otherissued) { - if (!is_stale()) { - // issued + pending - int newpending = pending(); - if (otherissued & ~newpending) - issue(otherissued | newpending); - else - issue(newpending); - } else { - issue(CEPH_CAP_PIN); - } + // issued + pending + int newpending = pending(); + if (otherissued & ~newpending) + issue(otherissued | newpending); + else + issue(newpending); // wanted set_wanted(wanted() | otherwanted); } void revoke() { - if (pending() & ~CEPH_CAP_PIN) - issue(CEPH_CAP_PIN); - confirm_receipt(last_sent, CEPH_CAP_PIN); + if (revoking()) + confirm_receipt(last_sent, pending()); } // serializers @@ -395,7 +390,6 @@ private: } } - bool is_valid() const; void revalidate(); void mark_notable(); diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 2d5bfb88501c..0294e575245e 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -1966,6 +1966,20 @@ void Locker::issue_caps_set(set& inset) issue_caps(*p); } +class C_Locker_RevokeStaleCap : public LockerContext { + CInode *in; + client_t client; +public: + C_Locker_RevokeStaleCap(Locker *l, CInode *i, client_t c) : + LockerContext(l), in(i), client(c) { + in->get(CInode::PIN_PTRWAITER); + } + void finish(int r) override { + locker->revoke_stale_cap(in, client); + in->put(CInode::PIN_PTRWAITER); + } +}; + int Locker::issue_caps(CInode *in, Capability *only_cap) { // allowed caps are determined by the lock mode. @@ -1999,8 +2013,6 @@ int Locker::issue_caps(CInode *in, Capability *only_cap) it = in->client_caps.begin(); for (; it != in->client_caps.end(); ++it) { Capability *cap = &it->second; - if (cap->is_stale()) - continue; // do not issue _new_ bits when size|mtime is projected int allowed; @@ -2029,8 +2041,17 @@ int Locker::issue_caps(CInode *in, Capability *only_cap) if (!(pending & ~allowed)) { // skip if suppress or new, and not revocation - if (cap->is_new() || cap->is_suppress()) { - dout(20) << " !revoke and new|suppressed, skipping client." << it->first << dendl; + if (cap->is_new() || cap->is_suppress() || cap->is_stale()) { + dout(20) << " !revoke and new|suppressed|stale, skipping client." << it->first << dendl; + continue; + } + } else { + ceph_assert(!cap->is_new()); + if (cap->is_stale()) { + dout(20) << " revoke stale cap from client." << it->first << dendl; + ceph_assert(!cap->is_valid()); + cap->issue(allowed & pending, false); + mds->queue_waiter_front(new C_Locker_RevokeStaleCap(this, in, it->first)); continue; } } @@ -2041,8 +2062,9 @@ int Locker::issue_caps(CInode *in, Capability *only_cap) // are there caps that the client _wants_ and can have, but aren't pending? // or do we need to revoke? - if (((wanted & allowed) & ~pending) || // missing wanted+allowed caps - (pending & ~allowed)) { // need to revoke ~allowed caps. + if ((pending & ~allowed) || // need to revoke ~allowed caps. + ((wanted & allowed) & ~pending) || // missing wanted+allowed caps + !cap->is_valid()) { // after stale->resume circle // issue nissued++; @@ -2051,40 +2073,33 @@ int Locker::issue_caps(CInode *in, Capability *only_cap) int before = pending; long seq; if (pending & ~allowed) - seq = cap->issue((wanted|likes) & allowed & pending); // if revoking, don't issue anything new. + seq = cap->issue((wanted|likes) & allowed & pending, true); // if revoking, don't issue anything new. else - seq = cap->issue((wanted|likes) & allowed); + seq = cap->issue((wanted|likes) & allowed, true); int after = cap->pending(); - if (cap->is_new()) { - // haven't send caps to client yet - if (before & ~after) - cap->confirm_receipt(seq, after); - } else { - dout(7) << " sending MClientCaps to client." << it->first - << " seq " << cap->get_last_seq() - << " new pending " << ccap_string(after) << " was " << ccap_string(before) - << dendl; - - int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT; - if (op == CEPH_CAP_OP_REVOKE) { - revoking_caps.push_back(&cap->item_revoking_caps); - revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps); - cap->set_last_revoke_stamp(ceph_clock_now()); - cap->reset_num_revoke_warnings(); - } + dout(7) << " sending MClientCaps to client." << it->first + << " seq " << seq << " new pending " << ccap_string(after) + << " was " << ccap_string(before) << dendl; - auto m = make_message(op, in->ino(), - in->find_snaprealm()->inode->ino(), - cap->get_cap_id(), - cap->get_last_seq(), - after, wanted, 0, - cap->get_mseq(), - mds->get_osd_epoch_barrier()); - in->encode_cap_message(m, cap); - - mds->send_message_client_counted(m, cap->get_session()); + int op = (before & ~after) ? CEPH_CAP_OP_REVOKE : CEPH_CAP_OP_GRANT; + if (op == CEPH_CAP_OP_REVOKE) { + revoking_caps.push_back(&cap->item_revoking_caps); + revoking_caps_by_client[cap->get_client()].push_back(&cap->item_client_revoking_caps); + cap->set_last_revoke_stamp(ceph_clock_now()); + cap->reset_num_revoke_warnings(); } + + auto m = make_message(op, in->ino(), + in->find_snaprealm()->inode->ino(), + cap->get_cap_id(), + cap->get_last_seq(), + after, wanted, 0, + cap->get_mseq(), + mds->get_osd_epoch_barrier()); + in->encode_cap_message(m, cap); + + mds->send_message_client_counted(m, cap->get_session()); } if (only_cap) @@ -2116,10 +2131,50 @@ void Locker::issue_truncate(CInode *in) check_inode_max_size(in); } -void Locker::revoke_stale_caps(Session *session) + +void Locker::revoke_stale_cap(CInode *in, client_t client) +{ + dout(7) << __func__ << " client." << client << " on " << *in << dendl; + Capability *cap = in->get_client_cap(client); + if (!cap) + return; + + if (cap->revoking() & CEPH_CAP_ANY_WR) { + std::stringstream ss; + mds->evict_client(client.v, false, g_conf()->mds_session_blacklist_on_timeout, ss, nullptr); + return; + } + + cap->revoke(); + + if (in->is_auth() && in->inode.client_ranges.count(cap->get_client())) + in->state_set(CInode::STATE_NEEDSRECOVER); + + if (in->state_test(CInode::STATE_EXPORTINGCAPS)) + return; + + if (!in->filelock.is_stable()) + eval_gather(&in->filelock); + if (!in->linklock.is_stable()) + eval_gather(&in->linklock); + if (!in->authlock.is_stable()) + eval_gather(&in->authlock); + if (!in->xattrlock.is_stable()) + eval_gather(&in->xattrlock); + + if (in->is_auth()) + try_eval(in, CEPH_CAP_LOCKS); + else + request_inode_file_caps(in); +} + +bool Locker::revoke_stale_caps(Session *session) { dout(10) << "revoke_stale_caps for " << session->info.inst.name << dendl; + // invalidate all caps + session->inc_cap_gen(); + std::vector to_eval; for (auto p = session->caps.begin(); !p.end(); ) { @@ -2132,10 +2187,14 @@ void Locker::revoke_stale_caps(Session *session) break; } - int issued = cap->issued(); - if (!(issued & ~CEPH_CAP_PIN)) + int revoking = cap->revoking(); + if (!revoking) continue; + if (revoking & CEPH_CAP_ANY_WR) + return false; + + int issued = cap->issued(); CInode *in = cap->get_inode(); dout(10) << " revoking " << ccap_string(issued) << " on " << *in << dendl; cap->revoke(); @@ -2149,9 +2208,6 @@ void Locker::revoke_stale_caps(Session *session) to_eval.push_back(in); } - // invalidate the rest - session->inc_cap_gen(); - for (auto in : to_eval) { if (in->state_test(CInode::STATE_EXPORTINGCAPS)) continue; @@ -2170,6 +2226,8 @@ void Locker::revoke_stale_caps(Session *session) else request_inode_file_caps(in); } + + return true; } void Locker::resume_stale_caps(Session *session) @@ -3576,7 +3634,7 @@ void Locker::_do_cap_release(client_t client, inodeno_t ino, uint64_t cap_id, remove_client_cap(in, cap); } -void Locker::remove_client_cap(CInode *in, Capability *cap) +void Locker::remove_client_cap(CInode *in, Capability *cap, bool kill) { client_t client = cap->get_client(); // clean out any pending snapflush state @@ -3591,8 +3649,12 @@ void Locker::remove_client_cap(CInode *in, Capability *cap) if (in->is_auth()) { // make sure we clear out the client byte range if (in->get_projected_inode()->client_ranges.count(client) && - !(in->inode.nlink == 0 && !in->is_any_caps())) // unless it's unlink + stray - check_inode_max_size(in); + !(in->inode.nlink == 0 && !in->is_any_caps())) { // unless it's unlink + stray + if (kill) + in->state_set(CInode::STATE_NEEDSRECOVER); + else + check_inode_max_size(in); + } } else { request_inode_file_caps(in); } diff --git a/src/mds/Locker.h b/src/mds/Locker.h index dd8494816deb..f1e867a50962 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -179,7 +179,7 @@ public: void kick_cap_releases(MDRequestRef& mdr); void kick_issue_caps(CInode *in, client_t client, ceph_seq_t seq); - void remove_client_cap(CInode *in, Capability *cap); + void remove_client_cap(CInode *in, Capability *cap, bool kill=false); std::vector get_late_revoking_clients(double timeout) const; @@ -244,7 +244,8 @@ public: int issue_caps(CInode *in, Capability *only_cap=0); void issue_caps_set(std::set& inset); void issue_truncate(CInode *in); - void revoke_stale_caps(Session *session); + void revoke_stale_cap(CInode *in, client_t client); + bool revoke_stale_caps(Session *session); void resume_stale_caps(Session *session); void remove_stale_leases(Session *session); diff --git a/src/mds/MDSRank.h b/src/mds/MDSRank.h index f052d74ad632..0575c3a37797 100644 --- a/src/mds/MDSRank.h +++ b/src/mds/MDSRank.h @@ -316,6 +316,10 @@ class MDSRank { finished_queue.push_back(c); progress_thread.signal(); } + void queue_waiter_front(MDSContext *c) { + finished_queue.push_front(c); + progress_thread.signal(); + } void queue_waiters(MDSContext::vec& ls) { MDSContext::vec v; v.swap(ls); diff --git a/src/mds/Server.cc b/src/mds/Server.cc index b23dbd89bfdb..476843da0e64 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -732,7 +732,7 @@ void Server::_session_logged(Session *session, uint64_t state_seq, bool open, ve Capability *cap = session->caps.front(); CInode *in = cap->get_inode(); dout(20) << " killing capability " << ccap_string(cap->issued()) << " on " << *in << dendl; - mds->locker->remove_client_cap(in, cap); + mds->locker->remove_client_cap(in, cap, true); } while (!session->leases.empty()) { ClientLease *r = session->leases.front(); @@ -998,10 +998,14 @@ void Server::find_idle_sessions() for (auto session : new_stale) { mds->sessionmap.set_state(session, Session::STATE_STALE); - mds->locker->revoke_stale_caps(session); - mds->locker->remove_stale_leases(session); - mds->send_message_client(make_message(CEPH_SESSION_STALE, session->get_push_seq()), session); - finish_flush_session(session, session->get_push_seq()); + if (mds->locker->revoke_stale_caps(session)) { + mds->locker->remove_stale_leases(session); + finish_flush_session(session, session->get_push_seq()); + auto m = make_message(CEPH_SESSION_STALE, session->get_push_seq()); + mds->send_message_client(m, session); + } else { + to_evict.push_back(session); + } } } -- 2.47.3