OSDCap.cc
Watch.cc
ClassHandler.cc
+ Session.cc
SnapMapper.cc
ScrubStore.cc
osd_types.cc
return false;
session->wstate.reset(con);
session->con.reset(NULL); // break con <-> session ref cycle
+ // note that we break session->con *before* the session_handle_reset
+ // cleanup below. this avoids a race between us and
+ // PG::add_backoff, split_backoff, etc.
session_handle_reset(session);
session->put();
return true;
clear_session_waiting_on_pg(session, i->first);
}
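+  // drop this session's backoffs and unlink them from their PGs;
+  // the connection is gone, so no unblock messages are sent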
+ session->clear_backoffs();
+
/* Messages have connection refs, we need to clear the
* connection->session->message->connection
* cycles which result.
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MOSDBackoff.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDRepOp.h"
pg_stats_publish_valid(false),
osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
finish_sync_event(NULL),
+ backoff_lock("PG::backoff_lock"),
scrub_after_recovery(false),
active_pushes(0),
recovery_state(this),
dirty_big_info = true;
}
+void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
+{
+ ConnectionRef con = s->con;
+ if (!con) // OSD::ms_handle_reset clears s->con without a lock
+ return;
+ Backoff *b = s->have_backoff(begin);
+ if (b) {
+ derr << __func__ << " already have backoff for " << s << " begin " << begin
+ << " " << *b << dendl;
+ ceph_abort();
+ }
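+  // lock ordering: PG::backoff_lock first, then the Session's
+  // backoff_lock nested inside it (see note in Session.h)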
+ Mutex::Locker l(backoff_lock);
+ {
+ Mutex::Locker l(s->backoff_lock);
+ b = new Backoff(this, s, ++s->backoff_seq, begin, end);
+ auto& ls = s->backoffs[begin];
+ if (ls.empty()) {
+ ++s->backoff_count;
+ }
+ assert(s->backoff_count == (int)s->backoffs.size());
+ ls.insert(b);
+ backoffs[begin].insert(b);
+ dout(10) << __func__ << " session " << s << " added " << *b << dendl;
+ }
+ con->send_message(
+ new MOSDBackoff(
+ CEPH_OSD_BACKOFF_OP_BLOCK,
+ b->id,
+ begin,
+ end,
+ get_osdmap()->get_epoch()));
+}
+
+void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
+{
+ dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
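+  // gather matching backoffs under backoff_lock first; Backoff::lock is
+  // ordered *outside* backoff_lock, so notification and unlinking happen
+  // below, after backoff_lock is released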
+ vector<BackoffRef> bv;
+ {
+ Mutex::Locker l(backoff_lock);
+ auto p = backoffs.lower_bound(begin);
+ while (p != backoffs.end()) {
+ int r = cmp_bitwise(p->first, end);
+ dout(20) << __func__ << " ? " << r << " " << p->first
+ << " " << p->second << dendl;
+ // note: must still examine begin=end=p->first case
+ if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) {
+ break;
+ }
+ dout(20) << __func__ << " checking " << p->first
+ << " " << p->second << dendl;
+ auto q = p->second.begin();
+ while (q != p->second.end()) {
+ dout(20) << __func__ << " checking " << *q << dendl;
+ int r = cmp_bitwise((*q)->begin, begin);
+ if (r == 0 || (r > 0 &&
+ cmp_bitwise((*q)->end, end) < 0)) {
+ bv.push_back(*q);
+ q = p->second.erase(q);
+ } else {
+ ++q;
+ }
+ }
+ if (p->second.empty()) {
+ p = backoffs.erase(p);
+ } else {
+ ++p;
+ }
+ }
+ }
+ for (auto b : bv) {
+ Mutex::Locker l(b->lock);
+ dout(10) << __func__ << " " << *b << dendl;
+ if (b->session) {
+ assert(b->pg == this);
+ ConnectionRef con = b->session->con;
+ if (con) { // OSD::ms_handle_reset clears s->con without a lock
+ con->send_message(
+ new MOSDBackoff(
+ CEPH_OSD_BACKOFF_OP_UNBLOCK,
+ b->id,
+ b->begin,
+ b->end,
+ get_osdmap()->get_epoch()));
+ }
+ if (b->is_new()) {
+ b->state = Backoff::STATE_DELETING;
+ } else {
+ b->session->rm_backoff(b);
+ b->session.reset();
+ }
+ b->pg.reset();
+ }
+ }
+}
+
+void PG::clear_backoffs()
+{
+ dout(10) << __func__ << " " << dendl;
+ map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> ls;
+ {
+ Mutex::Locker l(backoff_lock);
+ ls.swap(backoffs);
+ }
+ for (auto& p : ls) {
+ for (auto& b : p.second) {
+ Mutex::Locker l(b->lock);
+ dout(10) << __func__ << " " << *b << dendl;
+ if (b->session) {
+ assert(b->pg == this);
+ if (b->is_new()) {
+ b->state = Backoff::STATE_DELETING;
+ } else {
+ b->session->rm_backoff(b);
+ b->session.reset();
+ }
+ b->pg.reset();
+ }
+ }
+ }
+}
+
+// called by Session::clear_backoffs()
+void PG::rm_backoff(BackoffRef b)
+{
+ dout(10) << __func__ << " " << *b << dendl;
+ Mutex::Locker l(backoff_lock);
+ assert(b->lock.is_locked_by_me());
+ assert(b->pg == this);
+ auto p = backoffs.find(b->begin);
+ // may race with release_backoffs()
+ if (p != backoffs.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q);
+ if (p->second.empty()) {
+ backoffs.erase(p);
+ }
+ }
+ }
+}
+
void PG::clear_recovery_state()
{
dout(10) << "clear_recovery_state" << dendl;
#include "osd_types.h"
#include "include/xlist.h"
#include "SnapMapper.h"
+#include "Session.h"
#include "PGLog.h"
#include "OSDMap.h"
friend class C_OSD_RepModify_Commit;
+ // -- backoff --
+ Mutex backoff_lock; // orders inside Backoff::lock
+  map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> backoffs; ///< blocked [begin,end) ranges, keyed by begin
+
+ void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end);
+ void release_backoffs(const hobject_t& begin, const hobject_t& end);
+ void release_backoffs(const hobject_t& o) {
+ release_backoffs(o, o);
+ }
+ void clear_backoffs();
+
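+  /// block or release the entire key range this PG is responsible for
+  /// (from pgid.get_hobj_start() to pgid.get_hobj_end())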
+ void add_pg_backoff(SessionRef s) {
+ hobject_t begin = info.pgid.pgid.get_hobj_start();
+ hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+ add_backoff(s, begin, end);
+ }
+ void release_pg_backoffs() {
+ hobject_t begin = info.pgid.pgid.get_hobj_start();
+ hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+ release_backoffs(begin, end);
+ }
+
+ void rm_backoff(BackoffRef b);
// -- scrub --
struct Scrubber {
bool is_activating() const { return state_test(PG_STATE_ACTIVATING); }
bool is_peering() const { return state_test(PG_STATE_PEERING); }
bool is_down() const { return state_test(PG_STATE_DOWN); }
+ bool is_incomplete() const { return state_test(PG_STATE_INCOMPLETE); }
bool is_clean() const { return state_test(PG_STATE_CLEAN); }
bool is_degraded() const { return state_test(PG_STATE_DEGRADED); }
bool is_undersized() const { return state_test(PG_STATE_UNDERSIZED); }
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PG.h"
+#include "Session.h"
+
+#include "common/debug.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+
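+// Detach every Backoff linked from this Session: unlink each one from
+// its PG (if still linked) and drop the session ref. Called when the
+// client connection goes away, so no unblock messages are sent.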
+void Session::clear_backoffs()
+{
+ map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> ls;
+ {
+ Mutex::Locker l(backoff_lock);
+ ls.swap(backoffs);
+ backoff_count = 0;
+ }
+ for (auto& p : ls) {
+ for (auto& b : p.second) {
+ Mutex::Locker l(b->lock);
+ if (b->pg) {
+ assert(b->session == this);
+ assert(b->is_new() || b->is_acked());
+ b->pg->rm_backoff(b);
+ b->pg.reset();
+ b->session.reset();
+ } else if (b->session) {
+ assert(b->session == this);
+ assert(b->is_deleting());
+ b->session.reset();
+ }
+ }
+ }
+}
+
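+// The client has acked the BLOCK message for backoff `id`: mark the
+// matching backoffs ACKED, and finish removing any already marked
+// DELETING.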
+void Session::ack_backoff(
+ CephContext *cct,
+ uint64_t id,
+ const hobject_t& begin,
+ const hobject_t& end)
+{
+ Mutex::Locker l(backoff_lock);
+ // NOTE that ack may be for [a,c] but osd may now have [a,b) and
+ // [b,c) due to a PG split.
+ auto p = backoffs.lower_bound(begin);
+ while (p != backoffs.end()) {
+ // note: must still examine begin=end=p->first case
+ int r = cmp_bitwise(p->first, end);
+ if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) {
+ break;
+ }
+ auto q = p->second.begin();
+ while (q != p->second.end()) {
+ Backoff *b = (*q).get();
+ if (b->id == id) {
+ if (b->is_new()) {
+ b->state = Backoff::STATE_ACKED;
+ dout(20) << __func__ << " now " << *b << dendl;
+ } else if (b->is_deleting()) {
+ dout(20) << __func__ << " deleting " << *b << dendl;
+ q = p->second.erase(q);
+ continue;
+ }
+ }
+ ++q;
+ }
+ if (p->second.empty()) {
+ dout(20) << __func__ << " clearing bin " << p->first << dendl;
+ p = backoffs.erase(p);
+ --backoff_count;
+ } else {
+ ++p;
+ }
+ }
+ assert(backoff_count == (int)backoffs.size());
+}
#define CEPH_OSD_SESSION_H
#include "common/RefCountedObj.h"
+#include "common/Mutex.h"
+#include "include/Spinlock.h"
+#include "OSDCap.h"
+#include "Watch.h"
+#include "OSDMap.h"
+
+struct Session;
+typedef boost::intrusive_ptr<Session> SessionRef;
+struct Backoff;
+typedef boost::intrusive_ptr<Backoff> BackoffRef;
+class PG;
+typedef boost::intrusive_ptr<PG> PGRef;
+
+/*
+ * A Backoff represents one instance of either a whole PG or a single
+ * object (OID) being blocked at the client. It is refcounted and
+ * linked from the PG's backoffs map and from the owning client
+ * Session.
+ *
+ * The Backoff has a lock that protects its internal fields.
+ *
+ * The PG has a backoff_lock that protects its map of Backoffs.
+ * This lock is ordered *inside* Backoff::lock.
+ *
+ * The Session has a backoff_lock that protects its map of pg and
+ * oid backoffs. This lock is ordered *inside* both Backoff::lock
+ * *and* PG::backoff_lock.
+ *
+ * That is, the lock ordering (outermost first) is
+ *
+ * Backoff::lock
+ * PG::backoff_lock
+ * Session::backoff_lock
+ *
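+ * For example, PG::add_backoff() takes PG::backoff_lock and then the
+ * Session's backoff_lock nested inside it, while PG::release_backoffs()
+ * takes each Backoff::lock before calling Session::rm_backoff(), which
+ * takes the Session's backoff_lock.
+ *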
+ * When the Session goes away, we move its backoff map aside, then
+ * lock each of the Backoffs we previously referenced and clear their
+ * session ref. If the PG is still linked, we unlink it too.
+ *
+ * When the PG clears a backoff, it sends an unblock message if the
+ * session ref is still set, and then unlinks the session.
+ *
+ */
+
+struct Backoff : public RefCountedObject {
+ enum {
+ STATE_NEW = 1, ///< backoff in flight to client
+ STATE_ACKED = 2, ///< backoff acked
+ STATE_DELETING = 3 ///< backoff deleted, but un-acked
+ };
+ std::atomic_int state = {STATE_NEW};
+ uint64_t id = 0; ///< unique id (within the Session)
+
+ bool is_new() const {
+ return state.load() == STATE_NEW;
+ }
+ bool is_acked() const {
+ return state.load() == STATE_ACKED;
+ }
+ bool is_deleting() const {
+ return state.load() == STATE_DELETING;
+ }
+ const char *get_state_name() const {
+ switch (state.load()) {
+ case STATE_NEW: return "new";
+ case STATE_ACKED: return "acked";
+ case STATE_DELETING: return "deleting";
+ default: return "???";
+ }
+ }
+
+ Mutex lock;
+ // NOTE: the owning PG and session are either
+ // - *both* set, or
+ // - both null (teardown), or
+ // - only session is set (and state == DELETING)
+ PGRef pg; ///< owning pg
+ SessionRef session; ///< owning session
+  hobject_t begin, end;   ///< [begin,end) range to block; if begin == end, a single object
+
+ Backoff(PGRef pg, SessionRef s,
+ uint64_t i,
+ const hobject_t& b, const hobject_t& e)
+ : RefCountedObject(g_ceph_context, 0),
+ id(i),
+ lock("Backoff::lock"),
+ pg(pg),
+ session(s),
+ begin(b),
+ end(e) {}
+
+ friend ostream& operator<<(ostream& out, const Backoff& b) {
+ return out << "Backoff(" << &b << " " << b.id
+ << " " << b.get_state_name()
+ << " [" << b.begin << "," << b.end << ") "
+ << " session " << b.session
+ << " pg " << b.pg << ")";
+ }
+};
+
+
struct Session : public RefCountedObject {
EntityName entity_name;
Spinlock received_map_lock;
epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
+ /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
+ Mutex backoff_lock;
+  std::atomic_int backoff_count = {0};  ///< number of entries (bins) in the backoffs map
+  map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> backoffs; ///< blocked [begin,end) ranges, keyed by begin
+
+  std::atomic<uint64_t> backoff_seq = {0}; ///< generator for per-session unique backoff ids
+
explicit Session(CephContext *cct) :
RefCountedObject(cct),
auid(-1), con(0),
wstate(cct),
session_dispatch_lock("Session::session_dispatch_lock"),
- last_sent_epoch(0), received_map_epoch(0)
+ last_sent_epoch(0), received_map_epoch(0),
+ backoff_lock("Session::backoff_lock")
{}
void maybe_reset_osdmap() {
if (waiting_for_pg.empty()) {
osdmap.reset();
}
}
-};
-typedef boost::intrusive_ptr<Session> SessionRef;
+ void ack_backoff(
+ CephContext *cct,
+ uint64_t id,
+ const hobject_t& start,
+ const hobject_t& end);
+
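+  /// return a backoff that covers oid (i.e. oid falls in [begin,end),
+  /// or begin == end == oid), or nullptr if there is none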
+ Backoff *have_backoff(const hobject_t& oid) {
+ if (backoff_count.load()) {
+ Mutex::Locker l(backoff_lock);
+ assert(backoff_count == (int)backoffs.size());
+      auto p = backoffs.lower_bound(oid);
+      // lower_bound may land one bin past a range that still covers oid;
+      // step back a bin (guarding against dereferencing end())
+      if (p != backoffs.begin() &&
+          (p == backoffs.end() || cmp_bitwise(p->first, oid) > 0)) {
+        --p;
+      }
+ if (p != backoffs.end()) {
+ int r = cmp_bitwise(oid, p->first);
+ if (r == 0 || r > 0) {
+ for (auto& q : p->second) {
+ if (r == 0 || cmp_bitwise(oid, q->end) < 0) {
+ return &(*q);
+ }
+ }
+ }
+ }
+ }
+ return nullptr;
+ }
+
+ // called by PG::release_*_backoffs and PG::clear_backoffs()
+ void rm_backoff(BackoffRef b) {
+ Mutex::Locker l(backoff_lock);
+ assert(b->lock.is_locked_by_me());
+ assert(b->session == this);
+ auto p = backoffs.find(b->begin);
+ // may race with clear_backoffs()
+ if (p != backoffs.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q);
+ if (p->second.empty()) {
+ backoffs.erase(p);
+ --backoff_count;
+ }
+ }
+ }
+ assert(backoff_count == (int)backoffs.size());
+ }
+ void clear_backoffs();
+};
#endif