]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: dispatch OSDBackoff messages into PG workqueue
authorSage Weil <sage@redhat.com>
Fri, 10 Feb 2017 23:55:55 +0000 (18:55 -0500)
committerSage Weil <sage@redhat.com>
Fri, 10 Feb 2017 23:55:55 +0000 (18:55 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index d1191944b72f1e826e60762a584eee2e75360a64..46a712e87dce5e34720d24194341f4e7d2a4f5cf 100644 (file)
@@ -66,6 +66,7 @@
 #include "messages/MOSDMarkMeDown.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDBackoff.h"
 #include "messages/MOSDRepOp.h"
 #include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDSubOp.h"
@@ -6306,6 +6307,10 @@ epoch_t op_required_epoch(OpRequestRef op)
     MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
     return m->get_map_epoch();
   }
+  case CEPH_MSG_OSD_BACKOFF: {
+    MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+    return m->osd_epoch;
+  }
   case MSG_OSD_SUBOP:
     return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
   case MSG_OSD_REPOP:
@@ -6421,6 +6426,9 @@ bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap)
   case CEPH_MSG_OSD_OP:
     handle_op(op, osdmap);
     break;
+  case CEPH_MSG_OSD_BACKOFF:
+    handle_backoff(op, osdmap);
+    break;
     // for replication etc.
   case MSG_OSD_SUBOP:
     handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
@@ -8846,6 +8854,54 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
   }
 }
 
+void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
+{
+  MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+  Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+  dout(10) << __func__ << " " << *m << " session " << s << dendl;
+  assert(s);
+  s->put();
+
+  if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) {
+    dout(10) << __func__ << " unrecognized op, ignoring" << dendl;
+    return;
+  }
+
+  // map hobject range to PG(s)
+  bool queued = false;
+  hobject_t pos = m->begin;
+  do {
+    pg_t _pgid(pos.get_hash(), pos.pool);
+    if (osdmap->have_pg_pool(pos.pool)) {
+      _pgid = osdmap->raw_pg_to_pg(_pgid);
+    }
+    if (!osdmap->have_pg_pool(_pgid.pool())) {
+      // missing pool -- drop
+      return;
+    }
+    spg_t pgid;
+    if (osdmap->get_primary_shard(_pgid, &pgid)) {
+      dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl;
+      PGRef pg = get_pg_or_queue_for_pg(pgid, op, s);
+      if (pg) {
+       if (!queued) {
+         enqueue_op(pg, op);
+         queued = true;
+       } else {
+         // use a fresh OpRequest
+         m->get();     // take a ref for the new OpRequest
+         OpRequestRef newop(op_tracker.create_request<OpRequest, Message*>(m));
+         newop->mark_event("duplicated original op for another pg");
+         enqueue_op(pg, newop);
+       }
+      }
+    }
+    // advance
+    pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num());
+    dout(20) << __func__ << "  next pg " << pos << dendl;
+  } while (cmp_bitwise(pos, m->end) < 0);
+}
+
 template<typename T, int MSGTYPE>
 void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
 {
index e9f4bbcbe9c50f80fb3e93ee9defb42bf49e1a47..af5a847636bebbce65b620bea407ce9f907ed781 100644 (file)
@@ -2370,6 +2370,7 @@ protected:
   bool ms_can_fast_dispatch(Message *m) const {
     switch (m->get_type()) {
     case CEPH_MSG_OSD_OP:
+    case CEPH_MSG_OSD_BACKOFF:
     case MSG_OSD_SUBOP:
     case MSG_OSD_REPOP:
     case MSG_OSD_SUBOPREPLY:
@@ -2468,6 +2469,7 @@ private:
   void handle_scrub(struct MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
   void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
+  void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap);
 
   template <typename T, int MSGTYPE>
   void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);
index 18480d21b058cd08a539a20679d6305cbfe69db0..98d7559890f02ebee027dc12ac0ae94acc9f64aa 100644 (file)
@@ -30,6 +30,7 @@
 #include "common/perf_counters.h"
 
 #include "messages/MOSDOp.h"
+#include "messages/MOSDBackoff.h"
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
 #include "messages/MOSDPGTrim.h"
@@ -1598,6 +1599,26 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
     src_oloc.key = oid.name;
 }
 
+void PrimaryLogPG::handle_backoff(OpRequestRef& op)
+{
+  MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+  SessionRef session((Session *)m->get_connection()->get_priv());
+  if (!session)
+    return;  // drop it.
+  session->put();  // get_priv takes a ref, and so does the SessionRef
+  hobject_t begin = info.pgid.pgid.get_hobj_start();
+  hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+  if (cmp_bitwise(begin, m->begin) < 0) {
+    begin = m->begin;
+  }
+  if (cmp_bitwise(end, m->end) > 0) {
+    end = m->end;
+  }
+  dout(10) << __func__ << " backoff ack id " << m->id
+          << " [" << begin << "," << end << ")" << dendl;
+  session->ack_backoff(cct, m->id, begin, end);
+}
+
 void PrimaryLogPG::do_request(
   OpRequestRef& op,
   ThreadPool::TPHandle &handle)
index 8e3f19c64105ec6ed6e2f8d09d9b7b168056a0aa..a0b0ec3d013503672aa99f31c4a90557dfb03c92 100644 (file)
@@ -1369,6 +1369,8 @@ public:
     ThreadPool::TPHandle &handle) override;
   void do_backfill(OpRequestRef op) override;
 
+  void handle_backoff(OpRequestRef& op);
+
   OpContextUPtr trim_object(bool first, const hobject_t &coid);
   void snap_trimmer(epoch_t e) override;
   void kick_snap_trim() override;