case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
[[fallthrough]];
case MSG_OSD_PG_SCAN:
+ [[fallthrough]];
+ case MSG_OSD_PG_BACKFILL:
return handle_recovery_subreq(conn, boost::static_pointer_cast<MOSDFastDispatchOp>(m));
case MSG_OSD_PG_LEASE:
[[fallthrough]];
return peering_state.get_min_last_complete_ondisk();
}
- const pg_info_t& get_info() const {
+ const pg_info_t& get_info() const final {
return peering_state.get_info();
}
void PGRecovery::update_peers_last_backfill(
const hobject_t& new_last_backfill)
{
- ceph_abort_msg("Not implemented");
+ logger().debug("{}: new_last_backfill={}",
+ __func__, new_last_backfill);
+ // If new_last_backfill == MAX, then we will send OP_BACKFILL_FINISH to
+ // all the backfill targets. Otherwise, we will move last_backfill up on
+ // those targets that need it and send OP_BACKFILL_PROGRESS to them.
+ for (const auto& bt : pg->get_peering_state().get_backfill_targets()) {
+ if (const pg_info_t& pinfo = pg->get_peering_state().get_peer_info(bt);
+ new_last_backfill > pinfo.last_backfill) {
+ pg->get_peering_state().update_peer_last_backfill(bt, new_last_backfill);
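+ // pinfo references the peer_info entry updated just above, so
+ // last_backfill below already equals new_last_backfill; a peer whose
+ // last_backfill reached MAX is finished and gets OP_BACKFILL_FINISH
+ // instead of another OP_BACKFILL_PROGRESS.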
+ auto m = make_message<MOSDPGBackfill>(
+ pinfo.last_backfill.is_max() ? MOSDPGBackfill::OP_BACKFILL_FINISH
+ : MOSDPGBackfill::OP_BACKFILL_PROGRESS,
+ pg->get_osdmap_epoch(),
+ pg->get_last_peering_reset(),
+ spg_t(pg->get_pgid().pgid, bt.shard));
+ // Use the default priority here; it must match the sub_op priority.
+ // TODO: if pinfo.last_backfill.is_max(), then
+ // start_recovery_op(hobject_t::get_max());
+ m->last_backfill = pinfo.last_backfill;
+ m->stats = pinfo.stats;
+ std::ignore = pg->get_shard_services().send_to_osd(
+ bt.osd, std::move(m), pg->get_osdmap_epoch());
+ logger().info("{}: peer {} num_objects now {} / {}",
+ __func__,
+ bt,
+ pinfo.stats.stats.sum.num_objects,
+ pg->get_info().stats.stats.sum.num_objects);
+ }
+ }
}
bool PGRecovery::budget_available() const
void PGRecovery::backfilled()
{
- shard_services.start_operation<LocalPeeringEvent>(
- this,
- shard_services,
- pg_whoami,
- pgid,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
+ using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
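+ // Backfill has completed: feed a Backfilled event into this PG's
+ // peering state machine via a local peering operation.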
+ std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
+ static_cast<crimson::osd::PG*>(pg),
+ pg->get_shard_services(),
+ pg->get_pg_whoami(),
+ pg->get_pgid(),
+ pg->get_osdmap_epoch(),
+ pg->get_osdmap_epoch(),
PeeringState::Backfilled{});
}
virtual bool has_reset_since(epoch_t) const = 0;
virtual std::vector<pg_shard_t> get_replica_recovery_order() const = 0;
virtual epoch_t get_last_peering_reset() const = 0;
+ virtual const pg_info_t& get_info() const = 0;
virtual seastar::future<> stop() = 0;
};
}
}
+void RecoveryBackend::handle_backfill_finish(
+ MOSDPGBackfill& m)
+{
+ logger().debug("{}", __func__);
+ ceph_assert(!pg.is_primary());
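+ // osd_kill_backfill_at is a failure-injection knob used by tests; each
+ // backfill stage asserts a distinct value (1=finish, 2=progress, 3=ack).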
+ ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 1);
+ auto reply = make_message<MOSDPGBackfill>(
+ MOSDPGBackfill::OP_BACKFILL_FINISH_ACK,
+ pg.get_osdmap_epoch(),
+ m.query_epoch,
+ spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
+ reply->set_priority(pg.get_recovery_op_priority());
+ std::ignore = m.get_connection()->send(std::move(reply));
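+ // After acking the primary, drive this replica's own state machine by
+ // queueing a local RecoveryDone peering event for this PG.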
+ shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
+ static_cast<crimson::osd::PG*>(&pg),
+ shard_services,
+ pg.get_pg_whoami(),
+ pg.get_pgid(),
+ pg.get_osdmap_epoch(),
+ pg.get_osdmap_epoch(),
+ RecoveryDone{});
+}
+
+seastar::future<> RecoveryBackend::handle_backfill_progress(
+ MOSDPGBackfill& m)
+{
+ logger().debug("{}", __func__);
+ ceph_assert(!pg.is_primary());
+ ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 2);
+
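+ // Record the primary-reported last_backfill and stats, then persist
+ // them with a transaction against the local store.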
+ ObjectStore::Transaction t;
+ pg.get_peering_state().update_backfill_progress(
+ m.last_backfill,
+ m.stats,
+ m.op == MOSDPGBackfill::OP_BACKFILL_PROGRESS,
+ t);
+ return shard_services.get_store().do_transaction(
+ pg.get_collection_ref(), std::move(t)
+ ).handle_exception([] (auto) {
+ ceph_assert("this transaction shall not fail" == nullptr);
+ });
+}
+
+seastar::future<> RecoveryBackend::handle_backfill_finish_ack(
+ MOSDPGBackfill& m)
+{
+ logger().debug("{}", __func__);
+ ceph_assert(pg.is_primary());
+ ceph_assert(crimson::common::local_conf()->osd_kill_backfill_at != 3);
+ // TODO:
+ // finish_recovery_op(hobject_t::get_max());
+ return seastar::now();
+}
+
+seastar::future<> RecoveryBackend::handle_backfill(
+ MOSDPGBackfill& m)
+{
+ logger().debug("{}", __func__);
+ switch (m.op) {
+ case MOSDPGBackfill::OP_BACKFILL_FINISH:
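+ // OP_BACKFILL_FINISH deliberately falls through: the finish message
+ // also carries a final progress payload that must be persisted.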
+ handle_backfill_finish(m);
+ [[fallthrough]];
+ case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
+ return handle_backfill_progress(m);
+ case MOSDPGBackfill::OP_BACKFILL_FINISH_ACK:
+ return handle_backfill_finish_ack(m);
+ default:
+ ceph_assert("unknown op type for pg backfill");
+ return seastar::now();
+ }
+}
+
seastar::future<BackfillInterval> RecoveryBackend::scan_for_backfill(
const hobject_t& start,
[[maybe_unused]] const std::int64_t min,
Ref<MOSDFastDispatchOp> m)
{
switch (m->get_header().type) {
+ case MSG_OSD_PG_BACKFILL:
+ return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m));
case MSG_OSD_PG_SCAN:
return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m));
default:
#include "crimson/osd/object_context.h"
#include "crimson/osd/shard_services.h"
+#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGScan.h"
#include "osd/recovery_types.h"
#include "osd/osd_types.h"
class PGBackend;
class RecoveryBackend {
+ void handle_backfill_finish(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill_progress(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill_finish_ack(
+ MOSDPGBackfill& m);
+ seastar::future<> handle_backfill(MOSDPGBackfill& m);
+
seastar::future<> handle_scan_get_digest(
MOSDPGScan& m);
seastar::future<> handle_scan_digest(