]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add pg helpers for managing backoffs
authorSage Weil <sage@redhat.com>
Wed, 1 Feb 2017 22:27:20 +0000 (17:27 -0500)
committerSage Weil <sage@redhat.com>
Fri, 10 Feb 2017 22:59:50 +0000 (17:59 -0500)
Link Backoff into Session, PG.  Tear them down on session
reset.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/CMakeLists.txt
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/Session.cc [new file with mode: 0644]
src/osd/Session.h

index 681d9fd36edd726d4b750c69b96a9a8092e264ec..ecc944b182b7d24a9b93e626c57b5166b40eb494 100644 (file)
@@ -16,6 +16,7 @@ set(osd_srcs
   OSDCap.cc
   Watch.cc
   ClassHandler.cc
+  Session.cc
   SnapMapper.cc
   ScrubStore.cc
   osd_types.cc
index 5b57864467d15f572902e7e8810b188c50893e49..d1191944b72f1e826e60762a584eee2e75360a64 100644 (file)
@@ -4945,6 +4945,9 @@ bool OSD::ms_handle_reset(Connection *con)
     return false;
   session->wstate.reset(con);
   session->con.reset(NULL);  // break con <-> session ref cycle
+  // note that we break session->con *before* the session_handle_reset
+  // cleanup below.  this avoids a race between us and
+  // PG::add_backoff, split_backoff, etc.
   session_handle_reset(session);
   session->put();
   return true;
index 858fe753fa559ac3f3144f97cdc639303e2048f4..e9f4bbcbe9c50f80fb3e93ee9defb42bf49e1a47 100644 (file)
@@ -1532,6 +1532,8 @@ private:
       clear_session_waiting_on_pg(session, i->first);
     }    
 
+    session->clear_backoffs();
+
     /* Messages have connection refs, we need to clear the
      * connection->session->message->connection
      * cycles which result.
index c5ad57b9a0634df71758e2e037ed78edddc9dab5..aaccffb521cb4be3947b07e028fe7ad3d13ff92c 100644 (file)
@@ -47,6 +47,7 @@
 #include "messages/MOSDECSubOpReadReply.h"
 #include "messages/MOSDPGUpdateLogMissing.h"
 #include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MOSDBackoff.h"
 
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDRepOp.h"
@@ -250,6 +251,7 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   pg_stats_publish_valid(false),
   osr(osd->osr_registry.lookup_or_create(p, (stringify(p)))),
   finish_sync_event(NULL),
+  backoff_lock("PG::backoff_lock"),
   scrub_after_recovery(false),
   active_pushes(0),
   recovery_state(this),
@@ -2314,6 +2316,147 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
   dirty_big_info = true;
 }
 
+void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
+{
+  ConnectionRef con = s->con;
+  if (!con)   // OSD::ms_handle_reset clears s->con without a lock
+    return;
+  Backoff *b = s->have_backoff(begin);
+  if (b) {
+    derr << __func__ << " already have backoff for " << s << " begin " << begin
+        << " " << *b << dendl;
+    ceph_abort();
+  }
+  Mutex::Locker l(backoff_lock);
+  {
+    Mutex::Locker l(s->backoff_lock);
+    b = new Backoff(this, s, ++s->backoff_seq, begin, end);
+    auto& ls = s->backoffs[begin];
+    if (ls.empty()) {
+      ++s->backoff_count;
+    }
+    assert(s->backoff_count == (int)s->backoffs.size());
+    ls.insert(b);
+    backoffs[begin].insert(b);
+    dout(10) << __func__ << " session " << s << " added " << *b << dendl;
+  }
+  con->send_message(
+    new MOSDBackoff(
+      CEPH_OSD_BACKOFF_OP_BLOCK,
+      b->id,
+      begin,
+      end,
+      get_osdmap()->get_epoch()));
+}
+
+void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
+{
+  dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
+  vector<BackoffRef> bv;
+  {
+    Mutex::Locker l(backoff_lock);
+    auto p = backoffs.lower_bound(begin);
+    while (p != backoffs.end()) {
+      int r = cmp_bitwise(p->first, end);
+      dout(20) << __func__ << " ? " << r << " " << p->first
+              << " " << p->second << dendl;
+      // note: must still examine begin=end=p->first case
+      if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) {
+       break;
+      }
+      dout(20) << __func__ << " checking " << p->first
+              << " " << p->second << dendl;
+      auto q = p->second.begin();
+      while (q != p->second.end()) {
+       dout(20) << __func__ << " checking  " << *q << dendl;
+       int r = cmp_bitwise((*q)->begin, begin);
+       if (r == 0 || (r > 0 &&
+                      cmp_bitwise((*q)->end, end) < 0)) {
+         bv.push_back(*q);
+         q = p->second.erase(q);
+       } else {
+         ++q;
+       }
+      }
+      if (p->second.empty()) {
+       p = backoffs.erase(p);
+      } else {
+       ++p;
+      }
+    }
+  }
+  for (auto b : bv) {
+    Mutex::Locker l(b->lock);
+    dout(10) << __func__ << " " << *b << dendl;
+    if (b->session) {
+      assert(b->pg == this);
+      ConnectionRef con = b->session->con;
+      if (con) {   // OSD::ms_handle_reset clears s->con without a lock
+       con->send_message(
+         new MOSDBackoff(
+           CEPH_OSD_BACKOFF_OP_UNBLOCK,
+           b->id,
+           b->begin,
+           b->end,
+           get_osdmap()->get_epoch()));
+      }
+      if (b->is_new()) {
+       b->state = Backoff::STATE_DELETING;
+      } else {
+       b->session->rm_backoff(b);
+       b->session.reset();
+      }
+      b->pg.reset();
+    }
+  }
+}
+
+void PG::clear_backoffs()
+{
+  dout(10) << __func__ << " " << dendl;
+  map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> ls;
+  {
+    Mutex::Locker l(backoff_lock);
+    ls.swap(backoffs);
+  }
+  for (auto& p : ls) {
+    for (auto& b : p.second) {
+      Mutex::Locker l(b->lock);
+      dout(10) << __func__ << " " << *b << dendl;
+      if (b->session) {
+       assert(b->pg == this);
+       if (b->is_new()) {
+         b->state = Backoff::STATE_DELETING;
+       } else {
+         b->session->rm_backoff(b);
+         b->session.reset();
+       }
+       b->pg.reset();
+      }
+    }
+  }
+}
+
+// called by Session::clear_backoffs()
+void PG::rm_backoff(BackoffRef b)
+{
+  dout(10) << __func__ << " " << *b << dendl;
+  Mutex::Locker l(backoff_lock);
+  assert(b->lock.is_locked_by_me());
+  assert(b->pg == this);
+  auto p = backoffs.find(b->begin);
+  // may race with release_backoffs()
+  if (p != backoffs.end()) {
+    auto q = p->second.find(b);
+    if (q != p->second.end()) {
+      p->second.erase(q);
+      if (p->second.empty()) {
+       backoffs.erase(p);
+      }
+    }
+  }
+}
+
 void PG::clear_recovery_state() 
 {
   dout(10) << "clear_recovery_state" << dendl;
index 260aa3cf2a7e49ab3dc1f0ed873a8b14a6cbb28e..216195ff8af4be1026435ad09a3f3e4a7c81e02a 100644 (file)
@@ -34,6 +34,7 @@
 #include "osd_types.h"
 #include "include/xlist.h"
 #include "SnapMapper.h"
+#include "Session.h"
 
 #include "PGLog.h"
 #include "OSDMap.h"
@@ -1083,6 +1084,29 @@ public:
 
   friend class C_OSD_RepModify_Commit;
 
+  // -- backoff --
+  Mutex backoff_lock;  // orders inside Backoff::lock
+  map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> backoffs;
+
+  void add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end);
+  void release_backoffs(const hobject_t& begin, const hobject_t& end);
+  void release_backoffs(const hobject_t& o) {
+    release_backoffs(o, o);
+  }
+  void clear_backoffs();
+
+  void add_pg_backoff(SessionRef s) {
+    hobject_t begin = info.pgid.pgid.get_hobj_start();
+    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+    add_backoff(s, begin, end);
+  }
+  void release_pg_backoffs() {
+    hobject_t begin = info.pgid.pgid.get_hobj_start();
+    hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+    release_backoffs(begin, end);
+  }
+
+  void rm_backoff(BackoffRef b);
 
   // -- scrub --
   struct Scrubber {
@@ -2167,6 +2191,7 @@ public:
   bool       is_activating() const { return state_test(PG_STATE_ACTIVATING); }
   bool       is_peering() const { return state_test(PG_STATE_PEERING); }
   bool       is_down() const { return state_test(PG_STATE_DOWN); }
+  bool       is_incomplete() const { return state_test(PG_STATE_INCOMPLETE); }
   bool       is_clean() const { return state_test(PG_STATE_CLEAN); }
   bool       is_degraded() const { return state_test(PG_STATE_DEGRADED); }
   bool       is_undersized() const { return state_test(PG_STATE_UNDERSIZED); }
diff --git a/src/osd/Session.cc b/src/osd/Session.cc
new file mode 100644 (file)
index 0000000..8f93ea9
--- /dev/null
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "PG.h"
+#include "Session.h"
+
+#include "common/debug.h"
+
+#define dout_context cct
+#define dout_subsys ceph_subsys_osd
+
+void Session::clear_backoffs()
+{
+  map<hobject_t,set<BackoffRef>,hobject_t::BitwiseComparator> ls;
+  {
+    Mutex::Locker l(backoff_lock);
+    ls.swap(backoffs);
+    backoff_count = 0;
+  }
+  for (auto& p : ls) {
+    for (auto& b : p.second) {
+      Mutex::Locker l(b->lock);
+      if (b->pg) {
+       assert(b->session == this);
+       assert(b->is_new() || b->is_acked());
+       b->pg->rm_backoff(b);
+       b->pg.reset();
+       b->session.reset();
+      } else if (b->session) {
+       assert(b->session == this);
+       assert(b->is_deleting());
+       b->session.reset();
+      }
+    }
+  }
+}
+
+void Session::ack_backoff(
+  CephContext *cct,
+  uint64_t id,
+  const hobject_t& begin,
+  const hobject_t& end)
+{
+  Mutex::Locker l(backoff_lock);
+  // NOTE that ack may be for [a,c] but osd may now have [a,b) and
+  // [b,c) due to a PG split.
+  auto p = backoffs.lower_bound(begin);
+  while (p != backoffs.end()) {
+    // note: must still examine begin=end=p->first case
+    int r = cmp_bitwise(p->first, end);
+    if (r > 0 || (r == 0 && cmp_bitwise(begin, end) < 0)) {
+      break;
+    }
+    auto q = p->second.begin();
+    while (q != p->second.end()) {
+      Backoff *b = (*q).get();
+      if (b->id == id) {
+       if (b->is_new()) {
+         b->state = Backoff::STATE_ACKED;
+         dout(20) << __func__ << " now " << *b << dendl;
+       } else if (b->is_deleting()) {
+         dout(20) << __func__ << " deleting " << *b << dendl;
+         q = p->second.erase(q);
+         continue;
+       }
+      }
+      ++q;
+    }
+    if (p->second.empty()) {
+      dout(20) << __func__ << " clearing bin " << p->first << dendl;
+      p = backoffs.erase(p);
+      --backoff_count;
+    } else {
+      ++p;
+    }
+  }
+  assert(backoff_count == (int)backoffs.size());
+}
index aa862c299a62ca8d8cecd5c52808b0139fd38cae..63a73c30073e6b8acf7da4bb6dbc188f562676cd 100644 (file)
 #define CEPH_OSD_SESSION_H
 
 #include "common/RefCountedObj.h"
+#include "common/Mutex.h"
+#include "include/Spinlock.h"
+#include "OSDCap.h"
+#include "Watch.h"
+#include "OSDMap.h"
+
+struct Session;
+typedef boost::intrusive_ptr<Session> SessionRef;
+struct Backoff;
+typedef boost::intrusive_ptr<Backoff> BackoffRef;
+class PG;
+typedef boost::intrusive_ptr<PG> PGRef;
+
+/*
+ * A Backoff represents one instance of either a PG or an OID
+ * being plugged at the client.  It's refcounted and linked from
+ * the PG {pg_oid}_backoffs map and from the client Session
+ * object.
+ *
+ * The Backoff has a lock that protects it's internal fields.
+ *
+ * The PG has a backoff_lock that protects it's maps to Backoffs.
+ * This lock is *inside* of Backoff::lock.
+ *
+ * The Session has a backoff_lock that protects it's map of pg and
+ * oid backoffs.  This lock is *inside* the Backoff::lock *and*
+ * PG::backoff_lock.
+ *
+ * That's
+ *
+ *    Backoff::lock
+ *       PG::backoff_lock
+ *         Session::backoff_lock
+ *
+ * When the Session goes away, we move our backoff lists aside,
+ * then we lock each of the Backoffs we
+ * previously referenced and clear the Session* pointer.  If the PG
+ * is still linked, we unlink it, too.
+ *
+ * When the PG clears the backoff, it will send an unblock message
+ * if the Session* is still non-null, and unlink the session.
+ *
+ */
+
+struct Backoff : public RefCountedObject {
+  enum {
+    STATE_NEW = 1,     ///< backoff in flight to client
+    STATE_ACKED = 2,   ///< backoff acked
+    STATE_DELETING = 3 ///< backoff deleted, but un-acked
+  };
+  std::atomic_int state = {STATE_NEW};
+  uint64_t id = 0;     ///< unique id (within the Session)
+
+  bool is_new() const {
+    return state.load() == STATE_NEW;
+  }
+  bool is_acked() const {
+    return state.load() == STATE_ACKED;
+  }
+  bool is_deleting() const {
+    return state.load() == STATE_DELETING;
+  }
+  const char *get_state_name() const {
+    switch (state.load()) {
+    case STATE_NEW: return "new";
+    case STATE_ACKED: return "acked";
+    case STATE_DELETING: return "deleting";
+    default: return "???";
+    }
+  }
+
+  Mutex lock;
+  // NOTE: the owning PG and session are either
+  //   - *both* set, or
+  //   - both null (teardown), or
+  //   - only session is set (and state == DELETING)
+  PGRef pg;             ///< owning pg
+  SessionRef session;   ///< owning session
+  hobject_t begin, end; ///< [) range to block, unless ==, then single obj
+
+  Backoff(PGRef pg, SessionRef s,
+         uint64_t i,
+         const hobject_t& b, const hobject_t& e)
+    : RefCountedObject(g_ceph_context, 0),
+      id(i),
+      lock("Backoff::lock"),
+      pg(pg),
+      session(s),
+      begin(b),
+      end(e) {}
+
+  friend ostream& operator<<(ostream& out, const Backoff& b) {
+    return out << "Backoff(" << &b << " " << b.id
+              << " " << b.get_state_name()
+              << " [" << b.begin << "," << b.end << ") "
+              << " session " << b.session
+              << " pg " << b.pg << ")";
+  }
+};
+
+
 
 struct Session : public RefCountedObject {
   EntityName entity_name;
@@ -35,20 +136,76 @@ struct Session : public RefCountedObject {
   Spinlock received_map_lock;
   epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
 
+  /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
+  Mutex backoff_lock;
+  std::atomic_int backoff_count= {0};  ///< simple count of backoffs
+  map<hobject_t,set<BackoffRef>, hobject_t::BitwiseComparator> backoffs;
+
+  std::atomic<uint64_t> backoff_seq = {0};
+
   explicit Session(CephContext *cct) :
     RefCountedObject(cct),
     auid(-1), con(0),
     wstate(cct),
     session_dispatch_lock("Session::session_dispatch_lock"),
-    last_sent_epoch(0), received_map_epoch(0)
+    last_sent_epoch(0), received_map_epoch(0),
+    backoff_lock("Session::backoff_lock")
     {}
   void maybe_reset_osdmap() {
     if (waiting_for_pg.empty()) {
       osdmap.reset();
     }
   }
-};
 
-typedef boost::intrusive_ptr<Session> SessionRef;
+  void ack_backoff(
+    CephContext *cct,
+    uint64_t id,
+    const hobject_t& start,
+    const hobject_t& end);
+
+  Backoff *have_backoff(const hobject_t& oid) {
+    if (backoff_count.load()) {
+      Mutex::Locker l(backoff_lock);
+      assert(backoff_count == (int)backoffs.size());
+      auto p = backoffs.lower_bound(oid);
+      if (p != backoffs.begin() &&
+         cmp_bitwise(p->first, oid) > 0) {
+       --p;
+      }
+      if (p != backoffs.end()) {
+       int r = cmp_bitwise(oid, p->first);
+       if (r == 0 || r > 0) {
+         for (auto& q : p->second) {
+           if (r == 0 || cmp_bitwise(oid, q->end) < 0) {
+             return &(*q);
+           }
+         }
+       }
+      }
+    }
+    return nullptr;
+  }
+
+  // called by PG::release_*_backoffs and PG::clear_backoffs()
+  void rm_backoff(BackoffRef b) {
+    Mutex::Locker l(backoff_lock);
+    assert(b->lock.is_locked_by_me());
+    assert(b->session == this);
+    auto p = backoffs.find(b->begin);
+    // may race with clear_backoffs()
+    if (p != backoffs.end()) {
+      auto q = p->second.find(b);
+      if (q != p->second.end()) {
+       p->second.erase(q);
+       if (p->second.empty()) {
+         backoffs.erase(p);
+         --backoff_count;
+       }
+      }
+    }
+    assert(backoff_count == (int)backoffs.size());
+  }
+  void clear_backoffs();
+};
 
 #endif