From 56c542ccb37f489e653b4adfe1c5346a8b07d9c7 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 22 Aug 2019 10:40:29 -0500 Subject: [PATCH] osd/PeeringState: send infos via message_map (not info_map) Queue up separate message for each. Note that even though we are currenting sending this via a pg_notify_t, not all of those fields are needed or consumed on the other end (notably, PastIntervals is not used by handle_fast_pg_info). Only pass the fields we need to send_info() so that we can improve the transport to be less weird later. Signed-off-by: Sage Weil --- src/osd/PeeringState.cc | 47 ++++++++++++++++++++++++++--------------- src/osd/PeeringState.h | 9 +++++++- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 94e9c3b8e8c37..feb4d1fc513f5 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -48,6 +48,23 @@ void BufferedRecoveryMessages::send_query( ); } +void BufferedRecoveryMessages::send_info( + int to, + spg_t to_spgid, + epoch_t min_epoch, + epoch_t cur_epoch, + const pg_info_t &info) +{ + MOSDPGInfo *m = new MOSDPGInfo(cur_epoch); + m->pg_list.push_back( + pg_notify_t( + to_spgid.shard, + info.pgid.shard, + min_epoch, cur_epoch, + info, PastIntervals())); + message_map[to].push_back(m); +} + void PGPool::update(CephContext *cct, OSDMapRef map) { const pg_pool_t *pi = map->get_pg_pool(id); @@ -1974,12 +1991,12 @@ bool PeeringState::search_for_missing( oinfo.last_update != eversion_t()) { pg_info_t tinfo(oinfo); tinfo.pgid.shard = pg_whoami.shard; - ctx.info_map[from.osd].emplace_back( - pg_notify_t( - from.shard, pg_whoami.shard, - get_osdmap_epoch(), - get_osdmap_epoch(), - tinfo, past_intervals)); + ctx.send_info( + from.osd, + spg_t(info.pgid.pgid, from.shard), + get_osdmap_epoch(), // fixme: use lower epoch? + get_osdmap_epoch(), + tinfo); } return found_missing; } @@ -2081,7 +2098,6 @@ void PeeringState::build_might_have_unfound() void PeeringState::activate( ObjectStore::Transaction& t, epoch_t activation_epoch, - map> *activator_map, PeeringCtxWrapper &ctx) { ceph_assert(!is_peered()); @@ -2198,16 +2214,15 @@ void PeeringState::activate( << " from (" << pi.log_tail << "," << pi.last_update << "] " << pi.last_backfill << " to " << info.last_update; - if (!pi.is_empty() && activator_map) { + if (!pi.is_empty()) { psdout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl; - (*activator_map)[peer.osd].emplace_back( - pg_notify_t( - peer.shard, pg_whoami.shard, - get_osdmap_epoch(), - get_osdmap_epoch(), - info, - past_intervals)); + ctx.send_info( + peer.osd, + spg_t(info.pgid.pgid, peer.shard), + get_osdmap_epoch(), // fixme: use lower epoch? + get_osdmap_epoch(), + info); } else { psdout(10) << "activate peer osd." << peer << " is up to date, but sending pg_log anyway" << dendl; @@ -5264,7 +5279,6 @@ PeeringState::Active::Active(my_context ctx) ps->start_flush(context< PeeringMachine >().get_cur_transaction()); ps->activate(context< PeeringMachine >().get_cur_transaction(), ps->get_osdmap_epoch(), - &context< PeeringMachine >().get_info_map(), context< PeeringMachine >().get_recovery_ctx()); // everyone has to commit/ack before we are truly active @@ -5633,7 +5647,6 @@ boost::statechart::result PeeringState::ReplicaActive::react( ps->activate( context< PeeringMachine >().get_cur_transaction(), actevt.activation_epoch, - NULL, context< PeeringMachine >().get_recovery_ctx()); psdout(10) << "Activate Finished" << dendl; return discard_event(); diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index c42e690fcb2f8..ca66589917779 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -78,6 +78,9 @@ struct BufferedRecoveryMessages { } void send_notify(int to, const pg_notify_t &n); void send_query(int to, spg_t spgid, const pg_query_t &q); + void send_info(int to, spg_t to_spgid, + epoch_t min_epoch, epoch_t cur_epoch, + const pg_info_t &info); }; struct HeartbeatStamps : public RefCountedObject { @@ -224,6 +227,11 @@ struct PeeringCtxWrapper { void send_query(int to, spg_t spgid, const pg_query_t &q) { msgs.send_query(to, spgid, q); } + void send_info(int to, spg_t to_spgid, + epoch_t min_epoch, epoch_t cur_epoch, + const pg_info_t &info) { + msgs.send_info(to, to_spgid, min_epoch, cur_epoch, info); + } }; /* Encapsulates PG recovery process */ @@ -1511,7 +1519,6 @@ public: void activate( ObjectStore::Transaction& t, epoch_t activation_epoch, - map> *activator_map, PeeringCtxWrapper &ctx); void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead); -- 2.39.5