From: Sage Weil Date: Thu, 22 Aug 2019 03:27:45 +0000 (-0500) Subject: osd/PeeringState: send notifies via message_map (not notify_list) X-Git-Tag: v15.1.0~1584^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=49a0573aa80bc679c8fed79e4070dad7f105dfc6;p=ceph.git osd/PeeringState: send notifies via message_map (not notify_list) Queue up a separate message for each notify, and drop the old notify_list plumbing. Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 579a232ea78a..1cbbdd619917 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -9262,7 +9262,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, } else if (!is_active()) { dout(20) << __func__ << " not active" << dendl; } else { - do_notifies(ctx.notify_list, curmap); do_queries(ctx.query_map, curmap); do_infos(ctx.info_map, curmap); @@ -9294,36 +9293,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, } } -/** do_notifies - * Send an MOSDPGNotify to a primary, with a list of PGs that I have - * content for, and they are primary for. - */ - -void OSD::do_notifies( - map>& notify_list, - OSDMapRef curmap) -{ - for (auto& [osd, notifies] : notify_list) { - if (!curmap->is_up(osd)) { - dout(20) << __func__ << " skipping down osd." << osd << dendl; - continue; - } - ConnectionRef con = service.get_con_osd_cluster( - osd, curmap->get_epoch()); - if (!con) { - dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl; - continue; - } - service.maybe_share_map(con.get(), curmap); - dout(7) << __func__ << " osd." << osd - << " on " << notifies.size() << " PGs" << dendl; - MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(), - std::move(notifies)); - con->send_message(m); - } -} - - /** do_queries * send out pending queries for info | summaries */ diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c7a2750cbfee..4fa7f7db8611 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1916,8 +1916,6 @@ protected: void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, ThreadPool::TPHandle *handle = NULL); void discard_context(PeeringCtx &ctx); - void do_notifies(map>& notify_list, - OSDMapRef map); void do_queries(map >& query_map, OSDMapRef map); void do_infos(map>& info_map, diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index cd4c385ac660..1df535093ac4 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -13,6 +13,7 @@ #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGTrim.h" #include "messages/MOSDPGLog.h" +#include "messages/MOSDPGNotify.h" #define dout_context cct #define dout_subsys ceph_subsys_osd @@ -20,15 +21,22 @@ BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx) : query_map(std::move(ctx.query_map)), info_map(std::move(ctx.info_map)), - notify_list(std::move(ctx.notify_list)), message_map(std::move(ctx.message_map)) { ctx.query_map.clear(); ctx.info_map.clear(); - ctx.notify_list.clear(); ctx.message_map.clear(); } +void PeeringCtxWrapper::send_notify(int to, const pg_notify_t &n) +{ + vector notifies; + notifies.push_back(n); + message_map[to].push_back( + new MOSDPGNotify(n.epoch_sent, std::move(notifies)) + ); +} + void PGPool::update(CephContext *cct, OSDMapRef map) { const pg_pool_t *pi = map->get_pg_pool(id); @@ -2566,7 +2574,7 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx) update_history(query.query.history); fulfill_info(query.from, query.query, notify_info); rctx.send_notify( - notify_info.first, + notify_info.first.osd, pg_notify_t( notify_info.first.shard, pg_whoami.shard, query.query_epoch, @@ -4104,7 +4112,7 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), @@ -5690,7 +5698,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), @@ -5810,7 +5818,7 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&) DECLARE_LOCALS; if (ps->should_send_notify() && ps->get_primary().osd >= 0) { context< PeeringMachine >().send_notify( - ps->get_primary(), + ps->get_primary().osd, pg_notify_t( ps->get_primary().shard, ps->pg_whoami.shard, ps->get_osdmap_epoch(), diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 56a3835ccff8..f60f9d66bf5e 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -53,7 +53,6 @@ class PeeringCtx; struct BufferedRecoveryMessages { map > query_map; map> info_map; - map> notify_list; map> message_map; BufferedRecoveryMessages() = default; @@ -71,11 +70,6 @@ struct BufferedRecoveryMessages { ovec.reserve(ovec.size() + ivec.size()); ovec.insert(ovec.end(), ivec.begin(), ivec.end()); } - for (auto &[target, nlist] : m.notify_list) { - auto &ovec = notify_list[target]; - ovec.reserve(ovec.size() + nlist.size()); - ovec.insert(ovec.end(), nlist.begin(), nlist.end()); - } for (auto &[target, ls] : m.message_map) { auto &ovec = message_map[target]; // put buffered messages in front @@ -197,6 +191,40 @@ struct PeeringCtx : BufferedRecoveryMessages { } }; +/** + * Wraps PeeringCtx to hide the difference between buffering messages to + * be sent after flush or immediately. + */ +struct PeeringCtxWrapper { + utime_t start_time; + map > &query_map; + map> &info_map; + map> &message_map; + ObjectStore::Transaction &transaction; + HBHandle * const handle = nullptr; + + PeeringCtxWrapper(PeeringCtx &wrapped) : + query_map(wrapped.query_map), + info_map(wrapped.info_map), + message_map(wrapped.message_map), + transaction(wrapped.transaction), + handle(wrapped.handle) {} + + PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped) + : query_map(buf.query_map), + info_map(buf.info_map), + message_map(buf.message_map), + transaction(wrapped.transaction), + handle(wrapped.handle) {} + + PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default; + + void send_osd_message(int target, Message *m) { + message_map[target].push_back(m); + } + void send_notify(int to, const pg_notify_t &n); +}; + /* Encapsulates PG recovery process */ class PeeringState : public MissingLoc::MappingInfo { public: @@ -372,47 +400,6 @@ public: virtual ~PeeringListener() {} }; -private: - /** - * Wraps PeeringCtx to hide the difference between buffering messages to - * be sent after flush or immediately. - */ - struct PeeringCtxWrapper { - utime_t start_time; - map > &query_map; - map> &info_map; - map> ¬ify_list; - map> &message_map; - ObjectStore::Transaction &transaction; - HBHandle * const handle = nullptr; - - PeeringCtxWrapper(PeeringCtx &wrapped) : - query_map(wrapped.query_map), - info_map(wrapped.info_map), - notify_list(wrapped.notify_list), - message_map(wrapped.message_map), - transaction(wrapped.transaction), - handle(wrapped.handle) {} - - PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped) - : query_map(buf.query_map), - info_map(buf.info_map), - notify_list(buf.notify_list), - message_map(buf.message_map), - transaction(wrapped.transaction), - handle(wrapped.handle) {} - - PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default; - - void send_osd_message(int target, Message *m) { - message_map[target].push_back(m); - } - void send_notify(pg_shard_t to, const pg_notify_t &n) { - notify_list[to.osd].emplace_back(n); - } - }; -public: - struct QueryState : boost::statechart::event< QueryState > { Formatter *f; explicit QueryState(Formatter *f) : f(f) {} @@ -594,7 +581,7 @@ public: return *(state->rctx); } - void send_notify(pg_shard_t to, const pg_notify_t &n) { + void send_notify(int to, const pg_notify_t &n) { ceph_assert(state->rctx); state->rctx->send_notify(to, n); }