Seastore currently cannot merge collections that live on different reactor
shards. When a cross-shard merge is detected, first tell the monitor that the
source PG is not ready (MOSDPGReadyToMerge with ready=false) so that the unsafe
pg_num decrement is never proposed, then send MOSDPGStopMerge so that the
monitor clamps pg_num_target and permanently disables further shrinking of the pool.
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
if (pg->pgid.is_merge_source(old_pg_num,
new_pg_num,
&parent)) {
+ parent.is_split(new_pg_num, old_pg_num, &merge_sources);
+ if (!co_await shard_services.seastore_merge_shards_ok(
+ parent, merge_sources)) {
+ co_return merge_result_t{};
+ }
co_return merge_result_t{merge_role_t::Source, parent};
} else if (pg->pgid.is_merge_target(old_pg_num,
new_pg_num)) {
DEBUG("Target PG {} identified. Waiting for sources...", pg->get_pgid());
pg->pgid.is_split(new_pg_num, old_pg_num, &merge_sources);
+
+ if (!co_await shard_services.seastore_merge_shards_ok(
+ pg->get_pgid(), merge_sources)) {
+ co_return merge_result_t{};
+ }
// Block until all source PGs (potentially from other shards) arrive
// on this PG's rendezvous
auto sources = co_await pg->collect_merge_sources(merge_sources.size());
clear_ready_to_merge();
}
- // Wake any coroutine parked in collect_merge_sources() so shutdown
- // doesn't hang waiting for sources that will never arrive.
- merge_rendezvous.arrivals.broken();
- merge_rendezvous.sources.clear();
+ reset_merge_rendezvous();
cancel_local_background_io_reservation();
cancel_remote_recovery_reservation();
co_return sources;
}
+// Abandon the current merge rendezvous: release any waiter and start over
+// with a default-constructed rendezvous state.
+void PG::reset_merge_rendezvous()
+{
+  auto& rendezvous = merge_rendezvous;
+  // Break the semaphore first so a coroutine parked in
+  // collect_merge_sources() is woken before the state is replaced.
+  rendezvous.arrivals.broken();
+  // A fresh rendezvous lets the next merge attempt start at zero signals.
+  rendezvous = merge_rendezvous_t{};
+}
+
void PG::merge_from(
merge_source_map_t& sources,
PeeringCtx &rctx,
// return them and clear the rendezvous state. Returns an empty map if
// reset_merge_rendezvous() breaks the wait (e.g. PG stop or merge cancel).
seastar::future<merge_source_map_t> collect_merge_sources(std::size_t n);
- void merge_from(
- merge_source_map_t& sources,
- PeeringCtx &rctx,
- unsigned split_bits,
- const pg_merge_meta_t& last_pg_merge_meta);
// Drop in-flight handoffs and reset the semaphore. Call on PG stop or
// after Seastore cross-shard cancel so a failed try cannot leave stale
#include "messages/MOSDPGCreated.h"
#include "messages/MOSDPGTemp.h"
#include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
});
}
+// Returns true when the local "osd_objectstore" option is set to "seastore".
+bool ShardServices::is_seastore_objectstore()
+{
+  const auto& conf = crimson::common::local_conf();
+  const auto backend = conf.get_val<std::string>("osd_objectstore");
+  return backend == "seastore";
+}
+
+// Resets the merge rendezvous of `target` on the shard that get_pg_mapping()
+// reports for it. Does nothing if that shard's pg_map has no such PG.
+seastar::future<> ShardServices::reset_target_merge_rendezvous(spg_t target)
+{
+  // Runs on the owning shard; `target` is captured by value because the
+  // callable is handed to another core.
+  auto reset_on_owner = [target](ShardServices& owner_svc) {
+    auto pg = owner_svc.local_state.pg_map.get_pg(target);
+    if (pg) {
+      pg->reset_merge_rendezvous();
+    }
+  };
+  const core_id_t owner_core = co_await get_pg_mapping(target);
+  co_await container().invoke_on(owner_core, std::move(reset_on_owner));
+}
+
+// Forwards a "stop PG merge" request for the pool of `source_pgid` to the
+// OSD singleton, which is the component that talks to the monitor.
+seastar::future<> ShardServices::send_stop_pool_merge(pg_t source_pgid)
+{
+  const auto pool_id = source_pgid.pool();
+  co_await with_singleton(
+    [pool_id, source_pgid](OSDSingletonState& osd_singleton) {
+      return osd_singleton.send_stop_pool_pg_merge(pool_id, source_pgid);
+    });
+}
+
+// Cancels a merge attempt whose source and target PGs sit on different
+// shards.
+//
+// target                  - merge target PG
+// source_pgid             - the source reported to the monitor as not ready
+// all_sources             - optional full sibling set; when given, the
+//                           ready-to-merge state of every member is cleared
+//                           instead of only that of source_pgid
+// reset_target_rendezvous - also reset the target PG's merge rendezvous
+seastar::future<> ShardServices::abort_seastore_cross_shard_merge(
+  spg_t target,
+  pg_t source_pgid,
+  const std::set<spg_t>* all_sources,
+  bool reset_target_rendezvous)
+{
+  // Step 1: forget any local "ready to merge" bookkeeping for the PGs involved.
+  if (!all_sources) {
+    co_await clear_ready_to_merge(source_pgid);
+  } else {
+    for (const auto& sibling : *all_sources) {
+      co_await clear_ready_to_merge(sibling.pgid);
+    }
+  }
+  co_await clear_ready_to_merge(target.pgid);
+
+  // Step 2: make the monitor back off. "Not ready" keeps an already in-flight
+  // pg_num decrement from being committed for this source; "stop merge" is
+  // the permanent policy change for the pool.
+  co_await set_not_ready_to_merge_source(source_pgid);
+  co_await send_stop_pool_merge(source_pgid);
+
+  // Step 3 (optional): release whoever is waiting on the target's rendezvous.
+  if (!reset_target_rendezvous) {
+    co_return;
+  }
+  co_await reset_target_merge_rendezvous(target);
+}
+
+
+// Decides whether a merge into `target` may proceed.
+//
+// Resolves to true when the object store is not Seastore, or when every PG in
+// `merge_sources` maps to the same core as `target`. Otherwise the merge is
+// aborted as a side effect (see abort_seastore_cross_shard_merge()) for the
+// first source found on a different core, and the future resolves to false.
+seastar::future<bool> ShardServices::seastore_merge_shards_ok(
+  spg_t target,
+  const std::set<spg_t>& merge_sources)
+{
+  LOG_PREFIX(ShardServices::seastore_merge_shards_ok);
+  if (!is_seastore_objectstore()) {
+    co_return true;
+  }
+  const core_id_t target_core = co_await get_pg_mapping(target);
+  for (const auto& src : merge_sources) {
+    const core_id_t src_core = co_await get_pg_mapping(src);
+    if (src_core == target_core) {
+      continue;
+    }
+    // One offending source is enough: abort for the whole sibling set.
+    DEBUG("seastore: target {} has a cross-shard merge source {}; stop pool merge",
+          target, src.pgid);
+    co_await abort_seastore_cross_shard_merge(
+      target, src.pgid, &merge_sources, true);
+    co_return false;
+  }
+  co_return true;
+}
+
Ref<PG> PerShardState::get_pg(spg_t pgid)
{
assert_core();
sent_ready_to_merge_source.clear();
}
+// Asks the monitor (at most once per pool, tracked in
+// pools_merge_stopped_reported) to stop PG merge for `pool`.
+//
+// Nothing is sent when the pool is absent from the current osdmap, is not a
+// crimson pool, already has FLAG_CRIMSON_ALLOW_PG_MERGE cleared, or has been
+// reported before.
+seastar::future<> OSDSingletonState::send_stop_pool_pg_merge(int64_t pool, pg_t pgid)
+{
+  LOG_PREFIX(OSDSingletonState::send_stop_pool_pg_merge);
+  const pg_pool_t *pool_info = osdmap->get_pg_pool(pool);
+  const bool crimson_pool = pool_info && pool_info->is_crimson();
+  if (!crimson_pool) {
+    co_return;
+  }
+  const bool merge_allowed =
+    pool_info->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE);
+  // Record the pool before the first suspension point: concurrent callers
+  // would otherwise all pass the check and each send MOSDPGStopMerge while
+  // this coroutine is yielded in send_message().
+  const bool newly_claimed = pools_merge_stopped_reported.insert(pool).second;
+  if (!merge_allowed || !newly_claimed) {
+    co_return;
+  }
+  DEBUG("seastore: asking monitor to stop PG merge for pool {} (pg {})",
+        pool, pgid);
+  co_await monc.send_message(crimson::make_message<MOSDPGStopMerge>(
+    pool, pgid, MOSDPGStopMerge::REASON_CROSS_SHARD, osdmap->get_epoch()));
+}
+
+// Drops stale entries from pools_merge_stopped_reported.
+//
+// send_stop_pool_pg_merge() records a pool after a single MOSDPGStopMerge so
+// the monitor is not contacted for every cross-shard source/target pair. An
+// entry is kept while the current osdmap still shows merge disabled for the
+// pool, and forgotten once the pool is gone or crimson_allow_pg_merge is set
+// again.
+void OSDSingletonState::prune_pools_merge_stopped_reported()
+{
+  std::erase_if(pools_merge_stopped_reported, [this](int64_t pool_id) {
+    const pg_pool_t *info = osdmap->get_pg_pool(pool_id);
+    return !info || info->has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE);
+  });
+}
+
void OSDSingletonState::prune_sent_ready_to_merge()
{
LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge);
+ prune_pools_merge_stopped_reported();
auto source = sent_ready_to_merge_source.begin();
while (source != sent_ready_to_merge_source.end()) {
if (!osdmap->pg_exists(*source)) {
std::set<pg_t> not_ready_to_merge_source;
std::map<pg_t,pg_t> not_ready_to_merge_target;
std::set<pg_t> sent_ready_to_merge_source;
+ std::set<int64_t> pools_merge_stopped_reported;
seastar::future<> set_ready_to_merge_source(pg_t pgid,
eversion_t version);
seastar::future<> set_ready_to_merge_target(pg_t pgid,
seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source);
void clear_ready_to_merge(pg_t pgid);
seastar::future<> send_ready_to_merge();
+ seastar::future<> send_stop_pool_pg_merge(int64_t pool, pg_t pgid);
void clear_sent_ready_to_merge();
+ void prune_pools_merge_stopped_reported();
void prune_sent_ready_to_merge();
};
// local_shared_foreign_ptr it received.
seastar::future<> register_merge_source(spg_t target, spg_t source);
+ static bool is_seastore_objectstore();
+
+ // True if every merge source is co-located on the target PG's shard
+ // (Seastore only). Uses the full sibling set so one source cannot commit
+ // while another would abort.
+ seastar::future<bool> seastore_merge_shards_ok(
+ spg_t target, const std::set<spg_t>& merge_sources);
+
FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)
FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)
invoke_context_on_core(seastar::this_shard_id(), on_reserved));
}
+private:
+ seastar::future<> reset_target_merge_rendezvous(spg_t target);
+ seastar::future<> send_stop_pool_merge(pg_t source_pgid);
+ seastar::future<> abort_seastore_cross_shard_merge(
+ spg_t target,
+ pg_t source_pgid,
+ const std::set<spg_t>* all_sources,
+ bool reset_target_rendezvous);
+
#undef FORWARD_CONST
#undef FORWARD
#undef FORWARD_TO_OSD_SINGLETON
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#pragma once
+
+#include "osd/osd_types.h"
+#include "messages/PaxosServiceMessage.h"
+
+/// OSD -> mon: permanently stop PG merge shrink for a Crimson pool.
+///
+/// Payload layout after the paxos header: pool, pgid, reason.
+class MOSDPGStopMerge : public PaxosServiceMessage {
+public:
+  /// The OSD detected a merge whose source and target are on different shards.
+  static constexpr uint8_t REASON_CROSS_SHARD = 1;
+
+  /// Pool for which merging should be stopped (-1 until set or decoded).
+  int64_t pool = -1;
+  /// PG that triggered the request.
+  pg_t pgid;
+  /// One of the REASON_* constants.
+  uint8_t reason = REASON_CROSS_SHARD;
+
+  /// Default constructor, used when a received message is about to be decoded.
+  MOSDPGStopMerge()
+    : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, 0}
+  {}
+  /// @param pool   pool to stop merging for
+  /// @param pgid   PG that triggered the request
+  /// @param reason one of the REASON_* constants
+  /// @param epoch  passed through to PaxosServiceMessage as its version
+  MOSDPGStopMerge(int64_t pool, pg_t pgid, uint8_t reason, epoch_t epoch)
+    : PaxosServiceMessage{MSG_OSD_PG_STOP_MERGE, epoch},
+      pool(pool),
+      pgid(pgid),
+      reason(reason)
+  {}
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    paxos_encode();
+    encode(pool, payload);
+    encode(pgid, payload);
+    encode(reason, payload);
+  }
+  void decode_payload() override {
+    using ceph::decode;
+    auto p = payload.cbegin();
+    paxos_decode(p);
+    decode(pool, p);
+    decode(pgid, p);
+    decode(reason, p);
+  }
+  std::string_view get_type_name() const override { return "osd_pg_stop_merge"; }
+  // Marked override so that a signature mismatch with the base class's
+  // print() becomes a compile error instead of a silently unused overload.
+  void print(std::ostream &out) const override {
+    out << get_type_name()
+        << "(pool " << pool
+        << " pg " << pgid
+        // widen so the reason is printed as a number, not as a character
+        << " reason " << static_cast<unsigned>(reason)
+        << " v" << version << ")";
+  }
+private:
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
case MSG_REMOVE_SNAPS:
case MSG_MON_GET_PURGED_SNAPS:
case MSG_OSD_PG_READY_TO_MERGE:
+ case MSG_OSD_PG_STOP_MERGE:
paxos_service[PAXOS_OSDMAP]->dispatch(op);
return;
#include "messages/MOSDPGCreated.h"
#include "messages/MOSDPGTemp.h"
#include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
#include "messages/MMonCommand.h"
#include "messages/MRemoveSnaps.h"
#include "messages/MRoute.h"
return preprocess_pg_created(op);
case MSG_OSD_PG_READY_TO_MERGE:
return preprocess_pg_ready_to_merge(op);
+ case MSG_OSD_PG_STOP_MERGE:
+ return preprocess_pg_stop_merge(op);
case MSG_OSD_PGTEMP:
return preprocess_pgtemp(op);
case MSG_OSD_BEACON:
return prepare_pgtemp(op);
case MSG_OSD_PG_READY_TO_MERGE:
return prepare_pg_ready_to_merge(op);
+ case MSG_OSD_PG_STOP_MERGE:
+ return prepare_pg_stop_merge(op);
case MSG_OSD_BEACON:
return prepare_beacon(op);
p.last_change = pending_inc.epoch;
} else {
// back off the merge attempt!
+ if (!m->ready && !p.has_flag(pg_pool_t::FLAG_CRIMSON)) {
+ mon.clog->warn() << "osd." << m->get_orig_source().num()
+ << " reported pg " << m->pgid
+ << " not ready to merge; backing off pg_num decrease"
+ << " for pool '"
+ << osdmap.get_pool_name(m->pgid.pool()) << "'";
+ dout(1) << __func__ << " osd." << m->get_orig_source().num()
+ << " pg " << m->pgid << " not ready to merge, backing off"
+ << dendl;
+ }
p.set_pg_num_pending(p.get_pg_num());
}
return true;
}
+// Read-only screening of an MOSDPGStopMerge request.
+//
+// Returns true when the message has been fully dealt with here (it is dropped
+// without a reply), false when it should go on to prepare_pg_stop_merge().
+bool OSDMonitor::preprocess_pg_stop_merge(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDPGStopMerge>();
+  dout(10) << __func__ << " " << *m << dendl;
+
+  // Common exit for every rejected request: no reply, message consumed.
+  const auto drop_request = [this, &op] {
+    mon.no_reply(op);
+    return true;
+  };
+
+  auto session = op->get_session();
+  if (!session) {
+    dout(10) << __func__ << ": no monitor session!" << dendl;
+    return drop_request();
+  }
+  if (!session->is_capable("osd", MON_CAP_X)) {
+    derr << __func__ << " received from entity "
+         << "with insufficient privileges " << session->caps << dendl;
+    return drop_request();
+  }
+  if (!osdmap.get_pg_pool(m->pool)) {
+    derr << __func__ << " pool " << m->pool << " dne" << dendl;
+    return drop_request();
+  }
+
+  // Passed all checks; the update itself happens in the prepare step.
+  return false;
+}
+
+// Applies an MOSDPGStopMerge request to the pending osdmap increment.
+//
+// For a crimson pool that still allows (or is in the middle of) a PG merge,
+// any in-flight shrink is cancelled, the pg_num/pgp_num targets are clamped to
+// the current values and FLAG_CRIMSON_ALLOW_PG_MERGE is cleared.
+//
+// Returns true when pending_inc was modified and must be proposed, false when
+// there is nothing to propose. In both cases the sender is answered with a
+// map once a proposal has finished.
+bool OSDMonitor::prepare_pg_stop_merge(MonOpRequestRef op)
+{
+  op->mark_osdmon_event(__func__);
+  auto m = op->get_req<MOSDPGStopMerge>();
+  dout(10) << __func__ << " " << *m << dendl;
+
+  // Work on the pending copy of the pool if this proposal already touches it,
+  // so earlier changes in the same round are neither lost nor repeated.
+  pg_pool_t p;
+  if (auto pending = pending_inc.new_pools.find(m->pool);
+      pending != pending_inc.new_pools.end()) {
+    p = pending->second;
+  } else {
+    p = *osdmap.get_pg_pool(m->pool);
+  }
+
+  if (!p.is_crimson()) {
+    dout(10) << __func__ << " pool " << m->pool << " is not crimson, ignoring"
+             << dendl;
+    wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version));
+    return false;
+  }
+
+  // Several OSDs can report the same pool. Once the pool is fully stopped,
+  // applying the request again would only repeat the cluster-log warning,
+  // create another osdmap epoch and force clients to resend ops, so treat
+  // duplicates as a no-op.
+  const bool already_stopped =
+    !p.has_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE) &&
+    p.get_pg_num_pending() == p.get_pg_num() &&
+    p.get_pg_num_target() >= p.get_pg_num() &&
+    p.get_pgp_num_target() >= p.get_pgp_num();
+  if (already_stopped) {
+    dout(10) << __func__ << " pool " << m->pool
+             << " already has PG merge stopped, ignoring" << dendl;
+    wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version));
+    return false;
+  }
+
+  const char *reason_str = "unknown";
+  if (m->reason == MOSDPGStopMerge::REASON_CROSS_SHARD) {
+    reason_str = "cross-shard PG merge not supported on Seastore";
+  }
+
+  mon.clog->warn() << "osd." << m->get_orig_source().num()
+                   << " stopped PG merge for pool '"
+                   << osdmap.get_pool_name(m->pool)
+                   << "' (" << reason_str << ", source pg " << m->pgid
+                   << "); no further pg_num decrease will be attempted";
+
+  // Cancel any in-flight shrink; keep current pg_num as-is.
+  p.set_pg_num_pending(p.get_pg_num());
+  if (p.get_pg_num_target() < p.get_pg_num()) {
+    p.set_pg_num_target(p.get_pg_num());
+  }
+  if (p.get_pgp_num_target() < p.get_pgp_num()) {
+    p.set_pgp_num_target(p.get_pgp_num());
+  }
+  p.unset_flag(pg_pool_t::FLAG_CRIMSON_ALLOW_PG_MERGE);
+  p.last_pg_merge_meta = pg_merge_meta_t{};
+  p.last_change = pending_inc.epoch;
+  p.last_force_op_resend_prenautilus = pending_inc.epoch;
+
+  pending_inc.new_pools[m->pool] = p;
+  wait_for_finished_proposal(op, new C_ReplyMap(this, op, m->version));
+  return true;
+}
// -------------
// pg_temp changes
bool preprocess_pg_ready_to_merge(MonOpRequestRef op);
bool prepare_pg_ready_to_merge(MonOpRequestRef op);
+ bool preprocess_pg_stop_merge(MonOpRequestRef op);
+ bool prepare_pg_stop_merge(MonOpRequestRef op);
+
int _check_remove_pool(int64_t pool_id, const pg_pool_t &pool, std::ostream *ss);
bool _check_become_tier(
int64_t tier_pool_id, const pg_pool_t *tier_pool,
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"
#include "messages/MOSDPGReadyToMerge.h"
+#include "messages/MOSDPGStopMerge.h"
#include "messages/MRemoveSnaps.h"
case MSG_OSD_PG_READY_TO_MERGE:
m = make_message<MOSDPGReadyToMerge>();
break;
+ case MSG_OSD_PG_STOP_MERGE:
+ m = make_message<MOSDPGStopMerge>();
+ break;
case MSG_OSD_EC_WRITE:
m = make_message<MOSDECSubOpWrite>();
break;
#define MSG_OSD_SCRUB2 121
#define MSG_OSD_PG_READY_TO_MERGE 122
+#define MSG_OSD_PG_STOP_MERGE 124
#define MSG_OSD_PG_LEASE 133
#define MSG_OSD_PG_LEASE_ACK 134
class MOSDPGPushReply;
class MOSDPGQuery;
class MOSDPGReadyToMerge;
+class MOSDPGStopMerge;
class MOSDPGRecoveryDelete;
class MOSDPGRecoveryDeleteReply;
class MOSDPGRemove;