}
}
-bool PG::append_log_entries_update_missing(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to)
-{
- ceph_assert(!entries.empty());
- ceph_assert(entries.begin()->version > info.last_update);
-
- PGLogEntryHandler rollbacker{this, &t};
- bool invalidate_stats =
- pg_log.append_new_log_entries(info.last_backfill,
- info.last_backfill_bitwise,
- entries,
- &rollbacker);
-
- if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
- pg_log.roll_forward(&rollbacker);
- }
- if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
- pg_log.roll_forward_to(*roll_forward_to, &rollbacker);
- last_rollback_info_trimmed_to_applied = *roll_forward_to;
- }
-
- info.last_update = pg_log.get_head();
-
- if (pg_log.get_missing().num_missing() == 0) {
- // advance last_complete since nothing else is missing!
- info.last_complete = info.last_update;
- }
- info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
-
- dout(20) << __func__ << " trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
- if (trim_to)
- pg_log.trim(*trim_to, info);
- dirty_info = true;
- write_if_dirty(t);
- return invalidate_stats;
-}
-
-
-void PG::merge_new_log_entries(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t,
- boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to)
-{
- dout(10) << __func__ << " " << entries << dendl;
- ceph_assert(is_primary());
-
- bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
- for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
- i != acting_recovery_backfill.end();
- ++i) {
- pg_shard_t peer(*i);
- if (peer == pg_whoami) continue;
- ceph_assert(peer_missing.count(peer));
- ceph_assert(peer_info.count(peer));
- pg_missing_t& pmissing(peer_missing[peer]);
- dout(20) << __func__ << " peer_missing for " << peer << " = " << pmissing << dendl;
- pg_info_t& pinfo(peer_info[peer]);
- bool invalidate_stats = PGLog::append_log_entries_update_missing(
- pinfo.last_backfill,
- info.last_backfill_bitwise,
- entries,
- true,
- NULL,
- pmissing,
- NULL,
- this);
- pinfo.last_update = info.last_update;
- pinfo.stats.stats_invalid = pinfo.stats.stats_invalid || invalidate_stats;
- rebuild_missing = rebuild_missing || invalidate_stats;
- }
-
- if (!rebuild_missing) {
- return;
- }
-
- for (auto &&i: entries) {
- missing_loc.rebuild(
- i.soid,
- pg_whoami,
- acting_recovery_backfill,
- info,
- pg_log.get_missing(),
- peer_missing,
- peer_info);
- }
-}
-
bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
{
if (last_peering_reset > reply_epoch ||
bool queue_scrub();
unsigned get_scrub_priority();
- bool append_log_entries_update_missing(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t,
- boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to);
-
- /**
- * Merge entries updating missing as necessary on all
- * acting_recovery_backfill logs and missings (also missing_loc)
- */
- void merge_new_log_entries(
- const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t,
- boost::optional<eversion_t> trim_to,
- boost::optional<eversion_t> roll_forward_to);
-
bool try_flush_or_schedule_async() override;
void start_flush_on_transaction(
ObjectStore::Transaction *t) override;
}
}
+bool PeeringState::append_log_entries_update_missing(
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to)
+{
+ ceph_assert(!entries.empty());
+ ceph_assert(entries.begin()->version > info.last_update);
+
+ PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+ bool invalidate_stats =
+ pg_log.append_new_log_entries(
+ info.last_backfill,
+ info.last_backfill_bitwise,
+ entries,
+ rollbacker.get());
+
+ if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
+ pg_log.roll_forward(rollbacker.get());
+ }
+ if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
+ pg_log.roll_forward_to(*roll_forward_to, rollbacker.get());
+ last_rollback_info_trimmed_to_applied = *roll_forward_to;
+ }
+
+ info.last_update = pg_log.get_head();
+
+ if (pg_log.get_missing().num_missing() == 0) {
+ // advance last_complete since nothing else is missing!
+ info.last_complete = info.last_update;
+ }
+ info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
+
+ psdout(20) << __func__ << " trim_to bool = " << bool(trim_to)
+ << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
+ if (trim_to)
+ pg_log.trim(*trim_to, info);
+ dirty_info = true;
+ write_if_dirty(t);
+ return invalidate_stats;
+}
+
+void PeeringState::merge_new_log_entries(
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to)
+{
+ psdout(10) << __func__ << " " << entries << dendl;
+ ceph_assert(is_primary());
+
+ bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
+ for (set<pg_shard_t>::const_iterator i = acting_recovery_backfill.begin();
+ i != acting_recovery_backfill.end();
+ ++i) {
+ pg_shard_t peer(*i);
+ if (peer == pg_whoami) continue;
+ ceph_assert(peer_missing.count(peer));
+ ceph_assert(peer_info.count(peer));
+ pg_missing_t& pmissing(peer_missing[peer]);
+ psdout(20) << __func__ << " peer_missing for " << peer
+ << " = " << pmissing << dendl;
+ pg_info_t& pinfo(peer_info[peer]);
+ bool invalidate_stats = PGLog::append_log_entries_update_missing(
+ pinfo.last_backfill,
+ info.last_backfill_bitwise,
+ entries,
+ true,
+ NULL,
+ pmissing,
+ NULL,
+ dpp);
+ pinfo.last_update = info.last_update;
+ pinfo.stats.stats_invalid = pinfo.stats.stats_invalid || invalidate_stats;
+ rebuild_missing = rebuild_missing || invalidate_stats;
+ }
+
+ if (!rebuild_missing) {
+ return;
+ }
+
+ for (auto &&i: entries) {
+ missing_loc.rebuild(
+ i.soid,
+ pg_whoami,
+ acting_recovery_backfill,
+ info,
+ pg_log.get_missing(),
+ peer_missing,
+ peer_info);
+ }
+}
/*------------ Peering State Machine----------------*/
#undef dout_prefix
#define dout_prefix (context< PeeringMachine >().dpp->gen_prefix(*_dout) \
void update_blocked_by();
void update_calc_stats();
+
+ bool append_log_entries_update_missing(
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to);
+
+ /**
+ * Merge entries updating missing as necessary on all
+ * acting_recovery_backfill logs and missings (also missing_loc)
+ */
+ void merge_new_log_entries(
+ const mempool::osd_pglog::list<pg_log_entry_t> &entries,
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to);
public:
PeeringState(
CephContext *cct,
[this, entries, repop, on_complete]() {
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
+ recovery_state.merge_new_log_entries(
+ entries, t, pg_trim_to, min_last_complete_ondisk);
set<pg_shard_t> waiting_on;
dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
- append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+ recovery_state.append_log_entries_update_missing(
+ m->entries, t, op_trim_to, op_roll_forward_to);
eversion_t new_lcod = info.last_complete;
Context *complete = new FunctionContext(