From a888ebe4b499342c41eccbc1544c755da8ab1a18 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 21 Aug 2019 22:11:34 -0500 Subject: [PATCH] osd/PeeringState: add message_map to PeeringCtx/BufferedRecoveryMessages Have a generalized map of messages that are queued to be sent out. Send them when we dispatch the PeeringCtx. Signed-off-by: Sage Weil --- src/osd/OSD.cc | 19 +++++++++++++++++++ src/osd/PeeringState.cc | 4 +++- src/osd/PeeringState.h | 15 +++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index cb14e8d1b37eb..579a232ea78ab 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -9265,6 +9265,25 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap, do_notifies(ctx.notify_list, curmap); do_queries(ctx.query_map, curmap); do_infos(ctx.info_map, curmap); + + for (auto& [osd, ls] : ctx.message_map) { + if (!curmap->is_up(osd)) { + dout(20) << __func__ << " skipping down osd." << osd << dendl; + continue; + } + ConnectionRef con = service.get_con_osd_cluster( + osd, curmap->get_epoch()); + if (!con) { + dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" + << dendl; + continue; + } + service.maybe_share_map(con.get(), curmap); + for (auto m : ls) { + con->send_message2(m); + } + ls.clear(); + } } if ((!ctx.transaction.empty() || ctx.transaction.has_contexts()) && pg) { int tr = store->queue_transaction( diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 77c6b20d0b3a1..cd4c385ac6608 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -20,11 +20,13 @@ BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx) : query_map(std::move(ctx.query_map)), info_map(std::move(ctx.info_map)), - notify_list(std::move(ctx.notify_list)) + notify_list(std::move(ctx.notify_list)), + message_map(std::move(ctx.message_map)) { ctx.query_map.clear(); ctx.info_map.clear(); ctx.notify_list.clear(); + ctx.message_map.clear(); } void PGPool::update(CephContext *cct, OSDMapRef map) diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index ed95e82c08aa5..56a3835ccff8f 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -54,6 +54,7 @@ struct BufferedRecoveryMessages { map > query_map; map> info_map; map> notify_list; + map> message_map; BufferedRecoveryMessages() = default; BufferedRecoveryMessages(PeeringCtx &); @@ -75,6 +76,14 @@ struct BufferedRecoveryMessages { ovec.reserve(ovec.size() + nlist.size()); ovec.insert(ovec.end(), nlist.begin(), nlist.end()); } + for (auto &[target, ls] : m.message_map) { + auto &ovec = message_map[target]; + // put buffered messages in front + ls.reserve(ls.size() + ovec.size()); + ls.insert(ls.end(), ovec.begin(), ovec.end()); + ovec.clear(); + ovec.swap(ls); + } } }; @@ -373,6 +382,7 @@ private: map > &query_map; map> &info_map; map> ¬ify_list; + map> &message_map; ObjectStore::Transaction &transaction; HBHandle * const handle = nullptr; @@ -380,6 +390,7 @@ private: query_map(wrapped.query_map), info_map(wrapped.info_map), notify_list(wrapped.notify_list), + message_map(wrapped.message_map), transaction(wrapped.transaction), handle(wrapped.handle) {} @@ -387,11 +398,15 @@ private: : query_map(buf.query_map), info_map(buf.info_map), notify_list(buf.notify_list), + message_map(buf.message_map), transaction(wrapped.transaction), handle(wrapped.handle) {} PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default; + void send_osd_message(int target, Message *m) { + message_map[target].push_back(m); + } void send_notify(pg_shard_t to, const pg_notify_t &n) { notify_list[to.osd].emplace_back(n); } -- 2.39.5