From: Radosław Zarzyński
Date: Tue, 28 Nov 2023 11:58:01 +0000 (+0100)
Subject: osd: dissect the recovery-related parts out from ECBackend
X-Git-Tag: testing/wip-batrick-testing-20240411.154038~431^2~8
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=deffa8209f9c0bd300cfdb54d358402bfc6e41c6;p=ceph-ci.git

osd: dissect the recovery-related parts out from ECBackend

For the sake of sharing with crimson.

Signed-off-by: Radosław Zarzyński
---

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index 472acdf8848..ecc5d5c8237 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -56,8 +56,12 @@ static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
   return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
 
+static ostream& _prefix(std::ostream *_dout, ECBackend::RecoveryBackend *pgb) {
+  return pgb->get_parent()->gen_dbg_prefix(*_dout);
+}
+
 struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
-  list<ECBackend::RecoveryOp> ops;
+  list<ECBackend::RecoveryBackend::RecoveryOp> ops;
 };
 
 static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
@@ -86,7 +90,7 @@
   return lhs << "]";
 }
 
-ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryBackend::RecoveryOp &rhs)
 {
   return lhs << "RecoveryOp("
     << "hoid=" << rhs.hoid
@@ -96,13 +100,13 @@ ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
     << " v=" << rhs.v
     << " missing_on=" << rhs.missing_on
     << " missing_on_shards=" << rhs.missing_on_shards
     << " recovery_info=" << rhs.recovery_info
     << " recovery_progress=" << rhs.recovery_progress
     << " obc refcount=" << rhs.obc.use_count()
-    << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+    << " state=" << ECBackend::RecoveryBackend::RecoveryOp::tostr(rhs.state)
     << " waiting_on_pushes=" << rhs.waiting_on_pushes
     << " extent_requested=" << rhs.extent_requested
     << ")";
 }
 
-void ECBackend::RecoveryOp::dump(Formatter *f) const
+void ECBackend::RecoveryBackend::RecoveryOp::dump(Formatter *f) const
 {
   f->dump_stream("hoid") << hoid;
   f->dump_stream("v") << v;
@@ -126,19 +130,25 @@ ECBackend::ECBackend(
   : PGBackend(cct, pg, store, coll, ch),
     read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener()),
     rmw_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener(), *this),
-    unstable_hashinfo_registry(cct, ec_impl),
+    recovery_backend(cct, coll, ec_impl, this->sinfo, read_pipeline, unstable_hashinfo_registry, get_parent()->get_eclistener()),
     ec_impl(ec_impl),
-    sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
+    sinfo(ec_impl->get_data_chunk_count(), stripe_width),
+    unstable_hashinfo_registry(cct, ec_impl) {
   ceph_assert((ec_impl->get_data_chunk_count() *
               ec_impl->get_chunk_size(stripe_width)) == stripe_width);
 }
 
 PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+  return recovery_backend.open_recovery_op();
+}
+
+PGBackend::RecoveryHandle *ECBackend::RecoveryBackend::open_recovery_op()
 {
   return new ECRecoveryHandle;
 }
 
-void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
+void ECBackend::RecoveryBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
 {
   dout(10) << __func__ << ": Read error " << hoid << " r="
            << res.r << " errors=" << res.errors << dendl;
@@ -188,6 +198,48 @@ void ECBackend::handle_recovery_push(
   const PushOp &op,
   RecoveryMessages *m,
   bool is_repair)
+{
+  if (get_parent()->pg_is_remote_backfilling()) {
+    get_parent()->pg_add_local_num_bytes(op.data.length());
+    get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
+    dout(10) << __func__ << " " << op.soid
+             << " add new actual data by " << op.data.length()
" << op.data.length() + << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count() + << dendl; + } + + recovery_backend.handle_recovery_push(op, m, is_repair); + + if (op.after_progress.data_complete) { + if ((get_parent()->pgb_is_primary())) { + if (get_parent()->pg_is_repair() || is_repair) + get_parent()->inc_osd_stat_repaired(); + } else { + // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired + if (is_repair) + get_parent()->inc_osd_stat_repaired(); + if (get_parent()->pg_is_remote_backfilling()) { + struct stat st; + int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN, + get_parent()->whoami_shard().shard), &st); + if (r == 0) { + get_parent()->pg_sub_local_num_bytes(st.st_size); + // XXX: This can be way overestimated for small objects + get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count()); + dout(10) << __func__ << " " << op.soid + << " sub actual data by " << st.st_size + << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count() + << dendl; + } + } + } + } +} + +void ECBackend::RecoveryBackend::handle_recovery_push( + const PushOp &op, + RecoveryMessages *m, + bool is_repair) { if (get_parent()->check_failsafe_full()) { dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl; @@ -231,15 +283,6 @@ void ECBackend::handle_recovery_push( ceph_assert(op.data.length() == 0); } - if (get_parent()->pg_is_remote_backfilling()) { - get_parent()->pg_add_local_num_bytes(op.data.length()); - get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count()); - dout(10) << __func__ << " " << op.soid - << " add new actual data by " << op.data.length() - << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count() - << dendl; - } - if (op.before_progress.first) { ceph_assert(op.attrset.count(string("_"))); m->t.setattrs( @@ -281,27 +324,13 @@ void ECBackend::handle_recovery_push( ObjectContextRef(), false, &m->t); - if (get_parent()->pg_is_remote_backfilling()) { - struct stat st; - int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN, - get_parent()->whoami_shard().shard), &st); - if (r == 0) { - get_parent()->pg_sub_local_num_bytes(st.st_size); - // XXX: This can be way overestimated for small objects - get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count()); - dout(10) << __func__ << " " << op.soid - << " sub actual data by " << st.st_size - << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count() - << dendl; - } - } } } m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; } -void ECBackend::handle_recovery_push_reply( +void ECBackend::RecoveryBackend::handle_recovery_push_reply( const PushReplyOp &op, pg_shard_t from, RecoveryMessages *m) @@ -314,7 +343,7 @@ void ECBackend::handle_recovery_push_reply( continue_recovery_op(rop, m); } -void ECBackend::handle_recovery_read_complete( +void ECBackend::RecoveryBackend::handle_recovery_read_complete( const hobject_t &hoid, boost::tuple > &to_read, std::optional> > attrs, @@ -327,7 +356,7 @@ void ECBackend::handle_recovery_read_complete( << ")" << dendl; ceph_assert(recovery_ops.count(hoid)); - RecoveryOp &op = recovery_ops[hoid]; + RecoveryBackend::RecoveryOp &op = recovery_ops[hoid]; ceph_assert(op.returned_data.empty()); map target; for (set::iterator i = op.missing_on_shards.begin(); @@ -417,7 +446,7 @@ struct SendPushReplies : public Context { }; struct RecoveryReadCompleter : 
-  RecoveryReadCompleter(ECBackend& backend)
+  RecoveryReadCompleter(ECBackend::RecoveryBackend& backend)
     : backend(backend) {}
 
   void finish_single_request(
@@ -442,27 +471,28 @@ struct RecoveryReadCompleter : ECCommon::ReadCompleter {
     backend.dispatch_recovery_messages(rm, priority);
   }
 
-  ECBackend& backend;
+  ECBackend::RecoveryBackend& backend;
   RecoveryMessages rm;
 };
 
-void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+void ECBackend::RecoveryBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
 {
   for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
        i != m.pushes.end();
        m.pushes.erase(i++)) {
     MOSDPGPush *msg = new MOSDPGPush();
     msg->set_priority(priority);
-    msg->map_epoch = get_osdmap_epoch();
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
     msg->from = get_parent()->whoami_shard();
     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
     msg->pushes.swap(i->second);
     msg->compute_cost(cct);
     msg->is_repair = get_parent()->pg_is_repair();
-    get_parent()->send_message(
-      i->first.osd,
-      msg);
+    std::vector<std::pair<int, Message*>> wrapped_msg {
+      std::make_pair(i->first.osd, static_cast<Message*>(msg))
+    };
+    get_parent()->send_message_osd_cluster(wrapped_msg, msg->map_epoch);
   }
   map<pg_shard_t, vector<PushReplyOp> > replies;
   for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
@@ -471,7 +501,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
       m.push_replies.erase(i++)) {
     MOSDPGPushReply *msg = new MOSDPGPushReply();
     msg->set_priority(priority);
-    msg->map_epoch = get_osdmap_epoch();
+    msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
     msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
     msg->from = get_parent()->whoami_shard();
     msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
@@ -505,14 +535,9 @@
 void ECBackend::RecoveryBackend::continue_recovery_op(
   RecoveryBackend::RecoveryOp &op,
   RecoveryMessages *m)
-{
-}
-
-void ECBackend::continue_recovery_op(
-  RecoveryOp &op,
-  RecoveryMessages *m)
 {
   dout(10) << __func__ << ": continuing " << op << dendl;
+  using RecoveryOp = RecoveryBackend::RecoveryOp;
   while (1) {
     switch (op.state) {
     case RecoveryOp::IDLE: {
@@ -532,6 +557,7 @@ void ECBackend::continue_recovery_op(
       ceph_assert(recovery_ops.count(op.hoid));
       eversion_t v = recovery_ops[op.hoid].v;
       recovery_ops.erase(op.hoid);
+      // TODO: not in crimson yet
      get_parent()->on_failed_pull({get_parent()->whoami_shard()},
                                   op.hoid, v);
      return;
@@ -548,6 +574,7 @@ void ECBackend::continue_recovery_op(
       ceph_assert(!op.recovery_progress.first);
       dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
                << dendl;
+      // in crimson
       get_parent()->cancel_pull(op.hoid);
       recovery_ops.erase(op.hoid);
       return;
@@ -611,6 +638,7 @@ void ECBackend::continue_recovery_op(
         pop.before_progress = op.recovery_progress;
         pop.after_progress = after_progress;
         if (*mi != get_parent()->primary_shard())
+          // already in crimson -- junction point with PeeringState
           get_parent()->begin_peer_recover(
             *mi,
             op.hoid);
@@ -641,8 +669,10 @@ void ECBackend::continue_recovery_op(
         stat.num_bytes_recovered = op.recovery_info.size;
        stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
        stat.num_objects_recovered = 1;
+        // TODO: not in crimson yet
        if (get_parent()->pg_is_repair())
          stat.num_objects_repaired = 1;
+        // pg_recovery.cc in crimson has it
        get_parent()->on_global_recover(op.hoid, stat, false);
        dout(10) << __func__ << ": WRITING return " << op << dendl;
        recovery_ops.erase(op.hoid);
@@ -668,20 +698,27 @@ void ECBackend::run_recovery_op(
   RecoveryHandle *_h,
   int priority)
 {
-  ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+  ceph_assert(_h);
+  ECRecoveryHandle &h = static_cast<ECRecoveryHandle&>(*_h);
+  recovery_backend.run_recovery_op(h, priority);
+  send_recovery_deletes(priority, h.deletes);
+  delete _h;
+}
+
+void ECBackend::RecoveryBackend::run_recovery_op(
+  ECRecoveryHandle &h,
+  int priority)
+{
   RecoveryMessages m;
-  for (list<RecoveryOp>::iterator i = h->ops.begin();
-       i != h->ops.end();
+  for (list<RecoveryOp>::iterator i = h.ops.begin();
+       i != h.ops.end();
        ++i) {
     dout(10) << __func__ << ": starting " << *i << dendl;
     ceph_assert(!recovery_ops.count(i->hoid));
     RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
     continue_recovery_op(op, &m);
   }
-  dispatch_recovery_messages(m, priority);
-  send_recovery_deletes(priority, h->deletes);
-  delete _h;
 }
 
 int ECBackend::recover_object(
@@ -690,6 +727,16 @@
   ObjectContextRef head,
   ObjectContextRef obc,
   RecoveryHandle *_h)
+{
+  return recovery_backend.recover_object(hoid, v, head, obc, _h);
+}
+
+int ECBackend::RecoveryBackend::recover_object(
+  const hobject_t &hoid,
+  eversion_t v,
+  ObjectContextRef head,
+  ObjectContextRef obc,
+  RecoveryHandle *_h)
 {
   ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
   h->ops.push_back(RecoveryOp());
@@ -786,7 +833,7 @@ bool ECBackend::_handle_message(
          ++i) {
       handle_recovery_push(*i, &rm, op->is_repair);
     }
-    dispatch_recovery_messages(rm, priority);
+    recovery_backend.dispatch_recovery_messages(rm, priority);
     return true;
   }
   case MSG_OSD_PG_PUSH_REPLY: {
@@ -796,9 +843,9 @@ bool ECBackend::_handle_message(
     for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
          i != op->replies.end();
          ++i) {
-      handle_recovery_push_reply(*i, op->from, &rm);
+      recovery_backend.handle_recovery_push_reply(*i, op->from, &rm);
     }
-    dispatch_recovery_messages(rm, priority);
+    recovery_backend.dispatch_recovery_messages(rm, priority);
     return true;
   }
   default:
@@ -1285,14 +1332,14 @@ void ECBackend::on_change()
 
 void ECBackend::clear_recovery_state()
 {
-  recovery_ops.clear();
+  recovery_backend.recovery_ops.clear();
 }
 
 void ECBackend::dump_recovery_info(Formatter *f) const
 {
   f->open_array_section("recovery_ops");
-  for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
-       i != recovery_ops.end();
+  for (map<hobject_t, RecoveryBackend::RecoveryOp>::const_iterator i = recovery_backend.recovery_ops.begin();
+       i != recovery_backend.recovery_ops.end();
        ++i) {
     f->open_object_section("op");
     i->second.dump(f);
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index 1f10269947a..21f8ed29244 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -158,11 +158,6 @@ private:
 
   void kick_reads();
 
-  uint64_t get_recovery_chunk_size() const {
-    return round_up_to(cct->_conf->osd_recovery_max_chunk,
-                       sinfo.get_stripe_width());
-  }
-
   /**
    * Recovery
    *
@@ -193,6 +188,40 @@ private:
    * Transaction, and reads in a RecoveryMessages object which is passed
    * among the recovery methods.
    */
+public:
+  struct RecoveryBackend {
+    CephContext* cct;
+    const coll_t &coll;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    ReadPipeline& read_pipeline;
+    UnstableHashInfoRegistry& unstable_hashinfo_registry;
+    // TODO: lay an interface down here
+    ECListener* parent;
+
+    ECListener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+    void add_temp_obj(const hobject_t &oid) { get_parent()->add_temp_obj(oid); }
+    void clear_temp_obj(const hobject_t &oid) { get_parent()->clear_temp_obj(oid); }
+
+    RecoveryBackend(CephContext* cct,
+                    const coll_t &coll,
+                    ceph::ErasureCodeInterfaceRef ec_impl,
+                    const ECUtil::stripe_info_t& sinfo,
+                    ReadPipeline& read_pipeline,
+                    UnstableHashInfoRegistry& unstable_hashinfo_registry,
+                    ECListener* parent)
+      : cct(cct),
+        coll(coll),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        read_pipeline(read_pipeline),
+        unstable_hashinfo_registry(unstable_hashinfo_registry),
+        parent(parent) {
+    }
+    // <<<----
   struct RecoveryOp {
     hobject_t hoid;
     eversion_t v;
@@ -206,13 +235,13 @@ private:
     static const char* tostr(state_t state) {
       switch (state) {
-      case ECBackend::RecoveryOp::IDLE:
+      case RecoveryOp::IDLE:
        return "IDLE";
-      case ECBackend::RecoveryOp::READING:
+      case RecoveryOp::READING:
        return "READING";
-      case ECBackend::RecoveryOp::WRITING:
+      case RecoveryOp::WRITING:
        return "WRITING";
-      case ECBackend::RecoveryOp::COMPLETE:
+      case RecoveryOp::COMPLETE:
        return "COMPLETE";
       default:
        ceph_abort();
@@ -237,13 +266,26 @@ private:
   friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
 
   std::map<hobject_t, RecoveryOp> recovery_ops;
 
+  uint64_t get_recovery_chunk_size() const {
+    return round_up_to(cct->_conf->osd_recovery_max_chunk,
+                       sinfo.get_stripe_width());
+  }
+
+  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+
+  RecoveryHandle *open_recovery_op();
+  void run_recovery_op(
+    struct ECRecoveryHandle &h,
+    int priority);
+  int recover_object(
+    const hobject_t &hoid,
+    eversion_t v,
+    ObjectContextRef head,
+    ObjectContextRef obc,
+    RecoveryHandle *h);
   void continue_recovery_op(
-    RecoveryOp &op,
+    RecoveryBackend::RecoveryOp &op,
     RecoveryMessages *m);
-  friend struct RecoveryMessages;
-  void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-  friend struct OnRecoveryReadComplete;
-  friend struct RecoveryReadCompleter;
   void handle_recovery_read_complete(
     const hobject_t &hoid,
     boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
    std::optional<map<string, bufferlist, less<>> > attrs,
@@ -257,10 +299,26 @@ private:
     const PushReplyOp &op,
     pg_shard_t from,
     RecoveryMessages *m);
+  friend struct RecoveryMessages;
+  int get_ec_data_chunk_count() const {
+    return ec_impl->get_data_chunk_count();
+  }
+  void _failed_push(const hobject_t &hoid, ECCommon::read_result_t &res);
+  };
+  friend ostream &operator<<(ostream &lhs, const RecoveryBackend::RecoveryOp &rhs);
+  friend struct RecoveryMessages;
+  friend struct OnRecoveryReadComplete;
+  friend struct RecoveryReadCompleter;
+
+  void handle_recovery_push(
+    const PushOp &op,
+    RecoveryMessages *m,
+    bool is_repair);
 
 public:
   struct ReadPipeline read_pipeline;
   struct RMWPipeline rmw_pipeline;
+  struct RecoveryBackend recovery_backend;
 
   ceph::ErasureCodeInterfaceRef ec_impl;
@@ -358,7 +416,6 @@ public:
   uint64_t be_get_ondisk_size(uint64_t logical_size) const final {
     return sinfo.logical_to_next_chunk_offset(logical_size);
   }
-  void _failed_push(const hobject_t &hoid, ECBackend::read_result_t &res);
 };
 
 ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::pipeline_state_t &rhs);
diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h
index 83e1712b043..2ef088f5bf1 100644
--- a/src/osd/ECCommon.h
+++ b/src/osd/ECCommon.h
@@ -65,6 +65,76 @@ struct ECListener {
   // XXX
   virtual void cancel_pull(
     const hobject_t &soid) = 0;
+
+#ifndef WITH_SEASTAR
+  // XXX
+  virtual pg_shard_t primary_shard() const = 0;
+  virtual bool pgb_is_primary() const = 0;
+
+  /**
+   * Called when a read from a std::set of replicas/primary fails
+   */
+  virtual void on_failed_pull(
+    const std::set<pg_shard_t> &from,
+    const hobject_t &soid,
+    const eversion_t &v
+    ) = 0;
+
+  /**
+   * Called with the transaction recovering oid
+   */
+  virtual void on_local_recover(
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info,
+    ObjectContextRef obc,
+    bool is_delete,
+    ceph::os::Transaction *t
+    ) = 0;
+
+  /**
+   * Called when transaction recovering oid is durable and
+   * applied on all replicas
+   */
+  virtual void on_global_recover(
+    const hobject_t &oid,
+    const object_stat_sum_t &stat_diff,
+    bool is_delete
+    ) = 0;
+
+  /**
+   * Called when peer is recovered
+   */
+  virtual void on_peer_recover(
+    pg_shard_t peer,
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info
+    ) = 0;
+
+  virtual void begin_peer_recover(
+    pg_shard_t peer,
+    const hobject_t oid) = 0;
+
+  virtual bool pg_is_repair() const = 0;
+
+  virtual ObjectContextRef get_obc(
+    const hobject_t &hoid,
+    const std::map<std::string, ceph::buffer::list, std::less<>> &attrs) = 0;
+
+  virtual bool check_failsafe_full() = 0;
+  virtual hobject_t get_temp_recovery_object(const hobject_t& target,
+                                             eversion_t version) = 0;
+  virtual bool pg_is_remote_backfilling() = 0;
+  virtual void pg_add_local_num_bytes(int64_t num_bytes) = 0;
+  //virtual void pg_sub_local_num_bytes(int64_t num_bytes) = 0;
+  virtual void pg_add_num_bytes(int64_t num_bytes) = 0;
+  //virtual void pg_sub_num_bytes(int64_t num_bytes) = 0;
+  virtual void inc_osd_stat_repaired() = 0;
+
+  virtual void add_temp_obj(const hobject_t &oid) = 0;
+  virtual void clear_temp_obj(const hobject_t &oid) = 0;
+  virtual epoch_t get_last_peering_reset_epoch() const = 0;
+#endif
+
   // XXX
 #ifndef WITH_SEASTAR
   virtual GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h
index 420871b1990..85849bb85b9 100644
--- a/src/osd/PrimaryLogPG.h
+++ b/src/osd/PrimaryLogPG.h
@@ -304,6 +304,8 @@ public:
   }
 
   /// Listener methods
+  void add_temp_obj(const hobject_t &oid) override { get_pgbackend()->add_temp_obj(oid); }
+  void clear_temp_obj(const hobject_t &oid) override { get_pgbackend()->clear_temp_obj(oid); }
   DoutPrefixProvider *get_dpp() override { return this; }
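
The shape of the refactor, stated outside the diff: ECBackend keeps its PGBackend
entry points (open_recovery_op(), run_recovery_op(), recover_object()) and forwards
them to the nested RecoveryBackend, which reaches the PG only through the narrow
ECListener interface extended above -- that junction is what a crimson OSD can
implement as well. Below is a minimal, self-contained C++ sketch of that delegation
pattern; Listener, Backend, and PG are simplified stand-ins invented for
illustration, not code from the tree.

    // Sketch only -- hypothetical, simplified names; not Ceph code.
    #include <iostream>
    #include <map>
    #include <string>

    struct Listener {                      // plays the role of ECListener
      virtual void on_global_recover(const std::string &oid) = 0;
      virtual ~Listener() = default;
    };

    class Backend {                        // plays the role of ECBackend
    public:
      explicit Backend(Listener *l) : recovery_backend(l) {}

      // The shareable part: owns per-object recovery state and talks to the
      // PG only through the Listener interface.
      struct RecoveryBackend {             // plays the role of ECBackend::RecoveryBackend
        explicit RecoveryBackend(Listener *l) : parent(l) {}
        void recover_object(const std::string &oid) {
          recovery_ops[oid] = "READING";   // toy stand-in for the state machine
          recovery_ops[oid] = "WRITING";
          recovery_ops.erase(oid);
          parent->on_global_recover(oid);  // junction point with the PG
        }
        Listener *parent;
        std::map<std::string, std::string> recovery_ops;
      };

      // The outer backend merely forwards, keeping OSD-specific bookkeeping
      // (deletes, handle lifetime, stats) around the delegated call.
      void recover_object(const std::string &oid) {
        recovery_backend.recover_object(oid);
      }

    private:
      RecoveryBackend recovery_backend;
    };

    struct PG : Listener {                 // plays the role of PrimaryLogPG
      void on_global_recover(const std::string &oid) override {
        std::cout << "globally recovered " << oid << "\n";
      }
    };

    int main() {
      PG pg;
      Backend backend(&pg);
      backend.recover_object("rbd_data.1234");  // reported via the listener
    }

Because the nested component depends only on the listener, an alternative OSD
implementation can provide its own Listener and reuse the recovery logic unchanged,
which is the stated motivation ("for the sake of sharing with crimson").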