From bbf952125e2f90a7e37f9922bbadae26b922d2ba Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sat, 17 Feb 2018 11:38:57 -0600 Subject: [PATCH] osd: notify mon when pending PGs are ready to merge When a PG is in the pending merge state it is >= pg_num_pending and < pg_num. When this happens quiesce IO, peer, wait for activate to commit, and then notify the mon that we are idle and safe to merge. Signed-off-by: Sage Weil --- doc/dev/placement-group.rst | 6 ++++ src/osd/OSD.cc | 67 +++++++++++++++++++++++++++++++++++++ src/osd/OSD.h | 14 ++++++++ src/osd/PG.cc | 28 ++++++++++++++-- src/osd/osd_types.cc | 26 ++++++++++++++ src/osd/osd_types.h | 4 ++- 6 files changed, 141 insertions(+), 4 deletions(-) diff --git a/doc/dev/placement-group.rst b/doc/dev/placement-group.rst index 3c067ea3fe6f3..c315e4b11a710 100644 --- a/doc/dev/placement-group.rst +++ b/doc/dev/placement-group.rst @@ -149,3 +149,9 @@ User-visible PG States *remapped* the PG is temporarily mapped to a different set of OSDs from what CRUSH specified + +*premerge* + the PG is in a quiesced-IO state due to an impending PG merge. That + happens when pg_num_pending < pg_num, and applies to the PGs with + pg_num_pending <= ps < pg_num as well as the corresponding peer PG + that it is merging with. diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 52b345aed10ad..36a6abc81fa3d 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -78,6 +78,7 @@ #include "messages/MOSDRepOpReply.h" #include "messages/MOSDBoot.h" #include "messages/MOSDPGTemp.h" +#include "messages/MOSDPGReadyToMerge.h" #include "messages/MOSDMap.h" #include "messages/MMonGetOSDMap.h" @@ -1634,6 +1635,70 @@ void OSDService::finish_pg_delete(PG *pg, unsigned old_pg_num) } } +// --- + +void OSDService::set_ready_to_merge_source(PG *pg) +{ + Mutex::Locker l(merge_lock); + dout(10) << __func__ << " " << pg->pg_id << dendl; + ready_to_merge_source.insert(pg->pg_id.pgid); + _send_ready_to_merge(); +} + +void OSDService::set_ready_to_merge_target(PG *pg) +{ + Mutex::Locker l(merge_lock); + dout(10) << __func__ << " " << pg->pg_id << dendl; + ready_to_merge_target.insert(pg->pg_id.pgid); + _send_ready_to_merge(); +} + +void OSDService::send_ready_to_merge() +{ + Mutex::Locker l(merge_lock); + _send_ready_to_merge(); +} + +void OSDService::_send_ready_to_merge() +{ + for (auto src : ready_to_merge_source) { + if (ready_to_merge_target.count(src.get_parent()) && + sent_ready_to_merge_source.count(src) == 0) { + monc->send_mon_message(new MOSDPGReadyToMerge(src, osdmap->get_epoch())); + sent_ready_to_merge_source.insert(src); + } + } +} + +void OSDService::clear_ready_to_merge(PG *pg) +{ + Mutex::Locker l(merge_lock); + ready_to_merge_source.erase(pg->pg_id.pgid); + ready_to_merge_target.erase(pg->pg_id.pgid); +} + +void OSDService::clear_sent_ready_to_merge() +{ + Mutex::Locker l(merge_lock); + sent_ready_to_merge_source.clear(); +} + +void OSDService::prune_sent_ready_to_merge(OSDMapRef& osdmap) +{ + Mutex::Locker l(merge_lock); + auto i = sent_ready_to_merge_source.begin(); + while (i != sent_ready_to_merge_source.end()) { + if (!osdmap->pg_exists(*i)) { + dout(10) << __func__ << " " << *i << dendl; + i = sent_ready_to_merge_source.erase(i); + } else { + ++i; + } + } +} + +// --- + void OSDService::_queue_for_recovery( std::pair p, uint64_t reserved_pushes) @@ -5166,7 +5231,9 @@ void OSD::ms_handle_connect(Connection *con) send_full_update(); send_alive(); service.requeue_pg_temp(); + service.clear_sent_ready_to_merge(); service.send_pg_temp(); + service.send_ready_to_merge(); requeue_failures(); send_failures(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c92b1bb8d7adf..4b766f56ad241 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -714,6 +714,20 @@ public: AsyncReserver local_reserver; AsyncReserver remote_reserver; + // -- pg merge -- + Mutex merge_lock = {"OSD::merge_lock"}; + set ready_to_merge_source; + set ready_to_merge_target; + set sent_ready_to_merge_source; + + void set_ready_to_merge_source(PG *pg); + void set_ready_to_merge_target(PG *pg); + void clear_ready_to_merge(PG *pg); + void send_ready_to_merge(); + void _send_ready_to_merge(); + void clear_sent_ready_to_merge(); + void prune_sent_ready_to_merge(OSDMapRef& osdmap); + // -- pg_temp -- private: Mutex pg_temp_lock; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index c912bf09b3757..5f7ea5573c399 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -5901,6 +5901,10 @@ void PG::start_peering_interval( unreg_next_scrub(); + if (is_primary()) { + osd->clear_ready_to_merge(this); + } + pg_shard_t old_acting_primary = get_primary(); pg_shard_t old_up_primary = up_primary; bool was_old_primary = is_primary(); @@ -6015,6 +6019,7 @@ void PG::start_peering_interval( // deactivate. state_clear(PG_STATE_ACTIVE); state_clear(PG_STATE_PEERED); + state_clear(PG_STATE_PREMERGE); state_clear(PG_STATE_DOWN); state_clear(PG_STATE_RECOVERY_WAIT); state_clear(PG_STATE_RECOVERY_TOOFULL); @@ -7857,8 +7862,20 @@ PG::RecoveryState::Clean::Clean(my_context ctx) if (pg->is_active()) { pg->mark_clean(); + } else if (pg->is_peered()) { + bool target; + if (pg->pool.info.is_pending_merge(pg->info.pgid.pgid, &target)) { + if (target) { + ldout(pg->cct, 10) << "ready to merge (target)" << dendl; + pg->osd->set_ready_to_merge_target(pg); + } else { + ldout(pg->cct, 10) << "ready to merge (source)" << dendl; + pg->osd->set_ready_to_merge_source(pg); + } + } } pg->state_clear(PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL); + pg->share_pg_info(); pg->publish_stats_to_osd(); pg->requeue_ops(pg->waiting_for_clean_to_primary_repair); @@ -8257,10 +8274,15 @@ boost::statechart::result PG::RecoveryState::Active::react(const AllReplicasActi pg->state_clear(PG_STATE_ACTIVATING); pg->state_clear(PG_STATE_CREATING); - if (pg->acting.size() >= pg->pool.info.min_size) { - pg->state_set(PG_STATE_ACTIVE); - } else { + pg->state_clear(PG_STATE_PREMERGE); + + if (pg->acting.size() < pg->pool.info.min_size) { pg->state_set(PG_STATE_PEERED); + } else if (pg->pool.info.is_pending_merge(pg->info.pgid.pgid, nullptr)) { + pg->state_set(PG_STATE_PEERED); + pg->state_set(PG_STATE_PREMERGE); + } else { + pg->state_set(PG_STATE_ACTIVE); } // info.last_epoch_started is set during activate() diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 1b88cc6ff772e..28257b03c6b99 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -863,6 +863,8 @@ std::string pg_state_string(uint64_t state) oss << "degraded+"; if (state & PG_STATE_REMAPPED) oss << "remapped+"; + if (state & PG_STATE_PREMERGE) + oss << "premerge+"; if (state & PG_STATE_SCRUBBING) oss << "scrubbing+"; if (state & PG_STATE_DEEP_SCRUB) @@ -912,6 +914,8 @@ boost::optional pg_string_state(const std::string& state) type = PG_STATE_RECOVERY_UNFOUND; else if (state == "backfill_unfound") type = PG_STATE_BACKFILL_UNFOUND; + else if (state == "premerge") + type = PG_STATE_PREMERGE; else if (state == "scrubbing") type = PG_STATE_SCRUBBING; else if (state == "degraded") @@ -1312,6 +1316,28 @@ unsigned pg_pool_t::get_pg_num_divisor(pg_t pgid) const return (pg_num_mask + 1) >> 1; // bigger bin (not yet split) } +bool pg_pool_t::is_pending_merge(pg_t pgid, bool *target) const +{ + if (pg_num_pending >= pg_num) { + return false; + } + if (pgid.ps() >= pg_num_pending && pgid.ps() < pg_num) { + if (target) { + *target = false; + } + return true; + } + for (unsigned ps = pg_num_pending; ps < pg_num; ++ps) { + if (pg_t(ps, pgid.pool()).get_parent() == pgid) { + if (target) { + *target = true; + } + return true; + } + } + return false; +} + /* * we have two snap modes: * - pool snaps diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 57b022cf6af2b..b2a558fb1d121 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1011,7 +1011,7 @@ inline ostream& operator<<(ostream& out, const osd_stat_t& s) { #define PG_STATE_DOWN (1ULL << 4) // a needed replica is down, PG offline #define PG_STATE_RECOVERY_UNFOUND (1ULL << 5) // recovery stopped due to unfound #define PG_STATE_BACKFILL_UNFOUND (1ULL << 6) // backfill stopped due to unfound -//#define PG_STATE_SPLITTING (1ULL << 7) // i am splitting +#define PG_STATE_PREMERGE (1ULL << 7) // i am prepare to merging #define PG_STATE_SCRUBBING (1ULL << 8) // scrubbing //#define PG_STATE_SCRUBQ (1ULL << 9) // queued for scrub #define PG_STATE_DEGRADED (1ULL << 10) // pg contains objects with reduced redundancy @@ -1566,6 +1566,8 @@ public: // pool size that it represents. unsigned get_pg_num_divisor(pg_t pgid) const; + bool is_pending_merge(pg_t pgid, bool *target) const; + void set_pg_num(int p) { pg_num = p; pg_num_pending = p; -- 2.39.5