struct ECSubReadReply;
class ECSwitch;
-struct RecoveryMessages;
class ECSwitch;
class ECBackend : public ECCommon {
bool _handle_message(OpRequestRef op);
bool can_handle_while_inactive(OpRequestRef op);
+
friend struct SubWriteApplied;
friend struct SubWriteCommitted;
void sub_write_committed(
void kick_reads();
- /**
- * Recovery
- *
- * Recovery uses the same underlying read mechanism as client reads
- * with the slight difference that recovery reads may come from non
- * acting shards. Thus, check_recovery_sources may wind up calling
- * cancel_pull for a read originating with RecoveryOp.
- *
- * The recovery process is expressed as a state machine:
- * - IDLE: Nothing is currently in progress, reads will be started and
- * we will transition to READING
- * - READING: We are awaiting a pending read op. Once complete, we will
- * decode the buffers and proceed to WRITING
- * - WRITING: We are awaiting a completed push. Once complete, we will
- * either transition to COMPLETE or to IDLE to continue.
- * - COMPLETE: complete
- *
- * We use the existing Push and PushReply messages and structures to
- * handle actually shuffling the data over to the replicas. recovery_info
- * and recovery_progress are expressed in terms of the logical offset
- * space except for data_included which is in terms of the chunked object
- * space (to match the passed buffer).
- *
- * xattrs are requested on the first read and used to initialize the
- * object_context if missing on completion of the first read.
- *
- * In order to batch up reads and writes, we batch Push, PushReply,
- * Transaction, and reads in a RecoveryMessages object which is passed
- * among the recovery methods.
- */
- public:
- struct RecoveryBackend {
- CephContext *cct;
- const coll_t &coll;
- ceph::ErasureCodeInterfaceRef ec_impl;
- const ECUtil::stripe_info_t &sinfo;
- ReadPipeline &read_pipeline;
- // TODO: lay an interface down here
- ECListener *parent;
- ECBackend *ecbackend;
-
- ECListener *get_parent() const { return parent; }
-
- const OSDMapRef &get_osdmap() const {
- return get_parent()->pgb_get_osdmap();
- }
-
- epoch_t get_osdmap_epoch() const {
- return get_parent()->pgb_get_osdmap_epoch();
- }
-
- const pg_info_t &get_info() { return get_parent()->get_info(); }
- void add_temp_obj(const hobject_t &oid) { get_parent()->add_temp_obj(oid); }
-
- void clear_temp_obj(const hobject_t &oid) {
- get_parent()->clear_temp_obj(oid);
- }
-
- RecoveryBackend(CephContext *cct,
- const coll_t &coll,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t &sinfo,
- ReadPipeline &read_pipeline,
- ECListener *parent,
- ECBackend *ecbackend);
-
- struct RecoveryOp {
- hobject_t hoid;
- eversion_t v;
- std::set<pg_shard_t> missing_on;
- shard_id_set missing_on_shards;
-
- ObjectRecoveryInfo recovery_info;
- ObjectRecoveryProgress recovery_progress;
-
- enum state_t { IDLE, READING, WRITING, COMPLETE } state;
-
- static const char *tostr(state_t state) {
- switch (state) {
- case RecoveryOp::IDLE:
- return "IDLE";
- case RecoveryOp::READING:
- return "READING";
- case RecoveryOp::WRITING:
- return "WRITING";
- case RecoveryOp::COMPLETE:
- return "COMPLETE";
- default:
- ceph_abort();
- return "";
- }
- }
-
- // must be filled if state == WRITING
- std::optional<ECUtil::shard_extent_map_t> returned_data;
- std::map<std::string, ceph::buffer::list, std::less<>> xattrs;
- ObjectContextRef obc;
- std::set<pg_shard_t> waiting_on_pushes;
-
- void dump(ceph::Formatter *f) const;
-
- RecoveryOp() : state(IDLE) {}
-
- void print(std::ostream &os) const {
- os << "RecoveryOp("
- << "hoid=" << hoid
- << " v=" << v
- << " missing_on=" << missing_on
- << " missing_on_shards=" << missing_on_shards
- << " recovery_info=" << recovery_info
- << " recovery_progress=" << recovery_progress
- << " obc refcount=" << obc.use_count()
- << " state=" << ECBackend::RecoveryBackend::RecoveryOp::tostr(state)
- << " waiting_on_pushes=" << waiting_on_pushes
- << ")";
- }
- };
-
- std::map<hobject_t, RecoveryOp> recovery_ops;
-
- uint64_t get_recovery_chunk_size() const {
- return round_up_to(cct->_conf->osd_recovery_max_chunk,
- sinfo.get_stripe_width());
- }
-
- virtual ~RecoveryBackend() = default;
- virtual void commit_txn_send_replies(
- ceph::os::Transaction &&txn,
- std::map<int, MOSDPGPushReply*> replies) = 0;
- void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-
- PGBackend::RecoveryHandle *open_recovery_op();
- void run_recovery_op(
- struct ECRecoveryHandle &h,
- int priority);
- int recover_object(
- const hobject_t &hoid,
- eversion_t v,
- ObjectContextRef head,
- ObjectContextRef obc,
- PGBackend::RecoveryHandle *h);
- void continue_recovery_op(
- RecoveryBackend::RecoveryOp &op,
- RecoveryMessages *m);
- void update_object_size_after_read(
- uint64_t size,
- read_result_t &res,
- read_request_t &req);
- void handle_recovery_read_complete(
- const hobject_t &hoid,
- read_result_t &&res,
- read_request_t &req,
- RecoveryMessages *m);
- void handle_recovery_push(
- const PushOp &op,
- RecoveryMessages *m,
- bool is_repair);
- void handle_recovery_push_reply(
- const PushReplyOp &op,
- pg_shard_t from,
- RecoveryMessages *m);
- friend struct RecoveryMessages;
- void _failed_push(const hobject_t &hoid, ECCommon::read_result_t &res);
- };
-
struct ECRecoveryBackend : RecoveryBackend {
ECRecoveryBackend(CephContext *cct,
const coll_t &coll,
//forward declaration
struct ECSubWrite;
struct PGLog;
+struct RecoveryMessages;
struct ECCommon {
struct ec_extent_t {
extent_cache(*this, ec_extent_cache_lru, sinfo, cct),
ec_pdw_write_mode(cct->_conf.get_val<uint64_t>("ec_pdw_write_mode")) {}
};
+
+
+ /**
+ * Recovery
+ *
+ * Recovery uses the same underlying read mechanism as client reads
+ * with the slight difference that recovery reads may come from non
+ * acting shards. Thus, check_recovery_sources may wind up calling
+ * cancel_pull for a read originating with RecoveryOp.
+ *
+ * The recovery process is expressed as a state machine:
+ * - IDLE: Nothing is currently in progress, reads will be started and
+ * we will transition to READING
+ * - READING: We are awaiting a pending read op. Once complete, we will
+ * decode the buffers and proceed to WRITING
+ * - WRITING: We are awaiting a completed push. Once complete, we will
+ * either transition to COMPLETE or to IDLE to continue.
+ * - COMPLETE: complete
+ *
+ * We use the existing Push and PushReply messages and structures to
+ * handle actually shuffling the data over to the replicas. recovery_info
+ * and recovery_progress are expressed in terms of the logical offset
+ * space except for data_included which is in terms of the chunked object
+ * space (to match the passed buffer).
+ *
+ * xattrs are requested on the first read and used to initialize the
+ * object_context if missing on completion of the first read.
+ *
+ * In order to batch up reads and writes, we batch Push, PushReply,
+ * Transaction, and reads in a RecoveryMessages object which is passed
+ * among the recovery methods.
+ */
+ public:
+ // Shared context and driver for EC object recovery (moved here from
+ // ECBackend). Owns the map of in-flight RecoveryOps and walks each one
+ // through the IDLE -> READING -> WRITING -> COMPLETE state machine
+ // described in the comment block above.
+ struct RecoveryBackend {
+ CephContext *cct;
+ const coll_t &coll;
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t &sinfo;
+ ReadPipeline &read_pipeline;
+ // TODO: lay an interface down here
+ ECListener *parent;
+ ECBackend *ecbackend;
+
+ ECListener *get_parent() const { return parent; }
+
+ const OSDMapRef &get_osdmap() const {
+ return get_parent()->pgb_get_osdmap();
+ }
+
+ epoch_t get_osdmap_epoch() const {
+ return get_parent()->pgb_get_osdmap_epoch();
+ }
+
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+ void add_temp_obj(const hobject_t &oid) { get_parent()->add_temp_obj(oid); }
+
+ void clear_temp_obj(const hobject_t &oid) {
+ get_parent()->clear_temp_obj(oid);
+ }
+
+ RecoveryBackend(CephContext *cct,
+ const coll_t &coll,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t &sinfo,
+ ReadPipeline &read_pipeline,
+ ECListener *parent,
+ ECBackend *ecbackend);
+
+ // Per-object recovery state; one instance per object being recovered,
+ // keyed by hobject_t in recovery_ops below.
+ struct RecoveryOp {
+ hobject_t hoid;
+ eversion_t v;
+ std::set<pg_shard_t> missing_on;
+ shard_id_set missing_on_shards;
+
+ ObjectRecoveryInfo recovery_info;
+ ObjectRecoveryProgress recovery_progress;
+
+ enum state_t { IDLE, READING, WRITING, COMPLETE } state;
+
+ static const char *tostr(state_t state) {
+ switch (state) {
+ case RecoveryOp::IDLE:
+ return "IDLE";
+ case RecoveryOp::READING:
+ return "READING";
+ case RecoveryOp::WRITING:
+ return "WRITING";
+ case RecoveryOp::COMPLETE:
+ return "COMPLETE";
+ default:
+ ceph_abort();
+ return "";
+ }
+ }
+
+ // must be filled if state == WRITING
+ std::optional<ECUtil::shard_extent_map_t> returned_data;
+ std::map<std::string, ceph::buffer::list, std::less<>> xattrs;
+ ObjectContextRef obc;
+ std::set<pg_shard_t> waiting_on_pushes;
+
+ void dump(ceph::Formatter *f) const;
+
+ RecoveryOp() : state(IDLE) {}
+
+ void print(std::ostream &os) const {
+ os << "RecoveryOp("
+ << "hoid=" << hoid
+ << " v=" << v
+ << " missing_on=" << missing_on
+ << " missing_on_shards=" << missing_on_shards
+ << " recovery_info=" << recovery_info
+ << " recovery_progress=" << recovery_progress
+ << " obc refcount=" << obc.use_count()
+ // tostr() is a static member of this very struct, so it needs no
+ // qualification. The former ECBackend::RecoveryBackend::RecoveryOp::
+ // qualification is stale now that this code lives in ECCommon and
+ // would require a complete ECBackend type, which is not visible here
+ // (ECBackend derives from ECCommon and is defined later).
+ << " state=" << tostr(state)
+ << " waiting_on_pushes=" << waiting_on_pushes
+ << ")";
+ }
+ };
+
+ std::map<hobject_t, RecoveryOp> recovery_ops;
+
+ // Recovery reads/pushes are issued in chunks of this size: the configured
+ // osd_recovery_max_chunk rounded up to a whole stripe.
+ uint64_t get_recovery_chunk_size() const {
+ return round_up_to(cct->_conf->osd_recovery_max_chunk,
+ sinfo.get_stripe_width());
+ }
+
+ virtual ~RecoveryBackend() = default;
+ virtual void commit_txn_send_replies(
+ ceph::os::Transaction &&txn,
+ std::map<int, MOSDPGPushReply*> replies) = 0;
+ void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+
+ PGBackend::RecoveryHandle *open_recovery_op();
+ void run_recovery_op(
+ struct ECRecoveryHandle &h,
+ int priority);
+ int recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ PGBackend::RecoveryHandle *h);
+ void continue_recovery_op(
+ RecoveryBackend::RecoveryOp &op,
+ RecoveryMessages *m);
+ void update_object_size_after_read(
+ uint64_t size,
+ read_result_t &res,
+ read_request_t &req);
+ void handle_recovery_read_complete(
+ const hobject_t &hoid,
+ read_result_t &&res,
+ read_request_t &req,
+ RecoveryMessages *m);
+ void handle_recovery_push(
+ const PushOp &op,
+ RecoveryMessages *m,
+ bool is_repair);
+ void handle_recovery_push_reply(
+ const PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m);
+ friend struct RecoveryMessages;
+ void _failed_push(const hobject_t &hoid, ECCommon::read_result_t &res);
+ };
+
};
template <class F, class G>