backend->got_rep_op_reply(m);
}
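+// Drive one batch of recovery: local (primary) recovery first, then replica
+// recovery, then backfill, consuming at most max_to_start slots in total.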
+crimson::osd::blocking_future<bool> PG::start_recovery_ops(size_t max_to_start)
+{
+ assert(is_primary());
+ assert(is_peered());
+ assert(!peering_state.is_deleting());
+
+ if (!is_recovering() && !is_backfilling()) {
+ return crimson::osd::make_ready_blocking_future<bool>(false);
+ }
+
+ std::vector<crimson::osd::blocking_future<>> started;
+ started.reserve(max_to_start);
+ max_to_start -= start_primary_recovery_ops(max_to_start, &started);
+ if (max_to_start > 0) {
+ max_to_start -= start_replica_recovery_ops(max_to_start, &started);
+ }
+ if (max_to_start > 0) {
+ max_to_start -= start_backfill_ops(max_to_start, &started);
+ }
+
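+ // 'done' is true iff the entire budget was consumed, i.e. a full batch of
+ // ops was started (which presumably means more work remains)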
+ bool done = max_to_start == 0;
+ return crimson::osd::join_blocking_futures(std::move(started)).then([done] {
+ return seastar::make_ready_future<bool>(done);
+ });
+}
+
+size_t PG::start_primary_recovery_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out)
+{
+ if (!is_recovering()) {
+ return 0;
+ }
+
+ if (!peering_state.have_missing()) {
+ peering_state.local_recovery_complete();
+ return 0;
+ }
+
+ const auto &missing = peering_state.get_pg_log().get_missing();
+
+ logger().info(
+ "{} recovering {} in pg {}, missing {}",
+ __func__,
+ backend->total_recovering(),
+ *this,
+ missing);
+
+ size_t started = 0;
+ int skipped = 0;
+
+ auto p = missing.get_rmissing().lower_bound(
+ peering_state.get_pg_log().get_log().last_requested);
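+ // walk the missing set in version order, resuming after last_requested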
+ while (started < max_to_start && p != missing.get_rmissing().end()) {
+ // TODO: chain futures here to enable yielding to scheduler?
+ hobject_t soid;
+ version_t v = p->first;
+
+ auto it_objects = peering_state.get_pg_log().get_log().objects.find(p->second);
+ if (it_objects != peering_state.get_pg_log().get_log().objects.end()) {
+ // look at log!
+ pg_log_entry_t *latest = it_objects->second;
+ assert(latest->is_update() || latest->is_delete());
+ soid = latest->soid;
+ } else {
+ soid = p->second;
+ }
+ const pg_missing_item& item = missing.get_items().find(p->second)->second;
+ ++p;
+
+ hobject_t head = soid.get_head();
+
+ logger().info(
+ "{} {} item.need {} {} {} {} {}",
+ __func__,
+ soid,
+ item.need,
+ missing.is_missing(soid) ? " (missing)":"",
+ missing.is_missing(head) ? " (missing head)":"",
+ backend->is_recovering(soid) ? " (recovering)":"",
+ backend->is_recovering(head) ? " (recovering head)":"");
+
+ // TODO: handle lost/unfound
+ if (!backend->is_recovering(soid)) {
+ if (backend->is_recovering(head)) {
+ ++skipped;
+ } else {
+ auto futopt = recover_missing(soid, item.need);
+ if (futopt) {
+ out->push_back(std::move(*futopt));
+ ++started;
+ } else {
+ ++skipped;
+ }
+ }
+ }
+
+ if (!skipped) {
+ peering_state.set_last_requested(v);
+ }
+ }
+
+ logger().info(
+ "{} started {} skipped {}",
+ __func__,
+ started,
+ skipped);
+
+ return started;
+}
+
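+// Order replica peers for recovery: acting-set peers come before async
+// recovery targets, each group sorted by ascending count of missing objects,
+// presumably so the cheapest peers are drained first.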
+std::vector<pg_shard_t> PG::get_replica_recovery_order() const
+{
+ std::vector<std::pair<unsigned int, pg_shard_t>> replicas_by_num_missing,
+ async_by_num_missing;
+ replicas_by_num_missing.reserve(
+ peering_state.get_acting_recovery_backfill().size() - 1);
+ for (auto &p: peering_state.get_acting_recovery_backfill()) {
+ if (p == peering_state.get_primary()) {
+ continue;
+ }
+ auto pm = peering_state.get_peer_missing().find(p);
+ assert(pm != peering_state.get_peer_missing().end());
+ auto nm = pm->second.num_missing();
+ if (nm != 0) {
+ if (peering_state.is_async_recovery_target(p)) {
+ async_by_num_missing.push_back(make_pair(nm, p));
+ } else {
+ replicas_by_num_missing.push_back(make_pair(nm, p));
+ }
+ }
+ }
+ // sort by number of missing objects, in ascending order.
+ auto func = [](const std::pair<unsigned int, pg_shard_t> &lhs,
+ const std::pair<unsigned int, pg_shard_t> &rhs) {
+ return lhs.first < rhs.first;
+ };
+ // acting goes first
+ std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func);
+ // then async_recovery_targets
+ std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func);
+ replicas_by_num_missing.insert(replicas_by_num_missing.end(),
+ async_by_num_missing.begin(), async_by_num_missing.end());
+
+ std::vector<pg_shard_t> ret;
+ ret.reserve(replicas_by_num_missing.size());
+ for (auto p : replicas_by_num_missing) {
+ ret.push_back(p.second);
+ }
+ return ret;
+}
+
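+// Schedule pushes/deletes for objects that replicas are missing, walking
+// peers in the order computed above and each peer's missing set oldest-first.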
+size_t PG::start_replica_recovery_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out)
+{
+ if (!is_recovering()) {
+ return 0;
+ }
+ size_t started = 0;
+
+ assert(!peering_state.get_acting_recovery_backfill().empty());
+
+ auto recovery_order = get_replica_recovery_order();
+ for (auto &peer: recovery_order) {
+ assert(peer != peering_state.get_primary());
+ auto pm = peering_state.get_peer_missing().find(peer);
+ assert(pm != peering_state.get_peer_missing().end());
+
+ size_t m_sz = pm->second.num_missing();
+
+ logger().debug("{}: peer osd.{} missing {} objects", __func__, peer, m_sz);
+ logger().trace("{}: peer osd.{} missing {}", __func__,
+ peer, pm->second.get_items());
+
+ // recover oldest first
+ const pg_missing_t &m(pm->second);
+ for (auto p = m.get_rmissing().begin();
+ p != m.get_rmissing().end() && started < max_to_start;
+ ++p) {
+ const auto &soid = p->second;
+
+ if (peering_state.get_missing_loc().is_unfound(soid)) {
+ logger().debug("{}: object {} still unfound", __func__, soid);
+ continue;
+ }
+
+ const pg_info_t &pi = peering_state.get_peer_info(peer);
+ if (soid > pi.last_backfill) {
+ if (!backend->is_recovering(soid)) {
+ logger().error(
+ "{}: object {} in missing set for backfill (last_backfill {})"
+ " but not in recovering",
+ __func__,
+ soid,
+ pi.last_backfill);
+ ceph_abort();
+ }
+ continue;
+ }
+
+ if (backend->is_recovering(soid)) {
+ logger().debug("{}: already recovering object {}", __func__, soid);
+ continue;
+ }
+
+ if (peering_state.get_missing_loc().is_deleted(soid)) {
+ logger().debug("{}: soid {} is a delete, removing", __func__, soid);
+ auto r = m.get_items().find(soid);
+ started += prep_object_replica_deletes(
+ soid, r->second.need, out);
+ continue;
+ }
+
+ if (soid.is_snap() &&
+ peering_state.get_pg_log().get_missing().is_missing(
+ soid.get_head())) {
+ logger().debug("{}: head {} still missing on primary",
+ __func__, soid.get_head());
+ continue;
+ }
+
+ if (peering_state.get_pg_log().get_missing().is_missing(soid)) {
+ logger().debug("{}: soid {} still missing on primary", __func__, soid);
+ continue;
+ }
+
+ logger().debug(
+ "{}: recover_object_replicas({})",
+ __func__,
+ soid);
+ auto r = m.get_items().find(soid);
+ started += prep_object_replica_pushes(
+ soid, r->second.need, out);
+ }
+ }
+
+ return started;
+}
+
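+// Backfill is not wired up yet: log the would-be parameters, then abort.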
+size_t PG::start_backfill_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out)
+{
+ logger().debug(
+ "{}({}): bft={} last_backfill_started={} {}",
+ __func__,
+ max_to_start,
+ peering_state.get_backfill_targets(),
+ last_backfill_started,
+ new_backfill ? "new_backfill" : "");
+ assert(!peering_state.get_backfill_targets().empty());
+
+ ceph_abort("not implemented!");
+}
+
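+// Stub: returns nullopt for every object, so start_primary_recovery_ops
+// currently counts each missing object as skipped.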
+std::optional<crimson::osd::blocking_future<>> PG::recover_missing(
+ const hobject_t &hoid, eversion_t need)
+{
+ return std::nullopt;
+}
+
+
}
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
+#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
return *backend;
}
+
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
}
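+ // queue a BackgroundRecovery operation under the given scheduler class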
+ void start_background_recovery(
+ crimson::osd::scheduler::scheduler_class_t klass) {
+ shard_services.start_operation<BackgroundRecovery>(
+ this,
+ shard_services,
+ get_osdmap_epoch(),
+ klass);
+ }
void on_backfill_reserved() final {
- ceph_assert(0 == "Not implemented");
+ start_background_recovery(
+ crimson::osd::scheduler::scheduler_class_t::background_best_effort);
}
void on_backfill_canceled() final {
ceph_assert(0 == "Not implemented");
}
void on_recovery_reserved() final {
- ceph_assert(0 == "Not implemented");
+ start_background_recovery(
+ crimson::osd::scheduler::scheduler_class_t::background_recovery);
}
bool is_primary() const {
return peering_state.is_primary();
}
+ bool is_peered() const {
+ return peering_state.is_peered();
+ }
+ bool is_recovering() const {
+ return peering_state.is_recovering();
+ }
+ bool is_backfilling() const {
+ return peering_state.is_backfilling();
+ }
pg_stat_t get_stats() {
auto stats = peering_state.prepare_stats_for_publish(
false,
PeeringState peering_state;
eversion_t projected_last_update;
+public:
+ bool has_reset_since(epoch_t epoch) const {
+ return peering_state.pg_has_reset_since(epoch);
+ }
+private:
class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
PG *pg;
friend class PGAdvanceMap;
friend class PeeringEvent;
friend class RepRequest;
-};
+public:
+ crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+private:
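+ // stub: always reports success until unfound-object handling is implemented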
+ seastar::future<bool> find_unfound() {
+ return seastar::make_ready_future<bool>(true);
+ }
+
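+ // backfill progress tracking; only read by start_backfill_ops for now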
+ bool new_backfill = false;
+ hobject_t last_backfill_started;
+
+ size_t start_primary_recovery_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out);
+ size_t start_replica_recovery_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out);
+ size_t start_backfill_ops(
+ size_t max_to_start,
+ std::vector<crimson::osd::blocking_future<>> *out);
+
+ std::vector<pg_shard_t> get_replica_recovery_order() const;
+ std::optional<crimson::osd::blocking_future<>> recover_missing(
+ const hobject_t &hoid, eversion_t need);
+
+ size_t prep_object_replica_deletes(
+ const hobject_t& soid,
+ eversion_t need,
+ std::vector<crimson::osd::blocking_future<>> *in_progress) {
+ // stub for now, like prep_object_replica_pushes below
+ return 0;
+ }
+
+ size_t prep_object_replica_pushes(
+ const hobject_t& soid,
+ eversion_t need,
+ std::vector<crimson::osd::blocking_future<>> *in_progress) {
+ return 0;
+ }
+};
std::ostream& operator<<(std::ostream&, const PG& pg);