From: Xuehan Xu Date: Wed, 4 Mar 2020 09:56:45 +0000 (+0800) Subject: crimson: add pg log based recovery machinery X-Git-Tag: v16.1.0~2488^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=566cd75748b2680d22924b85f00f5092b9c88372;p=ceph.git crimson: add pg log based recovery machinery Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index a4298c97fe7..ff5ba3aec6c 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -19,6 +19,9 @@ add_executable(crimson-osd osd_operations/pg_advance_map.cc osd_operations/replicated_request.cc osd_operations/background_recovery.cc + pg_recovery.cc + recovery_backend.cc + replicated_recovery_backend.cc scheduler/scheduler.cc scheduler/mclock_scheduler.cc osdmap_gate.cc diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 705b1af6b80..5148d46a65c 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -32,7 +32,7 @@ seastar::future BackgroundRecovery::do_recovery() if (pg->has_reset_since(epoch_started)) return seastar::make_ready_future(false); return with_blocking_future( - pg->start_recovery_ops( + pg->get_recovery_handler()->start_recovery_ops( crimson::common::local_conf()->osd_recovery_max_single_start)); } @@ -57,8 +57,14 @@ seastar::future<> BackgroundRecovery::start() IRef ref = this; return ss.throttler.with_throttle_while( - this, get_scheduler_params(), [ref, this] { + this, get_scheduler_params(), [this] { return do_recovery(); + }).handle_exception_type([ref, this](const std::system_error& err) { + if (err.code() == std::make_error_code(std::errc::interrupted)) { + logger().debug("{} recovery interruped: {}", *pg, err.what()); + return seastar::now(); + } + return seastar::make_exception_future<>(err); }); } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 47681a08c4e..8ee14f17741 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -37,6 +37,8 @@ #include "crimson/osd/ops_executer.h" #include "crimson/osd/osd_operations/osdop_params.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/pg_recovery.h" +#include "crimson/osd/replicated_recovery_backend.h" namespace { seastar::logger& logger() { @@ -98,6 +100,11 @@ PG::PG( coll_ref, shard_services, profile)), + recovery_backend( + std::make_unique( + *this, shard_services, coll_ref, backend.get())), + recovery_handler( + std::make_unique(this)), peering_state( shard_services.get_cct(), pg_shard, @@ -248,6 +255,8 @@ void PG::on_activate_complete() get_osdmap_epoch(), PeeringState::RequestBackfill{}); } else { + logger().debug("{}: no need to recover or backfill, AllReplicasRecovered", + " for pg: {}", __func__, pgid); shard_services.start_operation( this, shard_services, @@ -417,7 +426,7 @@ void PG::do_peering_event( PGPeeringEvent& evt, PeeringCtx &rctx) { if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) { - logger().debug("{} handling {}", __func__, evt.get_desc()); + logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid); do_peering_event(evt.get_event(), rctx); } else { logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc()); @@ -850,267 +859,4 @@ void PG::handle_rep_op_reply(crimson::net::Connection* conn, backend->got_rep_op_reply(m); } -crimson::osd::blocking_future PG::start_recovery_ops(size_t max_to_start) -{ - assert(is_primary()); - assert(is_peered()); - assert(!peering_state.is_deleting()); - - if (!is_recovering() && !is_backfilling()) { - return crimson::osd::make_ready_blocking_future(false); - } - - std::vector> started; - started.reserve(max_to_start); - max_to_start -= start_primary_recovery_ops(max_to_start, &started); - if (max_to_start > 0) { - max_to_start -= start_replica_recovery_ops(max_to_start, &started); - } - if (max_to_start > 0) { - max_to_start -= start_backfill_ops(max_to_start, &started); - } - - bool done = max_to_start == 0; - return crimson::osd::join_blocking_futures(std::move(started)).then([this, done] { - return seastar::make_ready_future(done); - }); -} - -size_t PG::start_primary_recovery_ops( - size_t max_to_start, - std::vector> *out) -{ - if (!is_recovering()) { - return 0; - } - - if (!peering_state.have_missing()) { - peering_state.local_recovery_complete(); - return 0; - } - - const auto &missing = peering_state.get_pg_log().get_missing(); - - logger().info( - "{} recovering {} in pg {}, missing {}", - __func__, - backend->total_recovering(), - *this, - missing); - - unsigned started = 0; - int skipped = 0; - - map::const_iterator p = - missing.get_rmissing().lower_bound(peering_state.get_pg_log().get_log().last_requested); - while (started < max_to_start && p != missing.get_rmissing().end()) { - // TODO: chain futures here to enable yielding to scheduler? - hobject_t soid; - version_t v = p->first; - - auto it_objects = peering_state.get_pg_log().get_log().objects.find(p->second); - if (it_objects != peering_state.get_pg_log().get_log().objects.end()) { - // look at log! - pg_log_entry_t *latest = it_objects->second; - assert(latest->is_update() || latest->is_delete()); - soid = latest->soid; - } else { - soid = p->second; - } - const pg_missing_item& item = missing.get_items().find(p->second)->second; - ++p; - - hobject_t head = soid.get_head(); - - logger().info( - "{} {} item.need {} {} {} {} {}", - __func__, - soid, - item.need, - missing.is_missing(soid) ? " (missing)":"", - missing.is_missing(head) ? " (missing head)":"", - backend->is_recovering(soid) ? " (recovering)":"", - backend->is_recovering(head) ? " (recovering head)":""); - - // TODO: handle lost/unfound - if (!backend->is_recovering(soid)) { - if (backend->is_recovering(head)) { - ++skipped; - } else { - auto futopt = recover_missing(soid, item.need); - if (futopt) { - out->push_back(std::move(*futopt)); - ++started; - } else { - ++skipped; - } - } - } - - if (!skipped) - peering_state.set_last_requested(v); - } - - logger().info( - "{} started {} skipped {}", - __func__, - started, - skipped); - - return started; -} - -std::vector PG::get_replica_recovery_order() const -{ - std::vector> replicas_by_num_missing, - async_by_num_missing; - replicas_by_num_missing.reserve( - peering_state.get_acting_recovery_backfill().size() - 1); - for (auto &p: peering_state.get_acting_recovery_backfill()) { - if (p == peering_state.get_primary()) { - continue; - } - auto pm = peering_state.get_peer_missing().find(p); - assert(pm != peering_state.get_peer_missing().end()); - auto nm = pm->second.num_missing(); - if (nm != 0) { - if (peering_state.is_async_recovery_target(p)) { - async_by_num_missing.push_back(make_pair(nm, p)); - } else { - replicas_by_num_missing.push_back(make_pair(nm, p)); - } - } - } - // sort by number of missing objects, in ascending order. - auto func = [](const std::pair &lhs, - const std::pair &rhs) { - return lhs.first < rhs.first; - }; - // acting goes first - std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func); - // then async_recovery_targets - std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func); - replicas_by_num_missing.insert(replicas_by_num_missing.end(), - async_by_num_missing.begin(), async_by_num_missing.end()); - - std::vector ret; - ret.reserve(replicas_by_num_missing.size()); - for (auto p : replicas_by_num_missing) { - ret.push_back(p.second); - } - return ret; -} - -size_t PG::start_replica_recovery_ops( - size_t max_to_start, - std::vector> *out) -{ - if (!is_recovering()) { - return 0; - } - uint64_t started = 0; - - assert(!peering_state.get_acting_recovery_backfill().empty()); - - auto recovery_order = get_replica_recovery_order(); - for (auto &peer: recovery_order) { - assert(peer != peering_state.get_primary()); - auto pm = peering_state.get_peer_missing().find(peer); - assert(pm != peering_state.get_peer_missing().end()); - - size_t m_sz = pm->second.num_missing(); - - logger().debug("{}: peer osd.{} missing {} objects", __func__, peer, m_sz); - logger().trace("{}: peer osd.{} missing {}", __func__, - peer, pm->second.get_items()); - - // recover oldest first - const pg_missing_t &m(pm->second); - for (auto p = m.get_rmissing().begin(); - p != m.get_rmissing().end() && started < max_to_start; - ++p) { - const auto &soid = p->second; - - if (peering_state.get_missing_loc().is_unfound(soid)) { - logger().debug("{}: object {} still unfound", __func__, soid); - continue; - } - - const pg_info_t &pi = peering_state.get_peer_info(peer); - if (soid > pi.last_backfill) { - if (!backend->is_recovering(soid)) { - logger().error( - "{}: object {} in missing set for backfill (last_backfill {})" - " but not in recovering", - __func__, - soid, - pi.last_backfill); - ceph_abort(); - } - continue; - } - - if (backend->is_recovering(soid)) { - logger().debug("{}: already recovering object {}", __func__, soid); - continue; - } - - if (peering_state.get_missing_loc().is_deleted(soid)) { - logger().debug("{}: soid {} is a delete, removing", __func__, soid); - map::const_iterator r = - m.get_items().find(soid); - started += prep_object_replica_deletes( - soid, r->second.need, out); - continue; - } - - if (soid.is_snap() && - peering_state.get_pg_log().get_missing().is_missing( - soid.get_head())) { - logger().debug("{}: head {} still missing on primary", - __func__, soid.get_head()); - continue; - } - - if (peering_state.get_pg_log().get_missing().is_missing(soid)) { - logger().debug("{}: soid {} still missing on primary", __func__, soid); - continue; - } - - logger().debug( - "{}: recover_object_replicas({})", - __func__, - soid); - map::const_iterator r = m.get_items().find( - soid); - started += prep_object_replica_pushes( - soid, r->second.need, out); - } - } - - return started; -} - -size_t PG::start_backfill_ops( - size_t max_to_start, - std::vector> *out) -{ - logger().debug( - "{}({}): bft={} lbs={} {}", - __func__, - peering_state.get_backfill_targets(), - last_backfill_started, - new_backfill ? "" : "new_backfill"); - assert(!peering_state.get_backfill_targets().empty()); - - ceph_abort("not implemented!"); -} - -std::optional> PG::recover_missing( - const hobject_t &hoid, eversion_t need) -{ - return std::nullopt; -} - - } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 9c43df8b419..e5f20c701b9 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -27,6 +27,9 @@ #include "crimson/osd/osd_operations/background_recovery.h" #include "crimson/osd/shard_services.h" #include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/pg_recovery.h" +#include "crimson/osd/pg_recovery_listener.h" +#include "crimson/osd/recovery_backend.h" class OSDMap; class MQuery; @@ -52,6 +55,7 @@ class ClientRequest; class PG : public boost::intrusive_ref_counter< PG, boost::thread_unsafe_counter>, + public PGRecoveryListener, PeeringState::PeeringListener, DoutPrefixProvider { @@ -82,11 +86,11 @@ public: ~PG(); - const pg_shard_t& get_pg_whoami() const { + const pg_shard_t& get_pg_whoami() const final { return pg_whoami; } - const spg_t& get_pgid() const { + const spg_t& get_pgid() const final { return pgid; } @@ -96,8 +100,6 @@ public: const PGBackend& get_backend() const { return *backend; } - - // EpochSource epoch_t get_osdmap_epoch() const final { return peering_state.get_osdmap_epoch(); @@ -199,7 +201,9 @@ public: start_peering_event_operation(std::move(*event)); }); } - + std::vector get_replica_recovery_order() const final { + return peering_state.get_replica_recovery_order(); + } void request_local_background_io_reservation( unsigned priority, PGPeeringEventRef on_grant, @@ -303,7 +307,7 @@ public: // Not needed yet } void on_change(ceph::os::Transaction &t) final { - // Not needed yet + recovery_backend->on_peering_interval_change(t); } void on_activate(interval_set to_trim) final; void on_activate_complete() final; @@ -344,24 +348,15 @@ public: return 0; } - void start_background_recovery( - crimson::osd::scheduler::scheduler_class_t klass) { - shard_services.start_operation( - this, - shard_services, - get_osdmap_epoch(), - klass); - } - void on_backfill_reserved() final { - start_background_recovery( + recovery_handler->start_background_recovery( crimson::osd::scheduler::scheduler_class_t::background_best_effort); } void on_backfill_canceled() final { ceph_assert(0 == "Not implemented"); } void on_recovery_reserved() final { - start_background_recovery( + recovery_handler->start_background_recovery( crimson::osd::scheduler::scheduler_class_t::background_recovery); } @@ -437,16 +432,16 @@ public: // Utility - bool is_primary() const { + bool is_primary() const final { return peering_state.is_primary(); } - bool is_peered() const { + bool is_peered() const final { return peering_state.is_peered(); } - bool is_recovering() const { + bool is_recovering() const final { return peering_state.is_recovering(); } - bool is_backfilling() const { + bool is_backfilling() const final { return peering_state.is_backfilling(); } pg_stat_t get_stats() { @@ -565,17 +560,69 @@ public: return eversion_t(projected_last_update.epoch, ++projected_last_update.version); } - + ShardServices& get_shard_services() final { + return shard_services; + } private: std::unique_ptr backend; + std::unique_ptr recovery_backend; + std::unique_ptr recovery_handler; PeeringState peering_state; eversion_t projected_last_update; public: - bool has_reset_since(epoch_t epoch) const { + RecoveryBackend* get_recovery_backend() final { + return recovery_backend.get(); + } + PGRecovery* get_recovery_handler() final { + return recovery_handler.get(); + } + PeeringState& get_peering_state() final { + return peering_state; + } + bool has_reset_since(epoch_t epoch) const final { return peering_state.pg_has_reset_since(epoch); } + const pg_missing_tracker_t& get_local_missing() const { + return peering_state.get_pg_log().get_missing(); + } + epoch_t get_last_peering_reset() const { + return peering_state.get_last_peering_reset(); + } + const set &get_acting_recovery_backfill() const { + return peering_state.get_acting_recovery_backfill(); + } + void begin_peer_recover(pg_shard_t peer, const hobject_t oid) { + peering_state.begin_peer_recover(peer, oid); + } + uint64_t min_peer_features() const { + return peering_state.get_min_peer_features(); + } + const map>& + get_missing_loc_shards() const { + return peering_state.get_missing_loc().get_missing_locs(); + } + const map &get_shard_missing() const { + return peering_state.get_peer_missing(); + } + const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const { + if (shard == pg_whoami) + return &get_local_missing(); + else { + auto it = peering_state.get_peer_missing().find(shard); + if (it == peering_state.get_peer_missing().end()) + return nullptr; + else + return &it->second; + } + } + int get_recovery_op_priority() const { + int64_t pri = 0; + get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri); + return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority; + } + private: class WaitForActiveBlocker : public BlockerT { PG *pg; @@ -599,41 +646,23 @@ private: friend class PGAdvanceMap; friend class PeeringEvent; friend class RepRequest; -public: - crimson::osd::blocking_future start_recovery_ops(size_t max_to_start); private: seastar::future find_unfound() { return seastar::make_ready_future(true); } - bool new_backfill; - hobject_t last_backfill_started; - - size_t start_primary_recovery_ops( - size_t max_to_start, - std::vector> *out); - size_t start_replica_recovery_ops( - size_t max_to_start, - std::vector> *out); - size_t start_backfill_ops( - size_t max_to_start, - std::vector> *out); - - std::vector get_replica_recovery_order() const; - std::optional> recover_missing( - const hobject_t &hoid, eversion_t need); - - size_t prep_object_replica_deletes( - const hobject_t& soid, - eversion_t need, - std::vector> *in_progress); - - size_t prep_object_replica_pushes( - const hobject_t& soid, - eversion_t need, - std::vector> *in_progress) { - return 0; + bool is_missing_object(const hobject_t& soid) const { + return peering_state.get_pg_log().get_missing().get_items().count(soid); + } + bool is_unreadable_object(const hobject_t &oid) const final { + return is_missing_object(oid) || + !peering_state.get_missing_loc().readable_with_acting( + oid, get_actingset()); + } + const set &get_actingset() const { + return peering_state.get_actingset(); } +}; std::ostream& operator<<(std::ostream&, const PG& pg); diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index 72f32f61881..a079ff5e999 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -15,11 +15,11 @@ #include "os/Transaction.h" #include "common/Clock.h" -#include "crimson/os/cyanstore/cyan_object.h" #include "crimson/os/futurized_collection.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/osd_operation.h" #include "replicated_backend.h" +#include "replicated_recovery_backend.h" #include "ec_backend.h" #include "exceptions.h" @@ -199,6 +199,7 @@ PGBackend::read(const object_info_t& oi, return _read(oi.soid, offset, length, flags).safe_then( [&oi](auto&& bl) -> read_errorator::future { if (const bool is_fine = _read_verify_data(oi, bl); is_fine) { + logger().debug("read: data length: {}", bl.length()); return read_errorator::make_ready_future(std::move(bl)); } else { return crimson::ct_error::object_corrupted::make(); @@ -480,6 +481,13 @@ maybe_get_omap_vals( } } +seastar::future PGBackend::omap_get_header( + crimson::os::CollectionRef& c, + const ghobject_t& oid) +{ + return store->omap_get_header(c, oid); +} + seastar::future<> PGBackend::omap_get_keys( const ObjectState& os, OSDOp& osd_op) const @@ -622,3 +630,21 @@ seastar::future<> PGBackend::omap_set_vals( os.oi.clear_omap_digest(); return seastar::now(); } + +seastar::future PGBackend::stat( + CollectionRef c, + const ghobject_t& oid) const +{ + return store->stat(c, oid); +} + +seastar::future> +PGBackend::fiemap( + CollectionRef c, + const ghobject_t& oid, + uint64_t off, + uint64_t len) +{ + return store->fiemap(c, oid, off, len); +} + diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index 13dfb30cff3..289c67f27bd 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -7,13 +7,16 @@ #include #include #include +#include #include "crimson/os/futurized_store.h" #include "crimson/os/futurized_collection.h" #include "crimson/osd/acked_peers.h" +#include "crimson/osd/pg.h" #include "crimson/common/shared_lru.h" #include "osd/osd_types.h" #include "crimson/osd/object_context.h" +#include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/osdop_params.h" struct hobject_t; @@ -36,6 +39,8 @@ protected: using ll_read_errorator = crimson::os::FuturizedStore::read_errorator; public: + using load_metadata_ertr = crimson::errorator< + crimson::ct_error::object_corrupted>; PGBackend(shard_id_t shard, CollectionRef coll, crimson::os::FuturizedStore* store); virtual ~PGBackend() = default; static std::unique_ptr create(pg_t pgid, @@ -44,7 +49,8 @@ public: crimson::os::CollectionRef coll, crimson::osd::ShardServices& shard_services, const ec_profile_t& ec_profile); - + using attrs_t = + std::map>; using read_errorator = ll_read_errorator::extend< crimson::ct_error::object_corrupted>; read_errorator::future read( @@ -97,6 +103,14 @@ public: get_attr_errorator::future getxattr( const hobject_t& soid, std::string_view key) const; + seastar::future stat( + CollectionRef c, + const ghobject_t& oid) const; + seastar::future> fiemap( + CollectionRef c, + const ghobject_t& oid, + uint64_t off, + uint64_t len); // OMAP seastar::future<> omap_get_keys( @@ -112,17 +126,16 @@ public: ObjectState& os, const OSDOp& osd_op, ceph::os::Transaction& trans); + seastar::future omap_get_header( + crimson::os::CollectionRef& c, + const ghobject_t& oid); virtual void got_rep_op_reply(const MOSDRepOpReply&) {} - protected: const shard_id_t shard; CollectionRef coll; crimson::os::FuturizedStore* store; - public: - using load_metadata_ertr = crimson::errorator< - crimson::ct_error::object_corrupted>; struct loaded_object_md_t { ObjectState os; std::optional ss; @@ -146,4 +159,5 @@ private: const osd_op_params_t& osd_op_p, epoch_t min_epoch, epoch_t max_epoch, std::vector&& log_entries) = 0; + friend class ReplicatedRecoveryBackend; }; diff --git a/src/crimson/osd/pg_recovery.cc b/src/crimson/osd/pg_recovery.cc new file mode 100644 index 00000000000..af63df60dd8 --- /dev/null +++ b/src/crimson/osd/pg_recovery.cc @@ -0,0 +1,389 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include + +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operations/background_recovery.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_backend.h" +#include "crimson/osd/pg_recovery.h" + +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGRecoveryDelete.h" +#include "messages/MOSDPGRecoveryDeleteReply.h" + +#include "osd/osd_types.h" +#include "osd/PeeringState.h" + +void PGRecovery::start_background_recovery( + crimson::osd::scheduler::scheduler_class_t klass) +{ + using BackgroundRecovery = crimson::osd::BackgroundRecovery; + pg->get_shard_services().start_operation( + static_cast(pg), + pg->get_shard_services(), + pg->get_osdmap_epoch(), + klass); +} + +crimson::osd::blocking_future +PGRecovery::start_recovery_ops(size_t max_to_start) +{ + assert(pg->is_primary()); + assert(pg->is_peered()); + assert(!pg->get_peering_state().is_deleting()); + + if (!pg->is_recovering() && !pg->is_backfilling()) { + return crimson::osd::make_ready_blocking_future(false); + } + + std::vector> started; + started.reserve(max_to_start); + max_to_start -= start_primary_recovery_ops(max_to_start, &started); + if (max_to_start > 0) { + max_to_start -= start_replica_recovery_ops(max_to_start, &started); + } + if (max_to_start > 0) { + max_to_start -= start_backfill_ops(max_to_start, &started); + } + return crimson::osd::join_blocking_futures(std::move(started)).then( + [this] { + bool done = !pg->get_peering_state().needs_recovery(); + if (done) { + crimson::get_logger(ceph_subsys_osd).debug("start_recovery_ops: AllReplicasRecovered for pg: {}", + pg->get_pgid()); + using LocalPeeringEvent = crimson::osd::LocalPeeringEvent; + pg->get_shard_services().start_operation( + static_cast(pg), + pg->get_shard_services(), + pg->get_pg_whoami(), + pg->get_pgid(), + pg->get_osdmap_epoch(), + pg->get_osdmap_epoch(), + PeeringState::AllReplicasRecovered{}); + } + return seastar::make_ready_future(!done); + }); +} + +size_t PGRecovery::start_primary_recovery_ops( + size_t max_to_start, + std::vector> *out) +{ + if (!pg->is_recovering()) { + return 0; + } + + if (!pg->get_peering_state().have_missing()) { + pg->get_peering_state().local_recovery_complete(); + return 0; + } + + const auto &missing = pg->get_peering_state().get_pg_log().get_missing(); + + crimson::get_logger(ceph_subsys_osd).info( + "{} recovering {} in pg {}, missing {}", + __func__, + pg->get_recovery_backend()->total_recovering(), + *static_cast(pg), + missing); + + unsigned started = 0; + int skipped = 0; + + map::const_iterator p = + missing.get_rmissing().lower_bound(pg->get_peering_state().get_pg_log().get_log().last_requested); + while (started < max_to_start && p != missing.get_rmissing().end()) { + // TODO: chain futures here to enable yielding to scheduler? + hobject_t soid; + version_t v = p->first; + + auto it_objects = pg->get_peering_state().get_pg_log().get_log().objects.find(p->second); + if (it_objects != pg->get_peering_state().get_pg_log().get_log().objects.end()) { + // look at log! + pg_log_entry_t *latest = it_objects->second; + assert(latest->is_update() || latest->is_delete()); + soid = latest->soid; + } else { + soid = p->second; + } + const pg_missing_item& item = missing.get_items().find(p->second)->second; + ++p; + + hobject_t head = soid.get_head(); + + crimson::get_logger(ceph_subsys_osd).info( + "{} {} item.need {} {} {} {} {}", + __func__, + soid, + item.need, + missing.is_missing(soid) ? " (missing)":"", + missing.is_missing(head) ? " (missing head)":"", + pg->get_recovery_backend()->is_recovering(soid) ? " (recovering)":"", + pg->get_recovery_backend()->is_recovering(head) ? " (recovering head)":""); + + // TODO: handle lost/unfound + if (!pg->get_recovery_backend()->is_recovering(soid)) { + if (pg->get_recovery_backend()->is_recovering(head)) { + ++skipped; + } else { + auto futopt = recover_missing(soid, item.need); + if (futopt) { + out->push_back(std::move(*futopt)); + ++started; + } else { + ++skipped; + } + } + } + + if (!skipped) + pg->get_peering_state().set_last_requested(v); + } + + crimson::get_logger(ceph_subsys_osd).info( + "{} started {} skipped {}", + __func__, + started, + skipped); + + return started; +} + +size_t PGRecovery::start_replica_recovery_ops( + size_t max_to_start, + std::vector> *out) +{ + if (!pg->is_recovering()) { + return 0; + } + uint64_t started = 0; + + assert(!pg->get_peering_state().get_acting_recovery_backfill().empty()); + + auto recovery_order = get_replica_recovery_order(); + for (auto &peer : recovery_order) { + assert(peer != pg->get_peering_state().get_primary()); + auto pm = pg->get_peering_state().get_peer_missing().find(peer); + assert(pm != pg->get_peering_state().get_peer_missing().end()); + + size_t m_sz = pm->second.num_missing(); + + crimson::get_logger(ceph_subsys_osd).debug( + "{}: peer osd.{} missing {} objects", + __func__, + peer, + m_sz); + crimson::get_logger(ceph_subsys_osd).trace( + "{}: peer osd.{} missing {}", __func__, + peer, pm->second.get_items()); + + // recover oldest first + const pg_missing_t &m(pm->second); + for (auto p = m.get_rmissing().begin(); + p != m.get_rmissing().end() && started < max_to_start; + ++p) { + const auto &soid = p->second; + + if (pg->get_peering_state().get_missing_loc().is_unfound(soid)) { + crimson::get_logger(ceph_subsys_osd).debug( + "{}: object {} still unfound", __func__, soid); + continue; + } + + const pg_info_t &pi = pg->get_peering_state().get_peer_info(peer); + if (soid > pi.last_backfill) { + if (!pg->get_recovery_backend()->is_recovering(soid)) { + crimson::get_logger(ceph_subsys_osd).error( + "{}: object {} in missing set for backfill (last_backfill {})" + " but not in recovering", + __func__, + soid, + pi.last_backfill); + ceph_abort(); + } + continue; + } + + if (pg->get_recovery_backend()->is_recovering(soid)) { + crimson::get_logger(ceph_subsys_osd).debug( + "{}: already recovering object {}", __func__, soid); + continue; + } + + if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { + crimson::get_logger(ceph_subsys_osd).debug( + "{}: soid {} is a delete, removing", __func__, soid); + map::const_iterator r = + m.get_items().find(soid); + started += prep_object_replica_deletes( + soid, r->second.need, out); + continue; + } + + if (soid.is_snap() && + pg->get_peering_state().get_pg_log().get_missing().is_missing( + soid.get_head())) { + crimson::get_logger(ceph_subsys_osd).debug( + "{}: head {} still missing on primary", + __func__, soid.get_head()); + continue; + } + + if (pg->get_peering_state().get_pg_log().get_missing().is_missing(soid)) { + crimson::get_logger(ceph_subsys_osd).debug( + "{}: soid {} still missing on primary", __func__, soid); + continue; + } + + crimson::get_logger(ceph_subsys_osd).debug( + "{}: recover_object_replicas({})", + __func__, + soid); + map::const_iterator r = m.get_items().find( + soid); + started += prep_object_replica_pushes( + soid, r->second.need, out); + } + } + + return started; +} + +size_t PGRecovery::start_backfill_ops( + size_t max_to_start, + std::vector> *out) +{ + assert(!pg->get_peering_state().get_backfill_targets().empty()); + + ceph_abort("not implemented!"); +} + +std::optional> PGRecovery::recover_missing( + const hobject_t &soid, eversion_t need) +{ + if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) { + return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future( + pg->get_recovery_backend()->recover_delete(soid, need)); + } else { + return pg->get_recovery_backend()->get_recovering(soid).make_blocking_future( + pg->get_recovery_backend()->recover_object(soid, need).handle_exception( + [=, soid = std::move(soid)] (auto e) { + on_failed_recover({ pg->get_pg_whoami() }, soid, need); + return seastar::make_ready_future<>(); + }) + ); + } +} + +size_t PGRecovery::prep_object_replica_deletes( + const hobject_t& soid, + eversion_t need, + std::vector> *in_progress) +{ + in_progress->push_back( + pg->get_recovery_backend()->get_recovering(soid).make_blocking_future( + pg->get_recovery_backend()->push_delete(soid, need).then([=] { + object_stat_sum_t stat_diff; + stat_diff.num_objects_recovered = 1; + on_global_recover(soid, stat_diff, true); + return seastar::make_ready_future<>(); + }) + ) + ); + return 1; +} + +size_t PGRecovery::prep_object_replica_pushes( + const hobject_t& soid, + eversion_t need, + std::vector> *in_progress) +{ + in_progress->push_back( + pg->get_recovery_backend()->get_recovering(soid).make_blocking_future( + pg->get_recovery_backend()->recover_object(soid, need).handle_exception( + [=, soid = std::move(soid)] (auto e) { + on_failed_recover({ pg->get_pg_whoami() }, soid, need); + return seastar::make_ready_future<>(); + }) + ) + ); + return 1; +} + +void PGRecovery::on_local_recover( + const hobject_t& soid, + const ObjectRecoveryInfo& recovery_info, + const bool is_delete, + ceph::os::Transaction& t) +{ + pg->get_peering_state().recover_got(soid, + recovery_info.version, is_delete, t); + + if (pg->is_primary()) { + if (!is_delete) { + auto& obc = pg->get_recovery_backend()->get_recovering(soid).obc; //TODO: move to pg backend? + obc->obs.exists = true; + obc->obs.oi = recovery_info.oi; + // obc is loaded the excl lock + obc->put_lock_type(RWState::RWEXCL); + assert(obc->get_recovery_read()); + } + if (!pg->is_unreadable_object(soid)) { + pg->get_recovery_backend()->get_recovering(soid).set_readable(); + } + } +} + +void PGRecovery::on_global_recover ( + const hobject_t& soid, + const object_stat_sum_t& stat_diff, + const bool is_delete) +{ + pg->get_peering_state().object_recovered(soid, stat_diff); + auto& recovery_waiter = pg->get_recovery_backend()->get_recovering(soid); + if (!is_delete) + recovery_waiter.obc->drop_recovery_read(); + recovery_waiter.set_recovered(); + pg->get_recovery_backend()->remove_recovering(soid); +} + +void PGRecovery::on_failed_recover( + const set& from, + const hobject_t& soid, + const eversion_t& v) +{ + for (auto pg_shard : from) { + if (pg_shard != pg->get_pg_whoami()) { + pg->get_peering_state().force_object_missing(pg_shard, soid, v); + } + } +} + +void PGRecovery::on_peer_recover( + pg_shard_t peer, + const hobject_t &oid, + const ObjectRecoveryInfo &recovery_info) +{ + crimson::get_logger(ceph_subsys_osd).debug( + "{}: {}, {} on {}", __func__, oid, + recovery_info.version, peer); + pg->get_peering_state().on_peer_recover(peer, oid, recovery_info.version); +} + +void PGRecovery::_committed_pushed_object(epoch_t epoch, + eversion_t last_complete) +{ + if (!pg->has_reset_since(epoch)) { + pg->get_peering_state().recovery_committed_to(last_complete); + } else { + crimson::get_logger(ceph_subsys_osd).debug( + "{} pg has changed, not touching last_complete_ondisk", + __func__); + } +} diff --git a/src/crimson/osd/pg_recovery.h b/src/crimson/osd/pg_recovery.h new file mode 100644 index 00000000000..ccc46237daf --- /dev/null +++ b/src/crimson/osd/pg_recovery.h @@ -0,0 +1,79 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/pg_recovery_listener.h" +#include "crimson/osd/scheduler/scheduler.h" +#include "crimson/osd/shard_services.h" + +#include "osd/object_state.h" + +class PGBackend; + +class PGRecovery { +public: + PGRecovery(PGRecoveryListener* pg) : pg(pg) {} + virtual ~PGRecovery() {} + void start_background_recovery( + crimson::osd::scheduler::scheduler_class_t klass); + + crimson::osd::blocking_future start_recovery_ops(size_t max_to_start); + +private: + PGRecoveryListener* pg; + size_t start_primary_recovery_ops( + size_t max_to_start, + std::vector> *out); + size_t start_replica_recovery_ops( + size_t max_to_start, + std::vector> *out); + size_t start_backfill_ops( + size_t max_to_start, + std::vector> *out); + + std::vector get_replica_recovery_order() const { + return pg->get_replica_recovery_order(); + } + std::optional> recover_missing( + const hobject_t &soid, eversion_t need); + size_t prep_object_replica_deletes( + const hobject_t& soid, + eversion_t need, + std::vector> *in_progress); + size_t prep_object_replica_pushes( + const hobject_t& soid, + eversion_t need, + std::vector> *in_progress); + + void on_local_recover( + const hobject_t& soid, + const ObjectRecoveryInfo& recovery_info, + bool is_delete, + ceph::os::Transaction& t); + void on_global_recover ( + const hobject_t& soid, + const object_stat_sum_t& stat_diff, + bool is_delete); + void on_failed_recover( + const set& from, + const hobject_t& soid, + const eversion_t& v); + void on_peer_recover( + pg_shard_t peer, + const hobject_t &oid, + const ObjectRecoveryInfo &recovery_info); + void _committed_pushed_object(epoch_t epoch, + eversion_t last_complete); + friend class ReplicatedRecoveryBackend; + seastar::future<> handle_pull(Ref m); + seastar::future<> handle_push(Ref m); + seastar::future<> handle_push_reply(Ref m); + seastar::future<> handle_recovery_delete(Ref m); + seastar::future<> handle_recovery_delete_reply( + Ref m); + seastar::future<> handle_pull_response(Ref m); +}; diff --git a/src/crimson/osd/pg_recovery_listener.h b/src/crimson/osd/pg_recovery_listener.h new file mode 100644 index 00000000000..2946f93a53d --- /dev/null +++ b/src/crimson/osd/pg_recovery_listener.h @@ -0,0 +1,33 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "common/hobject.h" +#include "include/types.h" +#include "osd/osd_types.h" + +namespace crimson::osd { + class ShardServices; +}; + +class RecoveryBackend; +class PGRecovery; + +class PGRecoveryListener { +public: + virtual crimson::osd::ShardServices& get_shard_services() = 0; + virtual PGRecovery* get_recovery_handler() = 0; + virtual epoch_t get_osdmap_epoch() const = 0; + virtual bool is_primary() const = 0; + virtual bool is_peered() const = 0; + virtual bool is_recovering() const = 0; + virtual bool is_backfilling() const = 0; + virtual PeeringState& get_peering_state() = 0; + virtual const pg_shard_t& get_pg_whoami() const = 0; + virtual const spg_t& get_pgid() const = 0; + virtual RecoveryBackend* get_recovery_backend() = 0; + virtual bool is_unreadable_object(const hobject_t&) const = 0; + virtual bool has_reset_since(epoch_t) const = 0; + virtual std::vector get_replica_recovery_order() const = 0; +}; diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc new file mode 100644 index 00000000000..31ae27f91dd --- /dev/null +++ b/src/crimson/osd/recovery_backend.cc @@ -0,0 +1,44 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/recovery_backend.h" +#include "crimson/osd/pg.h" + +#include "osd/osd_types.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +hobject_t RecoveryBackend::get_temp_recovery_object( + const hobject_t& target, + eversion_t version) +{ + ostringstream ss; + ss << "temp_recovering_" << pg.get_info().pgid << "_" << version + << "_" << pg.get_info().history.same_interval_since << "_" << target.snap; + hobject_t hoid = target.make_temp_hobject(ss.str()); + logger().debug("{} {}", __func__, hoid); + return hoid; +} + +void RecoveryBackend::clean_up(ceph::os::Transaction& t, + const std::string& why) +{ + for (auto& soid : temp_contents) { + t.remove(pg.get_collection_ref()->get_cid(), + ghobject_t(soid, ghobject_t::NO_GEN, pg.get_pg_whoami().shard)); + } + temp_contents.clear(); + + for (auto& [soid, recovery_waiter] : recovering) { + if (recovery_waiter.obc) { + recovery_waiter.obc->drop_recovery_read(); + recovery_waiter.interrupt(why); + } + } + recovering.clear(); +} + diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h new file mode 100644 index 00000000000..461ae620fb7 --- /dev/null +++ b/src/crimson/osd/recovery_backend.h @@ -0,0 +1,155 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include "crimson/common/type_helpers.h" +#include "crimson/os/futurized_store.h" +#include "crimson/os/futurized_collection.h" +#include "crimson/osd/object_context.h" +#include "crimson/osd/shard_services.h" + +#include "osd/osd_types.h" + +namespace crimson::osd{ + class PG; +} + +class PGBackend; + +class RecoveryBackend { +protected: + class WaitForObjectRecovery; +public: + RecoveryBackend(crimson::osd::PG& pg, + crimson::osd::ShardServices& shard_services, + crimson::os::CollectionRef coll, + PGBackend* backend) + : pg{pg}, + shard_services{shard_services}, + store{&shard_services.get_store()}, + coll{coll}, + backend{backend} {} + virtual ~RecoveryBackend() {} + WaitForObjectRecovery& get_recovering(const hobject_t& soid) { + return recovering[soid]; + } + void remove_recovering(const hobject_t& soid) { + recovering.erase(soid); + } + bool is_recovering(const hobject_t& soid) { + return recovering.count(soid) != 0; + } + uint64_t total_recovering() { + return recovering.size(); + } + + virtual seastar::future<> handle_recovery_op( + Ref m) = 0; + + virtual seastar::future<> recover_object( + const hobject_t& soid, + eversion_t need) = 0; + virtual seastar::future<> recover_delete( + const hobject_t& soid, + eversion_t need) = 0; + virtual seastar::future<> push_delete( + const hobject_t& soid, + eversion_t need) = 0; + + void on_peering_interval_change(ceph::os::Transaction& t) { + clean_up(t, "new peering interval"); + } +protected: + crimson::osd::PG& pg; + crimson::osd::ShardServices& shard_services; + crimson::os::FuturizedStore* store; + crimson::os::CollectionRef coll; + PGBackend* backend; + + struct PullInfo { + pg_shard_t from; + hobject_t soid; + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + crimson::osd::ObjectContextRef head_ctx; + crimson::osd::ObjectContextRef obc; + object_stat_sum_t stat; + bool is_complete() const { + return recovery_progress.is_complete(recovery_info); + } + }; + + struct PushInfo { + ObjectRecoveryProgress recovery_progress; + ObjectRecoveryInfo recovery_info; + crimson::osd::ObjectContextRef obc; + object_stat_sum_t stat; + }; + + class WaitForObjectRecovery : public crimson::osd::BlockerT { + seastar::shared_promise<> readable, recovered, pulled; + std::map> pushes; + public: + static constexpr const char* type_name = "WaitForObjectRecovery"; + + crimson::osd::ObjectContextRef obc; + PullInfo pi; + std::map pushing; + + seastar::future<> wait_for_readable() { + return readable.get_shared_future(); + } + seastar::future<> wait_for_pushes(pg_shard_t shard) { + return pushes[shard].get_shared_future(); + } + seastar::future<> wait_for_recovered() { + return recovered.get_shared_future(); + } + seastar::future<> wait_for_pull() { + return pulled.get_shared_future(); + } + void set_readable() { + readable.set_value(); + } + void set_recovered() { + recovered.set_value(); + } + void set_pushed(pg_shard_t shard) { + pushes[shard].set_value(); + } + void set_pulled() { + pulled.set_value(); + } + void interrupt(const std::string& why) { + readable.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why)); + recovered.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why)); + pulled.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why)); + for (auto& [pg_shard, pr] : pushes) { + pr.set_exception(std::system_error( + std::make_error_code(std::errc::interrupted), why)); + } + } + void dump_detail(Formatter* f) const { + } + }; + std::map recovering; + hobject_t get_temp_recovery_object( + const hobject_t& target, + eversion_t version); + + boost::container::flat_set temp_contents; + + void add_temp_obj(const hobject_t &oid) { + temp_contents.insert(oid); + } + void clear_temp_obj(const hobject_t &oid) { + temp_contents.erase(oid); + } + void clean_up(ceph::os::Transaction& t, const std::string& why); +}; diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index 76f4539fa86..c9667e49278 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -6,10 +6,8 @@ #include "messages/MOSDRepOpReply.h" #include "crimson/common/log.h" -#include "crimson/os/cyanstore/cyan_object.h" #include "crimson/os/futurized_store.h" #include "crimson/osd/shard_services.h" -#include "crimson/osd/pg.h" #include "osd/PeeringState.h" namespace { diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc new file mode 100644 index 00000000000..4e0992c023d --- /dev/null +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -0,0 +1,1083 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include +#include +#include + +#include "crimson/osd/pg.h" +#include "crimson/osd/pg_backend.h" +#include "replicated_recovery_backend.h" + +#include "msg/Message.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +seastar::future<> ReplicatedRecoveryBackend::recover_object( + const hobject_t& soid, + eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + auto& recovery_waiter = recovering[soid]; + return seastar::do_with(std::map(), get_shards_to_push(soid), + [this, soid, need, &recovery_waiter](auto& pops, auto& shards) { + return [this, soid, need, &recovery_waiter] { + pg_missing_tracker_t local_missing = pg.get_local_missing(); + if (local_missing.is_missing(soid)) { + PullOp po; + auto& pi = recovery_waiter.pi; + prepare_pull(po, pi, soid, need); + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + std::vector pulls; + pulls.push_back(po); + msg->set_pulls(&pulls); + return shard_services.send_to_osd(pi.from.osd, + std::move(msg), + pg.get_osdmap_epoch()).then( + [&recovery_waiter] { + return recovery_waiter.wait_for_pull(); + }); + } else { + return seastar::make_ready_future<>(); + } + }().then([this, &pops, &shards, soid, need, &recovery_waiter]() mutable { + return [this, &recovery_waiter, soid] { + if (!recovery_waiter.obc) { + return pg.get_or_load_head_obc(soid).safe_then( + [this, &recovery_waiter](auto p) { + auto& [obc, existed] = p; + logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); + recovery_waiter.obc = obc; + if (!existed) { + // obc is loaded with excl lock + recovery_waiter.obc->put_lock_type(RWState::RWEXCL); + } + assert(recovery_waiter.obc->get_recovery_read()); + return seastar::make_ready_future<>(); + }, crimson::osd::PG::load_obc_ertr::all_same_way( + [this, &recovery_waiter, soid](const std::error_code& e) { + auto [obc, existed] = + shard_services.obc_registry.get_cached_obc(soid); + logger().debug("recover_object: load failure of obc: {}", + obc->obs.oi.soid); + recovery_waiter.obc = obc; + // obc is loaded with excl lock + recovery_waiter.obc->put_lock_type(RWState::RWEXCL); + assert(recovery_waiter.obc->get_recovery_read()); + return seastar::make_ready_future<>(); + }) + ); + } + return seastar::now(); + }().then([this, soid, need, &pops, &shards] { + return prep_push(soid, need, &pops, shards); + }); + }).handle_exception([this, soid, &recovery_waiter](auto e) { + auto& recovery_waiter = recovering[soid]; + if (recovery_waiter.obc) + recovery_waiter.obc->drop_recovery_read(); + recovering.erase(soid); + return seastar::make_exception_future<>(e); + }).then([this, &pops, &shards, soid] { + return seastar::parallel_for_each(shards, + [this, &pops, soid](auto shard) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(pops[shard->first]); + return shard_services.send_to_osd(shard->first.osd, std::move(msg), + pg.get_osdmap_epoch()).then( + [this, soid, shard] { + return recovering[soid].wait_for_pushes(shard->first); + }); + }); + }).then([this, soid, &recovery_waiter] { + bool error = recovering[soid].pi.recovery_progress.error; + if (!error) { + auto push_info = recovering[soid].pushing.begin(); + object_stat_sum_t stat = {}; + if (push_info != recovering[soid].pushing.end()) { + stat = push_info->second.stat; + } else { + // no push happened, take pull_info's stat + stat = recovering[soid].pi.stat; + } + pg.get_recovery_handler()->on_global_recover(soid, stat, false); + return seastar::make_ready_future<>(); + } else { + auto& recovery_waiter = recovering[soid]; + if (recovery_waiter.obc) + recovery_waiter.obc->drop_recovery_read(); + recovering.erase(soid); + return seastar::make_exception_future<>( + std::runtime_error(fmt::format("Errors during pushing for {}", soid))); + } + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::push_delete( + const hobject_t& soid, + eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + recovering[soid]; + epoch_t min_epoch = pg.get_last_peering_reset(); + + assert(pg.get_acting_recovery_backfill().size() > 0); + return seastar::parallel_for_each(pg.get_acting_recovery_backfill(), + [this, soid, need, min_epoch](pg_shard_t shard) { + if (shard == pg.get_pg_whoami()) + return seastar::make_ready_future<>(); + auto iter = pg.get_shard_missing().find(shard); + if (iter == pg.get_shard_missing().end()) + return seastar::make_ready_future<>(); + if (iter->second.is_missing(soid)) { + logger().debug("{} will remove {} from {}", __func__, soid, shard); + pg.begin_peer_recover(shard, soid); + spg_t target_pg = spg_t(pg.get_info().pgid.pgid, shard.shard); + auto msg = make_message( + pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch); + + msg->set_priority(pg.get_recovery_op_priority()); + msg->objects.push_back(std::make_pair(soid, need)); + return shard_services.send_to_osd(shard.osd, std::move(msg), + pg.get_osdmap_epoch()).then( + [this, soid, shard] { + return recovering[soid].wait_for_pushes(shard); + }); + } + return seastar::make_ready_future<>(); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete( + Ref m) +{ + logger().debug("{}: {}", __func__, *m); + + auto& p = m->objects.front(); //TODO: only one delete per message for now. + return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then( + [this, m, &p] { + auto reply = make_message(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard); + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + reply->objects = m->objects; + return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist( + const hobject_t& soid, + const ObjectRecoveryInfo& _recovery_info, + bool is_delete, + epoch_t epoch_frozen) +{ + logger().debug("{}", __func__); + ceph::os::Transaction t; + pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t); + return shard_services.get_store().do_transaction(coll, std::move(t)).then( + [this, &soid, &_recovery_info, epoch_frozen, + last_complete = pg.get_info().last_complete] { + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + return seastar::make_ready_future<>(); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::local_recover_delete( + const hobject_t& soid, + eversion_t need, + epoch_t epoch_to_freeze) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + return backend->load_metadata(soid).safe_then([this] + (auto lomt) { + if (lomt->os.exists) { + return seastar::do_with(ceph::os::Transaction(), + [this, lomt = std::move(lomt)](auto& txn) { + return backend->remove(lomt->os, txn).then([this, &txn]() mutable { + return shard_services.get_store().do_transaction(coll, + std::move(txn)); + }); + }); + } + return seastar::make_ready_future<>(); + }).safe_then([this, soid, epoch_to_freeze, need] { + auto& recovery_waiter = recovering[soid]; + auto& pi = recovery_waiter.pi; + pi.recovery_info.soid = soid; + pi.recovery_info.version = need; + return on_local_recover_persist(soid, pi.recovery_info, + true, epoch_to_freeze); + }, PGBackend::load_metadata_ertr::all_same_way( + [this, soid, epoch_to_freeze, need] (auto e) { + auto& recovery_waiter = recovering[soid]; + auto& pi = recovery_waiter.pi; + pi.recovery_info.soid = soid; + pi.recovery_info.version = need; + return on_local_recover_persist(soid, pi.recovery_info, + true, epoch_to_freeze); + }) + ); +} + +seastar::future<> ReplicatedRecoveryBackend::recover_delete( + const hobject_t &soid, eversion_t need) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + + epoch_t cur_epoch = pg.get_osdmap_epoch(); + return seastar::do_with(object_stat_sum_t(), + [this, soid, need, cur_epoch](auto& stat_diff) { + return local_recover_delete(soid, need, cur_epoch).then( + [this, &stat_diff, cur_epoch, soid, need] { + if (!pg.has_reset_since(cur_epoch)) { + bool object_missing = false; + for (const auto& shard : pg.get_acting_recovery_backfill()) { + if (shard == pg.get_pg_whoami()) + continue; + if (pg.get_shard_missing(shard)->is_missing(soid)) { + logger().debug("{}: soid {} needs to deleted from replca {}", + __func__, + soid, + shard); + object_missing = true; + break; + } + } + + if (!object_missing) { + stat_diff.num_objects_recovered = 1; + return seastar::make_ready_future<>(); + } else { + return push_delete(soid, need); + } + } + return seastar::make_ready_future<>(); + }).then([this, soid, &stat_diff] { + pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true); + return seastar::make_ready_future<>(); + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::prep_push( + const hobject_t& soid, + eversion_t need, + std::map* pops, + const std::list::const_iterator>& shards) +{ + logger().debug("{}: {}, {}", __func__, soid, need); + + return seastar::do_with(std::map>(), + [this, soid, need, pops, &shards](auto& data_subsets) { + return seastar::parallel_for_each(shards, + [this, soid, need, pops, &data_subsets](auto pg_shard) mutable { + pops->emplace(pg_shard->first, PushOp()); + auto& recovery_waiter = recovering[soid]; + auto& obc = recovery_waiter.obc; + auto& data_subset = data_subsets[pg_shard->first]; + + if (obc->obs.oi.size) { + data_subset.insert(0, obc->obs.oi.size); + } + const auto& missing = pg.get_shard_missing().find(pg_shard->first)->second; + if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) { + const auto it = missing.get_items().find(soid); + assert(it != missing.get_items().end()); + data_subset.intersection_of(it->second.clean_regions.get_dirty_regions()); + logger().debug("calc_head_subsets {} data_subset {}", soid, data_subset); + } + + logger().debug("prep_push: {} to {}", soid, pg_shard->first); + auto& pi = recovery_waiter.pushing[pg_shard->first]; + pg.begin_peer_recover(pg_shard->first, soid); + const auto pmissing_iter = pg.get_shard_missing().find(pg_shard->first); + const auto missing_iter = pmissing_iter->second.get_items().find(soid); + assert(missing_iter != pmissing_iter->second.get_items().end()); + + pi.obc = obc; + pi.recovery_info.size = obc->obs.oi.size; + pi.recovery_info.copy_subset = data_subset; + pi.recovery_info.soid = soid; + pi.recovery_info.oi = obc->obs.oi; + pi.recovery_info.version = obc->obs.oi.version; + pi.recovery_info.object_exist = + missing_iter->second.clean_regions.object_is_exist(); + pi.recovery_progress.omap_complete = + !missing_iter->second.clean_regions.omap_is_dirty() && + HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); + + return build_push_op(pi.recovery_info, pi.recovery_progress, + &pi.stat, &(*pops)[pg_shard->first]).then( + [this, soid, pg_shard](auto new_progress) { + auto& recovery_waiter = recovering[soid]; + auto& pi = recovery_waiter.pushing[pg_shard->first]; + pi.recovery_progress = new_progress; + return seastar::make_ready_future<>(); + }); + }); + }); +} + +void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi, + const hobject_t& soid, + eversion_t need) { + logger().debug("{}: {}, {}", __func__, soid, need); + + pg_missing_tracker_t local_missing = pg.get_local_missing(); + const auto missing_iter = local_missing.get_items().find(soid); + auto m = pg.get_missing_loc_shards(); + pg_shard_t fromshard = *(m[soid].begin()); + + //TODO: skipped snap objects case for now + po.recovery_info.copy_subset.insert(0, (uint64_t) -1); + if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) + po.recovery_info.copy_subset.intersection_of( + missing_iter->second.clean_regions.get_dirty_regions()); + po.recovery_info.size = ((uint64_t) -1); + po.recovery_info.object_exist = + missing_iter->second.clean_regions.object_is_exist(); + po.recovery_info.soid = soid; + po.soid = soid; + po.recovery_progress.data_complete = false; + po.recovery_progress.omap_complete = + !missing_iter->second.clean_regions.omap_is_dirty() && + HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); + po.recovery_progress.data_recovered_to = 0; + po.recovery_progress.first = true; + + pi.from = fromshard; + pi.soid = soid; + pi.recovery_info = po.recovery_info; + pi.recovery_progress = po.recovery_progress; +} + +seastar::future ReplicatedRecoveryBackend::build_push_op( + const ObjectRecoveryInfo& recovery_info, + const ObjectRecoveryProgress& progress, + object_stat_sum_t* stat, + PushOp* pop + ) { + logger().debug("{}", __func__); + return seastar::do_with(ObjectRecoveryProgress(progress), + object_info_t(), + uint64_t(crimson::common::local_conf() + ->osd_recovery_max_chunk), + eversion_t(), + [this, &recovery_info, &progress, stat, pop] + (auto& new_progress, auto& oi, auto& available, auto& v) { + return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] { + if (progress.first) { + v = recovery_info.version; + return backend->omap_get_header(coll, ghobject_t(recovery_info.soid)) + .then([this, &recovery_info, pop](auto bl) { + pop->omap_header.claim_append(bl); + return store->get_attrs(coll, ghobject_t(recovery_info.soid)); + }).safe_then([this, &oi, pop, &new_progress, &v](auto attrs) mutable { + //pop->attrset = attrs; + for (auto p : attrs) { + pop->attrset[p.first].push_back(p.second); + } + logger().debug("build_push_op: {}", pop->attrset[OI_ATTR]); + oi.decode(pop->attrset[OI_ATTR]); + new_progress.first = false; + if (v == eversion_t()) { + v = oi.version; + } + return seastar::make_ready_future<>(); + }, crimson::os::FuturizedStore::get_attrs_ertr::all_same_way( + [] (const std::error_code& e) { + return seastar::make_exception_future<>(e); + }) + ); + } + return seastar::make_ready_future<>(); + }().then([this, &recovery_info] { + return shard_services.get_store().get_omap_iterator(coll, + ghobject_t(recovery_info.soid)); + }).then([this, &progress, &available, &new_progress, pop](auto iter) { + if (!progress.omap_complete) { + return iter->lower_bound(progress.omap_recovered_to).then( + [this, iter, &new_progress, pop, &available](int ret) { + return seastar::repeat([this, iter, &new_progress, pop, &available] { + if (!iter->valid()) { + new_progress.omap_complete = true; + return seastar::make_ready_future( + seastar::stop_iteration::yes); + } + if (!pop->omap_entries.empty() + && ((crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk > 0 + && pop->omap_entries.size() + >= crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk) + || available <= iter->key().size() + iter->value().length())) { + new_progress.omap_recovered_to = iter->key(); + return seastar::make_ready_future( + seastar::stop_iteration::yes); + } + pop->omap_entries.insert(make_pair(iter->key(), iter->value())); + if ((iter->key().size() + iter->value().length()) <= available) + available -= (iter->key().size() + iter->value().length()); + else + available = 0; + return iter->next().then([](int r) { + return seastar::stop_iteration::no; + }); + }); + }); + } + return seastar::make_ready_future<>(); + }).then([this, &recovery_info, &progress, &available, &new_progress, pop] { + logger().debug("build_push_op: available: {}, copy_subset: {}", + available, recovery_info.copy_subset); + if (available > 0) { + if (!recovery_info.copy_subset.empty()) { + return seastar::do_with(interval_set(recovery_info.copy_subset), + [this, &recovery_info, &progress, &available, pop, &new_progress] + (auto& copy_subset) { + return backend->fiemap(coll, ghobject_t(recovery_info.soid), + 0, copy_subset.range_end()).then( + [©_subset](auto m) { + interval_set fiemap_included(std::move(m)); + copy_subset.intersection_of(fiemap_included); + return seastar::make_ready_future<>(); + }).then([this, &recovery_info, &progress, + ©_subset, &available, pop, &new_progress] { + pop->data_included.span_of(copy_subset, progress.data_recovered_to, + available); + if (pop->data_included.empty()) // zero filled section, skip to end! + new_progress.data_recovered_to = + recovery_info.copy_subset.range_end(); + else + new_progress.data_recovered_to = pop->data_included.range_end(); + return seastar::make_ready_future<>(); + }).handle_exception([©_subset](auto e) { + copy_subset.clear(); + return seastar::make_ready_future<>(); + }); + }); + } else { + return seastar::now(); + } + } else { + pop->data_included.clear(); + return seastar::make_ready_future<>(); + } + }).then([this, &recovery_info, &progress, &oi, pop] { + //TODO: there's no readv in cyan_store yet, use read temporarily. + return store->readv(coll, ghobject_t{oi.soid}, pop->data_included, 0); + }).safe_then([this, &recovery_info, &progress, + &new_progress, &oi, stat, pop, &v] + (auto bl) { + pop->data.claim_append(bl); + if (new_progress.is_complete(recovery_info)) { + new_progress.data_complete = true; + if (stat) + stat->num_objects_recovered++; + } else if (progress.first && progress.omap_complete) { + // If omap is not changed, we need recovery omap + // when recovery cannot be completed once + new_progress.omap_complete = false; + } + if (stat) { + stat->num_keys_recovered += pop->omap_entries.size(); + stat->num_bytes_recovered += pop->data.length(); + } + pop->version = v; + pop->soid = recovery_info.soid; + pop->recovery_info = recovery_info; + pop->after_progress = new_progress; + pop->before_progress = progress; + logger().debug("build_push_op: pop version: {}, pop data length: {}", + pop->version, pop->data.length()); + return seastar::make_ready_future + (std::move(new_progress)); + }, PGBackend::read_errorator::all_same_way([](auto e) { + logger().debug("build_push_op: read exception"); + return seastar::make_exception_future(e); + }) + ); + }); +} + +std::list::const_iterator> +ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) +{ + std::list::const_iterator> shards; + assert(pg.get_acting_recovery_backfill().size() > 0); + for (set::iterator i = + pg.get_acting_recovery_backfill().begin(); + i != pg.get_acting_recovery_backfill().end(); + ++i) { + if (*i == pg.get_pg_whoami()) + continue; + pg_shard_t peer = *i; + map::const_iterator j = + pg.get_shard_missing().find(peer); + assert(j != pg.get_shard_missing().end()); + if (j->second.is_missing(soid)) { + shards.push_back(j); + } + } + return shards; +} + +seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) +{ + logger().debug("{}: {}", __func__, *m); + vector pulls; + m->take_pulls(&pulls); + return seastar::do_with(std::move(pulls), + [this, m, from = m->from](auto& pulls) { + return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) { + const hobject_t& soid = pull_op.soid; + return seastar::do_with(PushOp(), + [this, &soid, &pull_op, from](auto& pop) { + logger().debug("handle_pull: {}", soid); + return backend->stat(coll, ghobject_t(soid)).then( + [this, &pull_op, &pop](auto st) { + ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; + ObjectRecoveryProgress &progress = pull_op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t) -1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + if (st.st_size) { + interval_set object_range; + object_range.insert(0, st.st_size); + recovery_info.copy_subset.intersection_of(object_range); + } else { + recovery_info.copy_subset.clear(); + } + assert(recovery_info.clone_subset.empty()); + } + return build_push_op(recovery_info, progress, 0, &pop); + }).handle_exception([soid, &pop](auto e) { + pop.recovery_info.version = eversion_t(); + pop.version = eversion_t(); + pop.soid = soid; + return seastar::make_ready_future(); + }).then([this, &pop, &pull_op, from](auto new_progress) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(pop); + return shard_services.send_to_osd(from.osd, std::move(msg), + pg.get_osdmap_epoch()); + }); + }); + }); + }); +} + +seastar::future ReplicatedRecoveryBackend::_handle_pull_response( + pg_shard_t from, + PushOp& pop, + PullOp* response, + ceph::os::Transaction* t) +{ + logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}", + pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included); + + const hobject_t &hoid = pop.soid; + auto& recovery_waiter = recovering[hoid]; + auto& pi = recovery_waiter.pi; + if (pi.recovery_info.size == (uint64_t(-1))) { + pi.recovery_info.size = pop.recovery_info.size; + pi.recovery_info.copy_subset.intersection_of( + pop.recovery_info.copy_subset); + } + + // If primary doesn't have object info and didn't know version + if (pi.recovery_info.version == eversion_t()) + pi.recovery_info.version = pop.version; + + bool first = pi.recovery_progress.first; + + return [this, &pi, first, &recovery_waiter, &pop] { + if (first) { + return pg.get_or_load_head_obc(pi.recovery_info.soid).safe_then( + [this, &pi, &recovery_waiter, &pop](auto p) { + auto& [obc, existed] = p; + pi.obc = obc; + recovery_waiter.obc = obc; + obc->obs.oi.decode(pop.attrset[OI_ATTR]); + pi.recovery_info.oi = obc->obs.oi; + return seastar::make_ready_future<>(); + }, crimson::osd::PG::load_obc_ertr::all_same_way( + [this, &pi](const std::error_code& e) { + auto [obc, existed] = shard_services.obc_registry.get_cached_obc( + pi.recovery_info.soid); + pi.obc = obc; + return seastar::make_ready_future<>(); + }) + ); + } + return seastar::make_ready_future<>(); + }().then([this, first, &pi, &pop, t, response]() mutable { + return seastar::do_with(interval_set(), + bufferlist(), + interval_set(), + [this, &pop, &pi, first, t, response] + (auto& data_zeros, auto& data, + auto& usable_intervals) { + data = pop.data; + ceph::bufferlist usable_data; + trim_pushed_data(pi.recovery_info.copy_subset, pop.data_included, data, + &usable_intervals, &usable_data); + data.claim(usable_data); + pi.recovery_progress = pop.after_progress; + logger().debug("new recovery_info {}, new progress {}", + pi.recovery_info, pi.recovery_progress); + uint64_t z_offset = pop.before_progress.data_recovered_to; + uint64_t z_length = pop.after_progress.data_recovered_to + - pop.before_progress.data_recovered_to; + if (z_length) + data_zeros.insert(z_offset, z_length); + bool complete = pi.is_complete(); + bool clear_omap = !pop.before_progress.omap_complete; + return submit_push_data(pi.recovery_info, first, complete, clear_omap, + data_zeros, usable_intervals, data, pop.omap_header, + pop.attrset, pop.omap_entries, t).then( + [this, response, &pi, &pop, &data, complete, t] { + pi.stat.num_keys_recovered += pop.omap_entries.size(); + pi.stat.num_bytes_recovered += data.length(); + + if (complete) { + pi.stat.num_objects_recovered++; + pg.get_recovery_handler()->on_local_recover(pop.soid, recovering[pop.soid].pi.recovery_info, + false, *t); + return seastar::make_ready_future(true); + } else { + response->soid = pop.soid; + response->recovery_info = pi.recovery_info; + response->recovery_progress = pi.recovery_progress; + return seastar::make_ready_future(false); + } + }); + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_pull_response( + Ref m) +{ + const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now. + if (pop.version == eversion_t()) { + // replica doesn't have it! + pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid, + get_recovering(pop.soid).pi.recovery_info.version); + return seastar::make_exception_future<>( + std::runtime_error(fmt::format( + "Error on pushing side {} when pulling obj {}", + m->from, pop.soid))); + } + + logger().debug("{}: {}", __func__, *m); + return seastar::do_with(PullOp(), [this, m](auto& response) { + return seastar::do_with(ceph::os::Transaction(), m.get(), + [this, &response](auto& t, auto& m) { + pg_shard_t from = m->from; + PushOp& pop = m->pushes[0]; // only one push per message for now + return _handle_pull_response(from, pop, &response, &t).then( + [this, &t](bool complete) { + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + return shard_services.get_store().do_transaction(coll, std::move(t)) + .then([this, epoch_frozen, complete, + last_complete = pg.get_info().last_complete] { + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + return seastar::make_ready_future(complete); + }); + }); + }).then([this, m, &response](bool complete) { + if (complete) { + auto& pop = m->pushes[0]; + recovering[pop.soid].set_pulled(); + return seastar::make_ready_future<>(); + } else { + auto reply = make_message(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + vector vec = { std::move(response) }; + reply->set_pulls(&vec); + return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch()); + } + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::_handle_push( + pg_shard_t from, + const PushOp &pop, + PushReplyOp *response, + ceph::os::Transaction *t) +{ + logger().debug("{}", __func__); + + return seastar::do_with(interval_set(), + bufferlist(), + [this, &pop, t, response](auto& data_zeros, auto& data) { + data = pop.data; + bool first = pop.before_progress.first; + bool complete = pop.after_progress.data_complete + && pop.after_progress.omap_complete; + bool clear_omap = !pop.before_progress.omap_complete; + uint64_t z_offset = pop.before_progress.data_recovered_to; + uint64_t z_length = pop.after_progress.data_recovered_to + - pop.before_progress.data_recovered_to; + if (z_length) + data_zeros.insert(z_offset, z_length); + response->soid = pop.recovery_info.soid; + + return submit_push_data(pop.recovery_info, first, complete, clear_omap, + data_zeros, pop.data_included, data, pop.omap_header, pop.attrset, + pop.omap_entries, t).then([this, complete, &data_zeros, &pop, t] { + if (complete) { + pg.get_recovery_handler()->on_local_recover(pop.recovery_info.soid, + pop.recovery_info, false, *t); + } + }); + }); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_push( + Ref m) +{ + if (pg.is_primary()) { + return handle_pull_response(m); + } + + logger().debug("{}: {}", __func__, *m); + return seastar::do_with(PushReplyOp(), [this, m](auto& response) { + const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now + return seastar::do_with(ceph::os::Transaction(), + [this, m, &pop, &response](auto& t) { + return _handle_push(m->from, pop, &response, &t).then( + [this, &t] { + epoch_t epoch_frozen = pg.get_osdmap_epoch(); + return shard_services.get_store().do_transaction(coll, std::move(t)).then( + [this, epoch_frozen, last_complete = pg.get_info().last_complete] { + //TODO: this should be grouped with pg.on_local_recover somehow. + pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete); + }); + }); + }).then([this, m, &pop, &response]() mutable { + auto reply = make_message(); + reply->from = pg.get_pg_whoami(); + reply->set_priority(m->get_priority()); + reply->pgid = pg.get_info().pgid; + reply->map_epoch = m->map_epoch; + reply->min_epoch = m->min_epoch; + std::vector replies = { std::move(response) }; + reply->replies.swap(replies); + return shard_services.send_to_osd(m->from.osd, + std::move(reply), pg.get_osdmap_epoch()); + }); + }); +} + +seastar::future ReplicatedRecoveryBackend::_handle_push_reply( + pg_shard_t peer, + const PushReplyOp &op, + PushOp *reply) +{ + const hobject_t& soid = op.soid; + logger().debug("{}, soid {}, from {}", __func__, soid, peer); + auto recovering_iter = recovering.find(soid); + if (recovering_iter == recovering.end() + || !recovering_iter->second.pushing.count(peer)) { + logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); + return seastar::make_ready_future(true); + } else { + auto& pi = recovering_iter->second.pushing[peer]; + return [this, &pi, &soid, reply, peer, recovering_iter] { + bool error = pi.recovery_progress.error; + if (!pi.recovery_progress.data_complete && !error) { + return build_push_op(pi.recovery_info, pi.recovery_progress, + &pi.stat, reply).then([&pi] (auto new_progress) { + pi.recovery_progress = new_progress; + return seastar::make_ready_future(false); + }); + } + if (!error) + pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); + recovering_iter->second.set_pushed(peer); + return seastar::make_ready_future(true); + }().handle_exception([this, recovering_iter, &pi, &soid, peer] (auto e) { + pi.recovery_progress.error = true; + recovering_iter->second.set_pushed(peer); + return seastar::make_ready_future(true); + }); + } +} + +seastar::future<> ReplicatedRecoveryBackend::handle_push_reply( + Ref m) +{ + logger().debug("{}: {}", __func__, *m); + auto from = m->from; + auto& push_reply = m->replies[0]; //TODO: only one reply per message + + return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) { + return _handle_push_reply(from, push_reply, &pop).then( + [this, &push_reply, &pop, from](bool finished) { + if (!finished) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(pop); + return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch()); + } + return seastar::make_ready_future<>(); + }); + }); +} + +void ReplicatedRecoveryBackend::trim_pushed_data( + const interval_set ©_subset, + const interval_set &intervals_received, + ceph::bufferlist data_received, + interval_set *intervals_usable, + bufferlist *data_usable) +{ + logger().debug("{}", __func__); + if (intervals_received.subset_of(copy_subset)) { + *intervals_usable = intervals_received; + *data_usable = data_received; + return; + } + + intervals_usable->intersection_of(copy_subset, intervals_received); + + uint64_t off = 0; + for (interval_set::const_iterator p = intervals_received.begin(); + p != intervals_received.end(); ++p) { + interval_set x; + x.insert(p.get_start(), p.get_len()); + x.intersection_of(copy_subset); + for (interval_set::const_iterator q = x.begin(); q != x.end(); + ++q) { + bufferlist sub; + uint64_t data_off = off + (q.get_start() - p.get_start()); + sub.substr_of(data_received, data_off, q.get_len()); + data_usable->claim_append(sub); + } + off += p.get_len(); + } +} + +seastar::future<> ReplicatedRecoveryBackend::submit_push_data( + const ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + bool clear_omap, + interval_set &data_zeros, + const interval_set &intervals_included, + bufferlist data_included, + bufferlist omap_header, + const map &attrs, + const map &omap_entries, + ObjectStore::Transaction *t) +{ + logger().debug("{}", __func__); + hobject_t target_oid; + if (first && complete) { + target_oid = recovery_info.soid; + } else { + target_oid = get_temp_recovery_object(recovery_info.soid, + recovery_info.version); + if (first) { + logger().debug("{}: Adding oid {} in the temp collection", + __func__, target_oid); + add_temp_obj(target_oid); + } + } + + return [this, &recovery_info, first, complete, t, + &omap_header, &attrs, &omap_entries, target_oid, clear_omap] { + if (first) { + if (!complete) { + t->remove(coll->get_cid(), ghobject_t(target_oid)); + t->touch(coll->get_cid(), ghobject_t(target_oid)); + bufferlist bv = attrs.at(OI_ATTR); + object_info_t oi(bv); + t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), + oi.expected_object_size, + oi.expected_write_size, + oi.alloc_hint_flags); + } else { + if (!recovery_info.object_exist) { + t->remove(coll->get_cid(), ghobject_t(target_oid)); + t->touch(coll->get_cid(), ghobject_t(target_oid)); + bufferlist bv = attrs.at(OI_ATTR); + object_info_t oi(bv); + t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid), + oi.expected_object_size, + oi.expected_write_size, + oi.alloc_hint_flags); + } + //remove xattr and update later if overwrite on original object + t->rmattrs(coll->get_cid(), ghobject_t(target_oid)); + //if need update omap, clear the previous content first + if (clear_omap) + t->omap_clear(coll->get_cid(), ghobject_t(target_oid)); + } + + t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size); + if (omap_header.length()) + t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header); + + return store->stat(coll, ghobject_t(recovery_info.soid)).then ( + [this, &recovery_info, complete, t, target_oid, + omap_header = std::move(omap_header), &attrs, &omap_entries] (auto st) { + //TODO: pg num bytes counting + if (!complete) { + //clone overlap content in local object + if (recovery_info.object_exist) { + uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size); + interval_set local_intervals_included, local_intervals_excluded; + if (local_size) { + local_intervals_included.insert(0, local_size); + local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset); + local_intervals_included.subtract(local_intervals_excluded); + } + for (interval_set::const_iterator q = local_intervals_included.begin(); + q != local_intervals_included.end(); + ++q) { + logger().debug(" clone_range {} {}~{}", + recovery_info.soid, q.get_start(), q.get_len()); + t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid), ghobject_t(target_oid), + q.get_start(), q.get_len(), q.get_start()); + } + } + } + return seastar::make_ready_future<>(); + }); + } + return seastar::make_ready_future<>(); + }().then([this, &data_zeros, &recovery_info, &intervals_included, t, target_oid, + &omap_entries, &attrs, data_included, complete, first] { + uint64_t off = 0; + uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL; + // Punch zeros for data, if fiemap indicates nothing but it is marked dirty + if (data_zeros.size() > 0) { + data_zeros.intersection_of(recovery_info.copy_subset); + assert(intervals_included.subset_of(data_zeros)); + data_zeros.subtract(intervals_included); + + logger().debug("submit_push_data recovering object {} copy_subset: {} " + "intervals_included: {} data_zeros: {}", + recovery_info.soid, recovery_info.copy_subset, + intervals_included, data_zeros); + + for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p) + t->zero(coll->get_cid(), ghobject_t(target_oid), p.get_start(), p.get_len()); + } + logger().debug("submit_push_data: test"); + for (interval_set::const_iterator p = intervals_included.begin(); + p != intervals_included.end(); + ++p) { + bufferlist bit; + bit.substr_of(data_included, off, p.get_len()); + logger().debug("submit_push_data: test1"); + t->write(coll->get_cid(), ghobject_t(target_oid), + p.get_start(), p.get_len(), bit, fadvise_flags); + off += p.get_len(); + } + + if (!omap_entries.empty()) + t->omap_setkeys(coll->get_cid(), ghobject_t(target_oid), omap_entries); + if (!attrs.empty()) + t->setattrs(coll->get_cid(), ghobject_t(target_oid), attrs); + + if (complete) { + if (!first) { + logger().debug("{}: Removing oid {} from the temp collection", + __func__, target_oid); + clear_temp_obj(target_oid); + t->remove(coll->get_cid(), ghobject_t(recovery_info.soid)); + t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid), + coll->get_cid(), ghobject_t(recovery_info.soid)); + } + submit_push_complete(recovery_info, t); + } + logger().debug("submit_push_data: done"); + return seastar::make_ready_future<>(); + }); +} + +void ReplicatedRecoveryBackend::submit_push_complete( + const ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t) +{ + for (map>::const_iterator p = + recovery_info.clone_subset.begin(); + p != recovery_info.clone_subset.end(); ++p) { + for (interval_set::const_iterator q = p->second.begin(); + q != p->second.end(); ++q) { + logger().debug(" clone_range {} {}~{}", p->first, q.get_start(), q.get_len()); + t->clone_range(coll->get_cid(), ghobject_t(p->first), ghobject_t(recovery_info.soid), + q.get_start(), q.get_len(), q.get_start()); + } + } +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply( + Ref m) +{ + auto& p = m->objects.front(); + hobject_t soid = p.first; + ObjectRecoveryInfo recovery_info; + recovery_info.version = p.second; + pg.get_recovery_handler()->on_peer_recover(m->from, soid, recovery_info); + get_recovering(soid).set_pushed(m->from); + return seastar::now(); +} + +seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref m) +{ + switch (m->get_header().type) { + case MSG_OSD_PG_PULL: + return handle_pull(boost::static_pointer_cast(m)); + case MSG_OSD_PG_PUSH: + return handle_push(boost::static_pointer_cast(m)); + case MSG_OSD_PG_PUSH_REPLY: + return handle_push_reply( + boost::static_pointer_cast(m)); + case MSG_OSD_PG_RECOVERY_DELETE: + return handle_recovery_delete( + boost::static_pointer_cast(m)); + case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + return handle_recovery_delete_reply( + boost::static_pointer_cast(m)); + default: + return seastar::make_exception_future<>( + std::invalid_argument(fmt::format("invalid request type: {}", + m->get_header().type))); + } +} + diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h new file mode 100644 index 00000000000..467d7a5f656 --- /dev/null +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -0,0 +1,108 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/osd/recovery_backend.h" + +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGRecoveryDelete.h" +#include "messages/MOSDPGRecoveryDeleteReply.h" +#include "os/ObjectStore.h" + +class ReplicatedRecoveryBackend : public RecoveryBackend { +public: + ReplicatedRecoveryBackend(crimson::osd::PG& pg, + crimson::osd::ShardServices& shard_services, + crimson::os::CollectionRef coll, + PGBackend* backend) + : RecoveryBackend(pg, shard_services, coll, backend) {} + seastar::future<> handle_recovery_op( + Ref m) final; + + seastar::future<> recover_object( + const hobject_t& soid, + eversion_t need) final; + seastar::future<> recover_delete( + const hobject_t& soid, + eversion_t need) final; + seastar::future<> push_delete( + const hobject_t& soid, + eversion_t need) final; +protected: + seastar::future<> handle_pull( + Ref m); + seastar::future<> handle_pull_response( + Ref m); + seastar::future<> handle_push( + Ref m); + seastar::future<> handle_push_reply( + Ref m); + seastar::future<> handle_recovery_delete( + Ref m); + seastar::future<> handle_recovery_delete_reply( + Ref m); + seastar::future<> prep_push( + const hobject_t& soid, + eversion_t need, + std::map* pops, + const std::list::const_iterator>& shards); + void prepare_pull( + PullOp& po, + PullInfo& pi, + const hobject_t& soid, + eversion_t need); + std::list::const_iterator> get_shards_to_push( + const hobject_t& soid); + seastar::future build_push_op( + const ObjectRecoveryInfo& recovery_info, + const ObjectRecoveryProgress& progress, + object_stat_sum_t* stat, + PushOp* pop); + seastar::future _handle_pull_response( + pg_shard_t from, + PushOp& pop, + PullOp* response, + ceph::os::Transaction* t); + void trim_pushed_data( + const interval_set ©_subset, + const interval_set &intervals_received, + ceph::bufferlist data_received, + interval_set *intervals_usable, + bufferlist *data_usable); + seastar::future<> submit_push_data( + const ObjectRecoveryInfo &recovery_info, + bool first, + bool complete, + bool clear_omap, + interval_set &data_zeros, + const interval_set &intervals_included, + ceph::bufferlist data_included, + ceph::bufferlist omap_header, + const std::map &attrs, + const std::map &omap_entries, + ceph::os::Transaction *t); + void submit_push_complete( + const ObjectRecoveryInfo &recovery_info, + ObjectStore::Transaction *t); + seastar::future<> _handle_push( + pg_shard_t from, + const PushOp &pop, + PushReplyOp *response, + ceph::os::Transaction *t); + seastar::future _handle_push_reply( + pg_shard_t peer, + const PushReplyOp &op, + PushOp *reply); + seastar::future<> on_local_recover_persist( + const hobject_t& soid, + const ObjectRecoveryInfo& _recovery_info, + bool is_delete, + epoch_t epoch_to_freeze); + seastar::future<> local_recover_delete( + const hobject_t& soid, + eversion_t need, + epoch_t epoch_frozen); +}; diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 324cff0f5f8..ac42d9c2818 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -7055,3 +7055,45 @@ ostream &operator<<(ostream &out, const PeeringState &ps) { } return out; } + +std::vector PeeringState::get_replica_recovery_order() const +{ + std::vector> replicas_by_num_missing, + async_by_num_missing; + replicas_by_num_missing.reserve(get_acting_recovery_backfill().size() - 1); + for (auto &p : get_acting_recovery_backfill()) { + if (p == get_primary()) { + continue; + } + auto pm = get_peer_missing().find(p); + assert(pm != get_peer_missing().end()); + auto nm = pm->second.num_missing(); + if (nm != 0) { + if (is_async_recovery_target(p)) { + async_by_num_missing.push_back(make_pair(nm, p)); + } else { + replicas_by_num_missing.push_back(make_pair(nm, p)); + } + } + } + // sort by number of missing objects, in ascending order. + auto func = [](const std::pair &lhs, + const std::pair &rhs) { + return lhs.first < rhs.first; + }; + // acting goes first + std::sort(replicas_by_num_missing.begin(), replicas_by_num_missing.end(), func); + // then async_recovery_targets + std::sort(async_by_num_missing.begin(), async_by_num_missing.end(), func); + replicas_by_num_missing.insert(replicas_by_num_missing.end(), + async_by_num_missing.begin(), async_by_num_missing.end()); + + std::vector ret; + ret.reserve(replicas_by_num_missing.size()); + for (auto p : replicas_by_num_missing) { + ret.push_back(p.second); + } + return ret; +} + + diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index ec3e365843c..5a765e03017 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1739,6 +1739,9 @@ public: /// Updates info.hit_set to hset_history, does not dirty void update_hset(const pg_hit_set_history_t &hset_history); + /// Get all pg_shards that needs recovery + std::vector get_replica_recovery_order() const; + /** * update_history *