#include "messages/MOSDOp.h"
#include "messages/MOSDPGNotify.h"
// #include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
if (c == finish_sync_event) {
dout(10) << "_finish_recovery" << dendl;
finish_sync_event = 0;
- purge_strays();
+ recovery_state.purge_strays();
publish_stats_to_osd();
clear_recovery_state();
}
-
-void PG::purge_strays()
-{
- if (is_premerge()) {
- dout(10) << "purge_strays " << stray_set << " but premerge, doing nothing"
- << dendl;
- return;
- }
- if (cct->_conf.get_val<bool>("osd_debug_no_purge_strays")) {
- return;
- }
- dout(10) << "purge_strays " << stray_set << dendl;
-
- bool removed = false;
- for (set<pg_shard_t>::iterator p = stray_set.begin();
- p != stray_set.end();
- ++p) {
- ceph_assert(!is_acting_recovery_backfill(*p));
- if (get_osdmap()->is_up(p->osd)) {
- dout(10) << "sending PGRemove to osd." << *p << dendl;
- vector<spg_t> to_remove;
- to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
- MOSDPGRemove *m = new MOSDPGRemove(
- get_osdmap_epoch(),
- to_remove);
- osd->send_message_osd_cluster(p->osd, m, get_osdmap_epoch());
- } else {
- dout(10) << "not sending PGRemove to down osd." << *p << dendl;
- }
- peer_missing.erase(*p);
- peer_info.erase(*p);
- peer_purged.insert(*p);
- removed = true;
- }
-
- // if we removed anyone, update peers (which include peer_info)
- if (removed)
- recovery_state.update_heartbeat_peers();
-
- stray_set.clear();
-
- // clear _requested maps; we may have to peer() again if we discover
- // (more) stray content
- peer_log_requested.clear();
- peer_missing_requested.clear();
-}
-
void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
{
std::lock_guard l(heartbeat_peer_lock);
}
}
+void PG::send_cluster_message(int target, Message *m, epoch_t epoch)
+{
+ osd->send_message_osd_cluster(target, m, epoch);
+}
+
void PG::clear_probe_targets()
{
std::lock_guard l(heartbeat_peer_lock);
}
}
-void PG::update_history(const pg_history_t& new_history)
-{
- unreg_next_scrub();
- if (info.history.merge(new_history)) {
- dout(20) << __func__ << " advanced history from " << new_history << dendl;
- dirty_info = true;
- if (info.history.last_epoch_clean >= info.history.same_interval_since) {
- dout(20) << __func__ << " clearing past_intervals" << dendl;
- past_intervals.clear();
- dirty_big_info = true;
- }
- }
- reg_next_scrub();
-}
-
void PG::fulfill_info(
pg_shard_t from, const pg_query_t &query,
pair<pg_shard_t, pg_info_t> ¬ify_info)
void finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t);
void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
- void reg_next_scrub();
- void unreg_next_scrub();
+
+ void reg_next_scrub() override;
+ void unreg_next_scrub() override;
bool is_forced_recovery_or_backfill() const {
return get_state() & (PG_STATE_FORCED_RECOVERY | PG_STATE_FORCED_BACKFILL);
public:
bool dne() { return info.dne(); }
+ virtual void send_cluster_message(int osd, Message *m, epoch_t epoch);
+
protected:
bool need_up_thru; ///< Flag indicating that this pg needs up through published
void start_flush(ObjectStore::Transaction *t);
void set_last_peering_reset();
- void update_history(const pg_history_t& history);
+ void update_history(const pg_history_t& history) {
+ recovery_state.update_history(history);
+ }
void fulfill_info(pg_shard_t from, const pg_query_t &query,
pair<pg_shard_t, pg_info_t> ¬ify_info);
void fulfill_log(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
#include "PG.h"
#include "OSD.h"
+#include "messages/MOSDPGRemove.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDScrubReserve.h"
machine.initiate();
}
+void PeeringState::update_history(const pg_history_t& new_history)
+{
+ pl->unreg_next_scrub();
+ if (info.history.merge(new_history)) {
+ psdout(20) << __func__ << " advanced history from " << new_history << dendl;
+ dirty_info = true;
+ if (info.history.last_epoch_clean >= info.history.same_interval_since) {
+ psdout(20) << __func__ << " clearing past_intervals" << dendl;
+ past_intervals.clear();
+ dirty_big_info = true;
+ }
+ }
+ pl->reg_next_scrub();
+}
+
+void PeeringState::purge_strays()
+{
+ if (is_premerge()) {
+ psdout(10) << "purge_strays " << stray_set << " but premerge, doing nothing"
+ << dendl;
+ return;
+ }
+ if (cct->_conf.get_val<bool>("osd_debug_no_purge_strays")) {
+ return;
+ }
+ psdout(10) << "purge_strays " << stray_set << dendl;
+
+ bool removed = false;
+ for (set<pg_shard_t>::iterator p = stray_set.begin();
+ p != stray_set.end();
+ ++p) {
+ ceph_assert(!is_acting_recovery_backfill(*p));
+ if (get_osdmap()->is_up(p->osd)) {
+ dout(10) << "sending PGRemove to osd." << *p << dendl;
+ vector<spg_t> to_remove;
+ to_remove.push_back(spg_t(info.pgid.pgid, p->shard));
+ MOSDPGRemove *m = new MOSDPGRemove(
+ get_osdmap_epoch(),
+ to_remove);
+ pl->send_cluster_message(p->osd, m, get_osdmap_epoch());
+ } else {
+ dout(10) << "not sending PGRemove to down osd." << *p << dendl;
+ }
+ peer_missing.erase(*p);
+ peer_info.erase(*p);
+ peer_purged.insert(*p);
+ removed = true;
+ }
+
+ // if we removed anyone, update peers (which include peer_info)
+ if (removed)
+ update_heartbeat_peers();
+
+ stray_set.clear();
+
+ // clear _requested maps; we may have to peer() again if we discover
+ // (more) stray content
+ peer_log_requested.clear();
+ peer_missing_requested.clear();
+}
+
+
bool PeeringState::proc_replica_info(
pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch)
{
peer_info[from] = oinfo;
might_have_unfound.insert(from);
- pg->update_history(oinfo.history);
+ update_history(oinfo.history);
// stray?
if (!is_up(from) && !is_acting(from)) {
dout(10) << " osd." << from << " has stray content: " << oinfo << dendl;
stray_set.insert(from);
if (is_clean()) {
- pg->purge_strays();
+ purge_strays();
}
}
virtual void update_store_with_options(const pool_opts_t &opts) = 0;
virtual void update_heartbeat_peers(set<int> peers) = 0;
+ virtual void reg_next_scrub() = 0;
+ virtual void unreg_next_scrub() = 0;
+
+ virtual void send_cluster_message(int osd, Message *m, epoch_t epoch) = 0;
+
virtual void on_pool_change() = 0;
virtual void on_role_change() = 0;
virtual void on_change(ObjectStore::Transaction *t) = 0;
bool proc_replica_info(
pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch);
void remove_down_peer_info(const OSDMapRef &osdmap);
+ void purge_strays();
+ void update_history(const pg_history_t& new_history);
public:
PeeringState(