From: Sage Weil Date: Wed, 1 Feb 2017 22:27:20 +0000 (-0500) Subject: osd: add pg helpers for managing backoffs X-Git-Tag: v12.0.1~441^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cefb43dd143e6dc7307b73baf16723e71c0dbee9;p=ceph.git osd: add pg helpers for managing backoffs Link Backoff into Session, PG. Tear them down on session reset. Signed-off-by: Sage Weil --- diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 681d9fd36edd..ecc944b182b7 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -16,6 +16,7 @@ set(osd_srcs OSDCap.cc Watch.cc ClassHandler.cc + Session.cc SnapMapper.cc ScrubStore.cc osd_types.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 5b57864467d1..d1191944b72f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4945,6 +4945,9 @@ bool OSD::ms_handle_reset(Connection *con) return false; session->wstate.reset(con); session->con.reset(NULL); // break con <-> session ref cycle + // note that we break session->con *before* the session_handle_reset + // cleanup below. this avoids a race between us and + // PG::add_backoff, split_backoff, etc. session_handle_reset(session); session->put(); return true; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 858fe753fa55..e9f4bbcbe9c5 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1532,6 +1532,8 @@ private: clear_session_waiting_on_pg(session, i->first); } + session->clear_backoffs(); + /* Messages have connection refs, we need to clear the * connection->session->message->connection * cycles which result. 
diff --git a/src/osd/PG.cc b/src/osd/PG.cc index c5ad57b9a063..aaccffb521cb 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -47,6 +47,7 @@ #include "messages/MOSDECSubOpReadReply.h" #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MOSDBackoff.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDRepOp.h" @@ -250,6 +251,7 @@ PG::PG(OSDService *o, OSDMapRef curmap, pg_stats_publish_valid(false), osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))), finish_sync_event(NULL), + backoff_lock("PG::backoff_lock"), scrub_after_recovery(false), active_pushes(0), recovery_state(this), @@ -2314,6 +2316,147 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) dirty_big_info = true; } +void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end) +{ + ConnectionRef con = s->con; + if (!con) // OSD::ms_handle_reset clears s->con without a lock + return; + Backoff *b = s->have_backoff(begin); + if (b) { + derr << __func__ << " already have backoff for " << s << " begin " << begin + << " " << *b << dendl; + ceph_abort(); + } + Mutex::Locker l(backoff_lock); + { + Mutex::Locker l(s->backoff_lock); + b = new Backoff(this, s, ++s->backoff_seq, begin, end); + auto& ls = s->backoffs[begin]; + if (ls.empty()) { + ++s->backoff_count; + } + assert(s->backoff_count == (int)s->backoffs.size()); + ls.insert(b); + backoffs[begin].insert(b); + dout(10) << __func__ << " session " << s << " added " << *b << dendl; + } + con->send_message( + new MOSDBackoff( + CEPH_OSD_BACKOFF_OP_BLOCK, + b->id, + begin, + end, + get_osdmap()->get_epoch())); +} + +void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) +{ + dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl; + vector bv; + { + Mutex::Locker l(backoff_lock); + auto p = backoffs.lower_bound(begin); + while (p != backoffs.end()) { + int r = cmp_bitwise(p->first, end); + dout(20) << __func__ << " ? 
" << r << " " << p->first + << " " << p->second << dendl; + // note: must still examine begin=end=p->first case + if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) { + break; + } + dout(20) << __func__ << " checking " << p->first + << " " << p->second << dendl; + auto q = p->second.begin(); + while (q != p->second.end()) { + dout(20) << __func__ << " checking " << *q << dendl; + int r = cmp_bitwise((*q)->begin, begin); + if (r == 0 || (r > 0 && + cmp_bitwise((*q)->end, end) < 0)) { + bv.push_back(*q); + q = p->second.erase(q); + } else { + ++q; + } + } + if (p->second.empty()) { + p = backoffs.erase(p); + } else { + ++p; + } + } + } + for (auto b : bv) { + Mutex::Locker l(b->lock); + dout(10) << __func__ << " " << *b << dendl; + if (b->session) { + assert(b->pg == this); + ConnectionRef con = b->session->con; + if (con) { // OSD::ms_handle_reset clears s->con without a lock + con->send_message( + new MOSDBackoff( + CEPH_OSD_BACKOFF_OP_UNBLOCK, + b->id, + b->begin, + b->end, + get_osdmap()->get_epoch())); + } + if (b->is_new()) { + b->state = Backoff::STATE_DELETING; + } else { + b->session->rm_backoff(b); + b->session.reset(); + } + b->pg.reset(); + } + } +} + +void PG::clear_backoffs() +{ + dout(10) << __func__ << " " << dendl; + map,hobject_t::BitwiseComparator> ls; + { + Mutex::Locker l(backoff_lock); + ls.swap(backoffs); + } + for (auto& p : ls) { + for (auto& b : p.second) { + Mutex::Locker l(b->lock); + dout(10) << __func__ << " " << *b << dendl; + if (b->session) { + assert(b->pg == this); + if (b->is_new()) { + b->state = Backoff::STATE_DELETING; + } else { + b->session->rm_backoff(b); + b->session.reset(); + } + b->pg.reset(); + } + } + } +} + +// called by Session::clear_backoffs() +void PG::rm_backoff(BackoffRef b) +{ + dout(10) << __func__ << " " << *b << dendl; + Mutex::Locker l(backoff_lock); + assert(b->lock.is_locked_by_me()); + assert(b->pg == this); + auto p = backoffs.find(b->begin); + // may race with release_backoffs() + if (p != 
backoffs.end()) { + auto q = p->second.find(b); + if (q != p->second.end()) { + p->second.erase(q); + if (p->second.empty()) { + backoffs.erase(p); + } + } + } +} + void PG::clear_recovery_state() { dout(10) << "clear_recovery_state" << dendl; diff --git a/src/osd/PG.h b/src/osd/PG.h index 260aa3cf2a7e..216195ff8af4 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -34,6 +34,7 @@ #include "osd_types.h" #include "include/xlist.h" #include "SnapMapper.h" +#include "Session.h" #include "PGLog.h" #include "OSDMap.h" @@ -1083,6 +1084,29 @@ public: friend class C_OSD_RepModify_Commit; + // -- backoff -- + Mutex backoff_lock; // orders inside Backoff::lock + map,hobject_t::BitwiseComparator> backoffs; + + void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end); + void release_backoffs(const hobject_t& begin, const hobject_t& end); + void release_backoffs(const hobject_t& o) { + release_backoffs(o, o); + } + void clear_backoffs(); + + void add_pg_backoff(SessionRef s) { + hobject_t begin = info.pgid.pgid.get_hobj_start(); + hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); + add_backoff(s, begin, end); + } + void release_pg_backoffs() { + hobject_t begin = info.pgid.pgid.get_hobj_start(); + hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); + release_backoffs(begin, end); + } + + void rm_backoff(BackoffRef b); // -- scrub -- struct Scrubber { @@ -2167,6 +2191,7 @@ public: bool is_activating() const { return state_test(PG_STATE_ACTIVATING); } bool is_peering() const { return state_test(PG_STATE_PEERING); } bool is_down() const { return state_test(PG_STATE_DOWN); } + bool is_incomplete() const { return state_test(PG_STATE_INCOMPLETE); } bool is_clean() const { return state_test(PG_STATE_CLEAN); } bool is_degraded() const { return state_test(PG_STATE_DEGRADED); } bool is_undersized() const { return state_test(PG_STATE_UNDERSIZED); } diff --git a/src/osd/Session.cc b/src/osd/Session.cc new file mode 100644 index 
000000000000..8f93ea95ae96 --- /dev/null +++ b/src/osd/Session.cc @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "PG.h" +#include "Session.h" + +#include "common/debug.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_osd + +void Session::clear_backoffs() +{ + map,hobject_t::BitwiseComparator> ls; + { + Mutex::Locker l(backoff_lock); + ls.swap(backoffs); + backoff_count = 0; + } + for (auto& p : ls) { + for (auto& b : p.second) { + Mutex::Locker l(b->lock); + if (b->pg) { + assert(b->session == this); + assert(b->is_new() || b->is_acked()); + b->pg->rm_backoff(b); + b->pg.reset(); + b->session.reset(); + } else if (b->session) { + assert(b->session == this); + assert(b->is_deleting()); + b->session.reset(); + } + } + } +} + +void Session::ack_backoff( + CephContext *cct, + uint64_t id, + const hobject_t& begin, + const hobject_t& end) +{ + Mutex::Locker l(backoff_lock); + // NOTE that ack may be for [a,c] but osd may now have [a,b) and + // [b,c) due to a PG split. 
+ auto p = backoffs.lower_bound(begin); + while (p != backoffs.end()) { + // note: must still examine begin=end=p->first case + int r = cmp_bitwise(p->first, end); + if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) { + break; + } + auto q = p->second.begin(); + while (q != p->second.end()) { + Backoff *b = (*q).get(); + if (b->id == id) { + if (b->is_new()) { + b->state = Backoff::STATE_ACKED; + dout(20) << __func__ << " now " << *b << dendl; + } else if (b->is_deleting()) { + dout(20) << __func__ << " deleting " << *b << dendl; + q = p->second.erase(q); + continue; + } + } + ++q; + } + if (p->second.empty()) { + dout(20) << __func__ << " clearing bin " << p->first << dendl; + p = backoffs.erase(p); + --backoff_count; + } else { + ++p; + } + } + assert(backoff_count == (int)backoffs.size()); +} diff --git a/src/osd/Session.h b/src/osd/Session.h index aa862c299a62..63a73c30073e 100644 --- a/src/osd/Session.h +++ b/src/osd/Session.h @@ -16,6 +16,107 @@ #define CEPH_OSD_SESSION_H #include "common/RefCountedObj.h" +#include "common/Mutex.h" +#include "include/Spinlock.h" +#include "OSDCap.h" +#include "Watch.h" +#include "OSDMap.h" + +struct Session; +typedef boost::intrusive_ptr SessionRef; +struct Backoff; +typedef boost::intrusive_ptr BackoffRef; +class PG; +typedef boost::intrusive_ptr PGRef; + +/* + * A Backoff represents one instance of either a PG or an OID + * being plugged at the client. It's refcounted and linked from + * the PG {pg_oid}_backoffs map and from the client Session + * object. + * + * The Backoff has a lock that protects its internal fields. + * + * The PG has a backoff_lock that protects its maps to Backoffs. + * This lock is *inside* of Backoff::lock. + * + * The Session has a backoff_lock that protects its map of pg and + * oid backoffs. This lock is *inside* the Backoff::lock *and* + * PG::backoff_lock. 
+ * + * That's + * + * Backoff::lock + * PG::backoff_lock + * Session::backoff_lock + * + * When the Session goes away, we move our backoff lists aside, + * then we lock each of the Backoffs we + * previously referenced and clear the Session* pointer. If the PG + * is still linked, we unlink it, too. + * + * When the PG clears the backoff, it will send an unblock message + * if the Session* is still non-null, and unlink the session. + * + */ + +struct Backoff : public RefCountedObject { + enum { + STATE_NEW = 1, ///< backoff in flight to client + STATE_ACKED = 2, ///< backoff acked + STATE_DELETING = 3 ///< backoff deleted, but un-acked + }; + std::atomic_int state = {STATE_NEW}; + uint64_t id = 0; ///< unique id (within the Session) + + bool is_new() const { + return state.load() == STATE_NEW; + } + bool is_acked() const { + return state.load() == STATE_ACKED; + } + bool is_deleting() const { + return state.load() == STATE_DELETING; + } + const char *get_state_name() const { + switch (state.load()) { + case STATE_NEW: return "new"; + case STATE_ACKED: return "acked"; + case STATE_DELETING: return "deleting"; + default: return "???"; + } + } + + Mutex lock; + // NOTE: the owning PG and session are either + // - *both* set, or + // - both null (teardown), or + // - only session is set (and state == DELETING) + PGRef pg; ///< owning pg + SessionRef session; ///< owning session + hobject_t begin, end; ///< [) range to block, unless ==, then single obj + + Backoff(PGRef pg, SessionRef s, + uint64_t i, + const hobject_t& b, const hobject_t& e) + : RefCountedObject(g_ceph_context, 0), + id(i), + lock("Backoff::lock"), + pg(pg), + session(s), + begin(b), + end(e) {} + + friend ostream& operator<<(ostream& out, const Backoff& b) { + return out << "Backoff(" << &b << " " << b.id + << " " << b.get_state_name() + << " [" << b.begin << "," << b.end << ") " + << " session " << b.session + << " pg " << b.pg << ")"; + } +}; + + struct Session : public RefCountedObject { 
EntityName entity_name; @@ -35,20 +136,76 @@ struct Session : public RefCountedObject { Spinlock received_map_lock; epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here + /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock + Mutex backoff_lock; + std::atomic_int backoff_count= {0}; ///< simple count of backoffs + map, hobject_t::BitwiseComparator> backoffs; + + std::atomic backoff_seq = {0}; + explicit Session(CephContext *cct) : RefCountedObject(cct), auid(-1), con(0), wstate(cct), session_dispatch_lock("Session::session_dispatch_lock"), - last_sent_epoch(0), received_map_epoch(0) + last_sent_epoch(0), received_map_epoch(0), + backoff_lock("Session::backoff_lock") {} void maybe_reset_osdmap() { if (waiting_for_pg.empty()) { osdmap.reset(); } } -}; -typedef boost::intrusive_ptr SessionRef; + void ack_backoff( + CephContext *cct, + uint64_t id, + const hobject_t& start, + const hobject_t& end); + + Backoff *have_backoff(const hobject_t& oid) { + if (backoff_count.load()) { + Mutex::Locker l(backoff_lock); + assert(backoff_count == (int)backoffs.size()); + auto p = backoffs.lower_bound(oid); + if (p != backoffs.begin() && + cmp_bitwise(p->first, oid) > 0) { + --p; + } + if (p != backoffs.end()) { + int r = cmp_bitwise(oid, p->first); + if (r == 0 || r > 0) { + for (auto& q : p->second) { + if (r == 0 || cmp_bitwise(oid, q->end) < 0) { + return &(*q); + } + } + } + } + } + return nullptr; + } + + // called by PG::release_*_backoffs and PG::clear_backoffs() + void rm_backoff(BackoffRef b) { + Mutex::Locker l(backoff_lock); + assert(b->lock.is_locked_by_me()); + assert(b->session == this); + auto p = backoffs.find(b->begin); + // may race with clear_backoffs() + if (p != backoffs.end()) { + auto q = p->second.find(b); + if (q != p->second.end()) { + p->second.erase(q); + if (p->second.empty()) { + backoffs.erase(p); + --backoff_count; + } + } + } + assert(backoff_count == (int)backoffs.size()); + } + void clear_backoffs(); +}; 
#endif