_send_ready_to_merge();
}
+void OSDService::set_not_ready_to_merge_source(pg_t pgid)
+{
+ Mutex::Locker l(merge_lock);
+ dout(10) << __func__ << " " << pgid << dendl;
+ not_ready_to_merge_source.insert(pgid);
+ _send_ready_to_merge();
+}
+
void OSDService::send_ready_to_merge()
{
Mutex::Locker l(merge_lock);
void OSDService::_send_ready_to_merge()
{
+ for (auto src : not_ready_to_merge_source) {
+ if (sent_ready_to_merge_source.count(src) == 0) {
+ monc->send_mon_message(new MOSDPGReadyToMerge(
+ src,
+ 0,
+ false,
+ osdmap->get_epoch()));
+ sent_ready_to_merge_source.insert(src);
+ }
+ }
for (auto src : ready_to_merge_source) {
+ if (not_ready_to_merge_source.count(src)) {
+ continue;
+ }
auto p = ready_to_merge_target.find(src.get_parent());
if (p != ready_to_merge_target.end() &&
sent_ready_to_merge_source.count(src) == 0) {
dout(10) << __func__ << " " << pg->pg_id << dendl;
ready_to_merge_source.erase(pg->pg_id.pgid);
ready_to_merge_target.erase(pg->pg_id.pgid);
+ not_ready_to_merge_source.erase(pg->pg_id.pgid);
}
void OSDService::clear_sent_ready_to_merge()
Mutex merge_lock = {"OSD::merge_lock"};
set<pg_t> ready_to_merge_source;
map<pg_t,epoch_t> ready_to_merge_target; // pg -> last_epoch_clean
+ set<pg_t> not_ready_to_merge_source;
set<pg_t> sent_ready_to_merge_source;
void set_ready_to_merge_source(PG *pg);
void set_ready_to_merge_target(PG *pg, epoch_t last_epoch_clean);
+ void set_not_ready_to_merge_source(pg_t pgid);
void clear_ready_to_merge(PG *pg);
void send_ready_to_merge();
void _send_ready_to_merge();
if (pool.info.is_pending_merge(info.pgid.pgid, &target)) {
if (target) {
ldout(cct, 10) << "ready to merge (target)" << dendl;
- osd->set_ready_to_merge_target(this,
- info.history.last_epoch_clean);
+ osd->set_ready_to_merge_target(this, info.history.last_epoch_clean);
} else {
ldout(cct, 10) << "ready to merge (source)" << dendl;
osd->set_ready_to_merge_source(this);
}
}
} else {
-#warning we should back off the merge!
+ ldout(cct, 10) << "not clean, not ready to merge" << dendl;
+ // we should have notified OSD in Active state entry point
}
}
boost::statechart::result PG::RecoveryState::Active::react(const AllReplicasActivated &evt)
{
PG *pg = context< RecoveryMachine >().pg;
+ pg_t pgid = pg->info.pgid.pgid;
+
all_replicas_activated = true;
pg->state_clear(PG_STATE_ACTIVATING);
pg->state_clear(PG_STATE_CREATING);
pg->state_clear(PG_STATE_PREMERGE);
- if (pg->acting.size() < pg->pool.info.min_size) {
- pg->state_set(PG_STATE_PEERED);
- } else if (pg->pool.info.is_pending_merge(pg->info.pgid.pgid, nullptr)) {
+ bool merge_target;
+ if (pg->pool.info.is_pending_merge(pgid, &merge_target)) {
pg->state_set(PG_STATE_PEERED);
pg->state_set(PG_STATE_PREMERGE);
+
+ if (pg->actingset.size() != pg->get_osdmap()->get_pg_size(pgid)) {
+ if (merge_target) {
+ pg_t src = pgid;
+ src.set_ps(pg->pool.info.get_pg_num_pending());
+ assert(src.get_parent() == pgid);
+ pg->osd->set_not_ready_to_merge_source(src);
+ } else {
+ pg->osd->set_not_ready_to_merge_source(pgid);
+ }
+ }
+ } else if (pg->acting.size() < pg->pool.info.min_size) {
+ pg->state_set(PG_STATE_PEERED);
} else {
pg->state_set(PG_STATE_ACTIVE);
}
// info.last_epoch_started is set during activate()
if (pg->info.history.last_epoch_started == 0) {
- pg->osd->send_pg_created(pg->info.pgid.pgid);
+ pg->osd->send_pg_created(pgid);
}
+
pg->info.history.last_epoch_started = pg->info.last_epoch_started;
pg->info.history.last_interval_started = pg->info.last_interval_started;
pg->dirty_info = true;