std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
); ///< @return error code, 0 on success
};
+
+ /**
+ * Client writes
+ *
+ * ECTransaction is responsible for generating a transaction for
+ * each shard to which we need to send the write. As required
+ * by the PGBackend interface, the ECBackend write mechanism
+ * passes trim information with the write and last_complete back
+ * with the reply.
+ *
+ * As with client reads, completions may arrive out of order; callbacks
+ * and completions are therefore invoked in order on the list of
+ * in-flight writes.
+ */
+
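+ /*
+ * A rough sketch of the intended flow of a write through the pipeline
+ * below (the waitlists and try_*() stages in RMWPipeline are the
+ * authoritative definitions):
+ *
+ *   start_rmw()            op is registered in tid_to_op_map and queued
+ *                          on waiting_state
+ *   try_state_to_reads()   any partial-stripe reads required by the plan
+ *                          are issued; op moves to waiting_reads
+ *   try_reads_to_commit()  generate_transactions() builds the per-shard
+ *                          transactions and the sub-writes are sent; op
+ *                          moves to waiting_commit
+ *   try_finish_rmw()       once every shard has committed, ops complete
+ *                          strictly in order and on_all_commit is invoked
+ *
+ * check_ops() simply re-runs these stages until no further progress can
+ * be made.
+ */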
+ struct RMWPipeline {
+ struct Op : boost::intrusive::list_base_hook<> {
+ /// From submit_transaction caller, describes operation
+ hobject_t hoid;
+ object_stat_sum_t delta_stats;
+ eversion_t version;
+ eversion_t trim_to;
+ std::optional<pg_hit_set_history_t> updated_hit_set_history;
+ std::vector<pg_log_entry_t> log_entries;
+ ceph_tid_t tid;
+ osd_reqid_t reqid;
+ ZTracer::Trace trace;
+
+ eversion_t roll_forward_to; /// Soon to be generated internally
+
+ /// Ancillary also provided from submit_transaction caller
+ std::map<hobject_t, ObjectContextRef> obc_map;
+
+ /// see call_write_ordered
+ std::list<std::function<void(void)> > on_write;
+
+ /// Generated internally
+ std::set<hobject_t> temp_added;
+ std::set<hobject_t> temp_cleared;
+
+ ECTransaction::WritePlan plan;
+ bool requires_rmw() const { return !plan.to_read.empty(); }
+ bool invalidates_cache() const { return plan.invalidates_cache; }
+
+ // must be true if requires_rmw(), must be false if invalidates_cache()
+ bool using_cache = true;
+
+ /// In progress read state.
+ std::map<hobject_t,extent_set> pending_read; // subset already being read
+ std::map<hobject_t,extent_set> remote_read; // subset we must read
+ std::map<hobject_t,extent_map> remote_read_result;
+ bool read_in_progress() const {
+ return !remote_read.empty() && remote_read_result.empty();
+ }
+
+ /// In progress write state.
+ std::set<pg_shard_t> pending_commit;
+ // we need pending_apply for pre-mimic peers so that we don't issue a
+ // read on a remote shard before it has applied a previous write. We can
+ // remove this after nautilus.
+ std::set<pg_shard_t> pending_apply;
+ bool write_in_progress() const {
+ return !pending_commit.empty() || !pending_apply.empty();
+ }
+
+ /// optional, may be null, for tracking purposes
+ OpRequestRef client_op;
+
+ /// pin for cache
+ ExtentCache::write_pin pin;
+
+ /// Callbacks
+ Context *on_all_commit = nullptr;
+ virtual ~Op() {
+ delete on_all_commit;
+ }
+
+ virtual void generate_transactions(
+ ceph::ErasureCodeInterfaceRef &ecimpl,
+ pg_t pgid,
+ const ECUtil::stripe_info_t &sinfo,
+ std::map<hobject_t,extent_map> *written,
+ std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+ DoutPrefixProvider *dpp,
+ const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
+ };
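+ // Note: generate_transactions() is pure virtual here; the writing
+ // backend (e.g. ECBackend) supplies the concrete Op, which is expected
+ // to build the per-shard transactions via ECTransaction as described
+ // in the "Client writes" comment above.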
+ using OpRef = std::unique_ptr<Op>;
+ using op_list = boost::intrusive::list<Op>;
+ friend std::ostream &operator<<(std::ostream &lhs, const Op &rhs);
+
+ ExtentCache cache;
+ std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
+ /**
+ * We model the possible rmw states as a set of waitlists.
+ * All writes at this time complete in order, so a write blocked
+ * at waiting_state blocks all writes behind it as well (same for
+ * other states).
+ *
+ * Future work: We can break this up into a per-object pipeline
+ * (almost). First, provide an ordering token to submit_transaction
+ * and require that all operations within a single transaction take
+ * place on a subset of hobject_t space partitioned by that token
+ * (the hashid seems about right to me -- this even works for temp
+ * objects, since a temp object created for object head foo will
+ * only ever be referenced by other transactions on foo and is never
+ * reused). Next, factor this part into a class and maintain one per
+ * ordering token. Next, fixup PrimaryLogPG's repop queue to be
+ * partitioned by ordering token. Finally, refactor the op pipeline
+ * so that the log entries passed into submit_transaction aren't
+ * versioned. We can't assign versions to them until we actually
+ * submit the operation. That's probably going to be the hard part.
+ */
+ class pipeline_state_t {
+ enum {
+ CACHE_VALID = 0,
+ CACHE_INVALID = 1
+ } pipeline_state = CACHE_VALID;
+ public:
+ bool caching_enabled() const {
+ return pipeline_state == CACHE_VALID;
+ }
+ bool cache_invalid() const {
+ return !caching_enabled();
+ }
+ void invalidate() {
+ pipeline_state = CACHE_INVALID;
+ }
+ void clear() {
+ pipeline_state = CACHE_VALID;
+ }
+ friend std::ostream &operator<<(std::ostream &lhs, const pipeline_state_t &rhs);
+ } pipeline_state;
+
+ op_list waiting_state; /// writes waiting on pipeline_state
+ op_list waiting_reads; /// writes waiting on partial stripe reads
+ op_list waiting_commit; /// writes waiting on initial commit
+ eversion_t completed_to;
+ eversion_t committed_to;
+ void start_rmw(OpRef op);
+ bool try_state_to_reads();
+ bool try_reads_to_commit();
+ bool try_finish_rmw();
+ void check_ops();
+
+ void on_change();
+ void call_write_ordered(std::function<void(void)> &&cb);
+
+ CephContext* cct;
+ ECListener *get_parent() const { return parent; }
+ const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ template <typename Func>
+ void objects_read_async_no_cache(
+ const std::map<hobject_t,extent_set> &to_read,
+ Func &&on_complete
+ ) {
+ std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+ for (auto &&hpair: to_read) {
+ auto &l = _to_read[hpair.first];
+ for (auto extent: hpair.second) {
+ l.emplace_back(extent.first, extent.second, 0);
+ }
+ }
+ ec_backend.objects_read_and_reconstruct(
+ _to_read,
+ false,
+ make_gen_lambda_context<
+ std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
+ std::forward<Func>(on_complete)));
+ }
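+ /*
+ * Hypothetical usage sketch (the object `soid` below is illustrative,
+ * not part of the interface): read one stripe worth of data from an
+ * object while bypassing the extent cache.
+ *
+ *   std::map<hobject_t, extent_set> want;
+ *   want[soid].insert(0, sinfo.get_stripe_width());
+ *   objects_read_async_no_cache(
+ *     want,
+ *     [&](std::map<hobject_t, std::pair<int, extent_map>> &&results) {
+ *       // results[soid].first is the per-object result code,
+ *       // results[soid].second holds the reconstructed extents.
+ *     });
+ */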
+ void handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace
+ ) {
+ ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+ }
+ // end of iface
+
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t& sinfo;
+ ECListener* parent;
+ ECCommon& ec_backend;
+
+ RMWPipeline(CephContext* cct,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t& sinfo,
+ ECListener* parent,
+ ECCommon& ec_backend)
+ : cct(cct),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ parent(parent),
+ ec_backend(ec_backend) {
+ }
+ };
};
class ECBackend : public PGBackend, public ECCommon {
public:
struct ReadPipeline read_pipeline;
-
-
- /**
- * Client writes
- *
- * ECTransaction is responsible for generating a transaction for
- * each shard to which we need to send the write. As required
- * by the PGBackend interface, the ECBackend write mechanism
- * passes trim information with the write and last_complete back
- * with the reply.
- *
- * As with client reads, there is a possibility of out-of-order
- * completions. Thus, callbacks and completion are called in order
- * on the writing std::list.
- */
-
- struct RMWPipeline {
- struct Op : boost::intrusive::list_base_hook<> {
- /// From submit_transaction caller, describes operation
- hobject_t hoid;
- object_stat_sum_t delta_stats;
- eversion_t version;
- eversion_t trim_to;
- std::optional<pg_hit_set_history_t> updated_hit_set_history;
- std::vector<pg_log_entry_t> log_entries;
- ceph_tid_t tid;
- osd_reqid_t reqid;
- ZTracer::Trace trace;
-
- eversion_t roll_forward_to; /// Soon to be generated internally
-
- /// Ancillary also provided from submit_transaction caller
- std::map<hobject_t, ObjectContextRef> obc_map;
-
- /// see call_write_ordered
- std::list<std::function<void(void)> > on_write;
-
- /// Generated internally
- std::set<hobject_t> temp_added;
- std::set<hobject_t> temp_cleared;
-
- ECTransaction::WritePlan plan;
- bool requires_rmw() const { return !plan.to_read.empty(); }
- bool invalidates_cache() const { return plan.invalidates_cache; }
-
- // must be true if requires_rmw(), must be false if invalidates_cache()
- bool using_cache = true;
-
- /// In progress read state;
- std::map<hobject_t,extent_set> pending_read; // subset already being read
- std::map<hobject_t,extent_set> remote_read; // subset we must read
- std::map<hobject_t,extent_map> remote_read_result;
- bool read_in_progress() const {
- return !remote_read.empty() && remote_read_result.empty();
- }
-
- /// In progress write state.
- std::set<pg_shard_t> pending_commit;
- // we need pending_apply for pre-mimic peers so that we don't issue a
- // read on a remote shard before it has applied a previous write. We can
- // remove this after nautilus.
- std::set<pg_shard_t> pending_apply;
- bool write_in_progress() const {
- return !pending_commit.empty() || !pending_apply.empty();
- }
-
- /// optional, may be null, for tracking purposes
- OpRequestRef client_op;
-
- /// pin for cache
- ExtentCache::write_pin pin;
-
- /// Callbacks
- Context *on_all_commit = nullptr;
- virtual ~Op() {
- delete on_all_commit;
- }
-
- virtual void generate_transactions(
- ceph::ErasureCodeInterfaceRef &ecimpl,
- pg_t pgid,
- const ECUtil::stripe_info_t &sinfo,
- std::map<hobject_t,extent_map> *written,
- std::map<shard_id_t, ObjectStore::Transaction> *transactions,
- DoutPrefixProvider *dpp,
- const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
- };
- using OpRef = std::unique_ptr<Op>;
- using op_list = boost::intrusive::list<Op>;
- friend ostream &operator<<(ostream &lhs, const Op &rhs);
-
- ExtentCache cache;
- std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
- /**
- * We model the possible rmw states as a std::set of waitlists.
- * All writes at this time complete in order, so a write blocked
- * at waiting_state blocks all writes behind it as well (same for
- * other states).
- *
- * Future work: We can break this up into a per-object pipeline
- * (almost). First, provide an ordering token to submit_transaction
- * and require that all operations within a single transaction take
- * place on a subset of hobject_t space partitioned by that token
- * (the hashid seem about right to me -- even works for temp objects
- * if you recall that a temp object created for object head foo will
- * only ever be referenced by other transactions on foo and aren't
- * reused). Next, factor this part into a class and maintain one per
- * ordering token. Next, fixup PrimaryLogPG's repop queue to be
- * partitioned by ordering token. Finally, refactor the op pipeline
- * so that the log entries passed into submit_transaction aren't
- * versioned. We can't assign versions to them until we actually
- * submit the operation. That's probably going to be the hard part.
- */
- class pipeline_state_t {
- enum {
- CACHE_VALID = 0,
- CACHE_INVALID = 1
- } pipeline_state = CACHE_VALID;
- public:
- bool caching_enabled() const {
- return pipeline_state == CACHE_VALID;
- }
- bool cache_invalid() const {
- return !caching_enabled();
- }
- void invalidate() {
- pipeline_state = CACHE_INVALID;
- }
- void clear() {
- pipeline_state = CACHE_VALID;
- }
- friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
- } pipeline_state;
-
- op_list waiting_state; /// writes waiting on pipe_state
- op_list waiting_reads; /// writes waiting on partial stripe reads
- op_list waiting_commit; /// writes waiting on initial commit
- eversion_t completed_to;
- eversion_t committed_to;
- void start_rmw(OpRef op);
- bool try_state_to_reads();
- bool try_reads_to_commit();
- bool try_finish_rmw();
- void check_ops();
-
- void on_change();
- void call_write_ordered(std::function<void(void)> &&cb);
-
- CephContext* cct;
- ECListener *get_parent() const { return parent; }
- const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
- epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
- const pg_info_t &get_info() { return get_parent()->get_info(); }
-
- template <typename Func>
- void objects_read_async_no_cache(
- const std::map<hobject_t,extent_set> &to_read,
- Func &&on_complete
- ) {
- std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
- for (auto &&hpair: to_read) {
- auto &l = _to_read[hpair.first];
- for (auto extent: hpair.second) {
- l.emplace_back(extent.first, extent.second, 0);
- }
- }
- ec_backend.objects_read_and_reconstruct(
- _to_read,
- false,
- make_gen_lambda_context<
- std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
- std::forward<Func>(on_complete)));
- }
- void handle_sub_write(
- pg_shard_t from,
- OpRequestRef msg,
- ECSubWrite &op,
- const ZTracer::Trace &trace
- ) {
- ec_backend.handle_sub_write(from, std::move(msg), op, trace);
- }
- // end of iface
-
- ceph::ErasureCodeInterfaceRef ec_impl;
- const ECUtil::stripe_info_t& sinfo;
- ECListener* parent;
- ECCommon& ec_backend;
-
- RMWPipeline(CephContext* cct,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t& sinfo,
- ECListener* parent,
- ECCommon& ec_backend)
- : cct(cct),
- ec_impl(std::move(ec_impl)),
- sinfo(sinfo),
- parent(parent),
- ec_backend(ec_backend) {
- }
- } rmw_pipeline;
+ struct RMWPipeline rmw_pipeline;
ceph::ErasureCodeInterfaceRef ec_impl;