From 32afacb32fbb9b926c144eafb137c484f908349f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 10 Feb 2017 18:55:55 -0500 Subject: [PATCH] osd: dispatch OSDBackoff messages into PG workqueue Signed-off-by: Sage Weil --- src/osd/OSD.cc | 56 +++++++++++++++++++++++++++++++++++++++++ src/osd/OSD.h | 2 ++ src/osd/PrimaryLogPG.cc | 21 ++++++++++++++++ src/osd/PrimaryLogPG.h | 2 ++ 4 files changed, 81 insertions(+) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index d1191944b72..46a712e87dc 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -66,6 +66,7 @@ #include "messages/MOSDMarkMeDown.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDBackoff.h" #include "messages/MOSDRepOp.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDSubOp.h" @@ -6306,6 +6307,10 @@ epoch_t op_required_epoch(OpRequestRef op) MOSDOp *m = static_cast(op->get_req()); return m->get_map_epoch(); } + case CEPH_MSG_OSD_BACKOFF: { + MOSDBackoff *m = static_cast(op->get_req()); + return m->osd_epoch; + } case MSG_OSD_SUBOP: return replica_op_required_epoch(op); case MSG_OSD_REPOP: @@ -6421,6 +6426,9 @@ bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap) case CEPH_MSG_OSD_OP: handle_op(op, osdmap); break; + case CEPH_MSG_OSD_BACKOFF: + handle_backoff(op, osdmap); + break; // for replication etc. case MSG_OSD_SUBOP: handle_replica_op(op, osdmap); @@ -8846,6 +8854,54 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) } } +void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap) +{ + MOSDBackoff *m = static_cast(op->get_req()); + Session *s = static_cast(m->get_connection()->get_priv()); + dout(10) << __func__ << " " << *m << " session " << s << dendl; + assert(s); + s->put(); + + if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) { + dout(10) << __func__ << " unrecognized op, ignoring" << dendl; + return; + } + + // map hobject range to PG(s) + bool queued = false; + hobject_t pos = m->begin; + do { + pg_t _pgid(pos.get_hash(), pos.pool); + if (osdmap->have_pg_pool(pos.pool)) { + _pgid = osdmap->raw_pg_to_pg(_pgid); + } + if (!osdmap->have_pg_pool(_pgid.pool())) { + // missing pool -- drop + return; + } + spg_t pgid; + if (osdmap->get_primary_shard(_pgid, &pgid)) { + dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl; + PGRef pg = get_pg_or_queue_for_pg(pgid, op, s); + if (pg) { + if (!queued) { + enqueue_op(pg, op); + queued = true; + } else { + // use a fresh OpRequest + m->get(); // take a ref for the new OpRequest + OpRequestRef newop(op_tracker.create_request(m)); + newop->mark_event("duplicated original op for another pg"); + enqueue_op(pg, newop); + } + } + } + // advance + pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num()); + dout(20) << __func__ << " next pg " << pos << dendl; + } while (cmp_bitwise(pos, m->end) < 0); +} + template void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index e9f4bbcbe9c..af5a847636b 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -2370,6 +2370,7 @@ protected: bool ms_can_fast_dispatch(Message *m) const { switch (m->get_type()) { case CEPH_MSG_OSD_OP: + case CEPH_MSG_OSD_BACKOFF: case MSG_OSD_SUBOP: case MSG_OSD_REPOP: case MSG_OSD_SUBOPREPLY: @@ -2468,6 +2469,7 @@ private: void handle_scrub(struct MOSDScrub *m); void handle_osd_ping(class MOSDPing *m); void handle_op(OpRequestRef& op, OSDMapRef& osdmap); + void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap); template void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap); diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 18480d21b05..98d7559890f 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -30,6 +30,7 @@ #include "common/perf_counters.h" #include "messages/MOSDOp.h" +#include "messages/MOSDBackoff.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" #include "messages/MOSDPGTrim.h" @@ -1598,6 +1599,26 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo src_oloc.key = oid.name; } +void PrimaryLogPG::handle_backoff(OpRequestRef& op) +{ + MOSDBackoff *m = static_cast(op->get_req()); + SessionRef session((Session *)m->get_connection()->get_priv()); + if (!session) + return; // drop it. + session->put(); // get_priv takes a ref, and so does the SessionRef + hobject_t begin = info.pgid.pgid.get_hobj_start(); + hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); + if (cmp_bitwise(begin, m->begin) < 0) { + begin = m->begin; + } + if (cmp_bitwise(end, m->end) > 0) { + end = m->end; + } + dout(10) << __func__ << " backoff ack id " << m->id + << " [" << begin << "," << end << ")" << dendl; + session->ack_backoff(cct, m->id, begin, end); +} + void PrimaryLogPG::do_request( OpRequestRef& op, ThreadPool::TPHandle &handle) diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 8e3f19c6410..a0b0ec3d013 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -1369,6 +1369,8 @@ public: ThreadPool::TPHandle &handle) override; void do_backfill(OpRequestRef op) override; + void handle_backoff(OpRequestRef& op); + OpContextUPtr trim_object(bool first, const hobject_t &coid); void snap_trimmer(epoch_t e) override; void kick_snap_trim() override; -- 2.39.5