#include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp>
+#include "ECCommon.h"
#include "OSD.h"
#include "PGBackend.h"
#include "erasure-code/ErasureCodeInterface.h"
struct RecoveryMessages;
-// ECListener -- an interface decoupling the pipelines from
-// a particular implementation of ECBackend (crimson vs. classical).
-// https://stackoverflow.com/q/7872958
-struct ECListener {
- virtual ~ECListener() = default;
- virtual const OSDMapRef& pgb_get_osdmap() const = 0;
- virtual epoch_t pgb_get_osdmap_epoch() const = 0;
- virtual const pg_info_t &get_info() const = 0;
- /**
- * Called when a pull on soid cannot be completed due to
- * down peers
- */
- // XXX
- virtual void cancel_pull(
- const hobject_t &soid) = 0;
- // XXX
- virtual void schedule_recovery_work(
- GenContext<ThreadPool::TPHandle&> *c,
- uint64_t cost) = 0;
-
- virtual epoch_t get_interval_start_epoch() const = 0;
- virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
- virtual const std::set<pg_shard_t> &get_backfill_shards() const = 0;
- virtual const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards()
- const = 0;
-
- virtual const std::map<pg_shard_t,
- pg_missing_t> &get_shard_missing() const = 0;
- virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
-#if 1
- virtual const pg_missing_const_i * maybe_get_shard_missing(
- pg_shard_t peer) const = 0;
- virtual const pg_info_t &get_shard_info(pg_shard_t peer) const = 0;
-#endif
- virtual ceph_tid_t get_tid() = 0;
- virtual pg_shard_t whoami_shard() const = 0;
-#if 0
- int whoami() const {
- return whoami_shard().osd;
- }
- spg_t whoami_spg_t() const {
- return get_info().pgid;
- }
-#endif
- // XXX
- virtual void send_message_osd_cluster(
- std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
-
- virtual std::ostream& gen_dbg_prefix(std::ostream& out) const = 0;
-
- // RMWPipeline
- virtual const pg_pool_t &get_pool() const = 0;
- virtual const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const = 0;
- // XXX
- virtual bool should_send_op(
- pg_shard_t peer,
- const hobject_t &hoid) = 0;
- virtual const std::map<pg_shard_t, pg_info_t> &get_shard_info() const = 0;
- virtual spg_t primary_spg_t() const = 0;
- virtual const PGLog &get_log() const = 0;
- virtual DoutPrefixProvider *get_dpp() = 0;
- // XXX
- virtual void apply_stats(
- const hobject_t &soid,
- const object_stat_sum_t &delta_stats) = 0;
-};
-
-struct ECCommon {
- virtual ~ECCommon() = default;
-
- virtual void handle_sub_write(
- pg_shard_t from,
- OpRequestRef msg,
- ECSubWrite &op,
- const ZTracer::Trace &trace
- ) = 0;
-
- virtual void objects_read_and_reconstruct(
- const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
- > &reads,
- bool fast_read,
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func) = 0;
-
- struct read_request_t {
- const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
- bool want_attrs;
- read_request_t(
- const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
- const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
- bool want_attrs)
- : to_read(to_read), need(need), want_attrs(want_attrs) {}
- };
- friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
- struct ReadOp;
- /**
- * Low level async read mechanism
- *
- * To avoid duplicating the logic for requesting and waiting for
- * multiple object shards, there is a common async read mechanism
- * taking a std::map of hobject_t->read_request_t, with callbacks
- * that take read_result_ts as arguments.
- *
- * tid_to_read_map gives open read ops. check_recovery_sources uses
- * shard_to_read_map and ReadOp::source_to_obj to restart reads
- * involving down osds.
- *
- * The user is responsible for specifying replicas on which to read
- * and for reassembling the buffer on the other side since client
- * reads require the original object buffer while recovery only needs
- * the missing pieces.
- *
- * Rather than handling reads on the primary directly, we simply send
- * ourselves a message. This avoids a dedicated primary path for that
- * part.
- */
- struct read_result_t {
- int r;
- std::map<pg_shard_t, int> errors;
- std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
- std::list<
- boost::tuple<
- uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
- read_result_t() : r(0) {}
- };
-
- struct ReadCompleter {
- virtual void finish_single_request(
- const hobject_t &hoid,
- read_result_t &res,
- std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
-
- virtual void finish(int priority) && = 0;
-
- virtual ~ReadCompleter() = default;
- };
-
- friend struct CallClientContexts;
- struct ClientAsyncReadStatus {
- unsigned objects_to_read;
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
- std::map<hobject_t,std::pair<int, extent_map> > results;
- explicit ClientAsyncReadStatus(
- unsigned objects_to_read,
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
- : objects_to_read(objects_to_read), func(std::move(func)) {}
- void complete_object(
- const hobject_t &hoid,
- int err,
- extent_map &&buffers) {
- ceph_assert(objects_to_read);
- --objects_to_read;
- ceph_assert(!results.count(hoid));
- results.emplace(hoid, std::make_pair(err, std::move(buffers)));
- }
- bool is_complete() const {
- return objects_to_read == 0;
- }
- void run() {
- func.release()->complete(std::move(results));
- }
- };
-
- struct ReadOp {
- int priority;
- ceph_tid_t tid;
- OpRequestRef op; // may be null if not on behalf of a client
- // True if redundant reads are issued, false otherwise;
- // this is useful to trade off some resources (redundant ops) for
- // low-latency reads, especially on a relatively idle cluster.
- bool do_redundant_reads;
- // True if reading for recovery, which may read only a subset of the
- // available shards.
- bool for_recovery;
- std::unique_ptr<ReadCompleter> on_complete;
-
- ZTracer::Trace trace;
-
- std::map<hobject_t, std::set<int>> want_to_read;
- std::map<hobject_t, read_request_t> to_read;
- std::map<hobject_t, read_result_t> complete;
-
- std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
- std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
-
- void dump(ceph::Formatter *f) const;
-
- std::set<pg_shard_t> in_progress;
-
- ReadOp(
- int priority,
- ceph_tid_t tid,
- bool do_redundant_reads,
- bool for_recovery,
- std::unique_ptr<ReadCompleter> _on_complete,
- OpRequestRef op,
- std::map<hobject_t, std::set<int>> &&_want_to_read,
- std::map<hobject_t, read_request_t> &&_to_read)
- : priority(priority),
- tid(tid),
- op(op),
- do_redundant_reads(do_redundant_reads),
- for_recovery(for_recovery),
- on_complete(std::move(_on_complete)),
- want_to_read(std::move(_want_to_read)),
- to_read(std::move(_to_read)) {
- for (auto &&hpair: to_read) {
- auto &returned = complete[hpair.first].returned;
- for (auto &&extent: hpair.second.to_read) {
- returned.push_back(
- boost::make_tuple(
- extent.get<0>(),
- extent.get<1>(),
- std::map<pg_shard_t, ceph::buffer::list>()));
- }
- }
- }
- ReadOp() = delete;
- ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
- ReadOp(ReadOp &&) = default;
- };
- struct ReadPipeline {
- void objects_read_and_reconstruct(
- const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
- > &reads,
- bool fast_read,
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
-
- template <class F>
- void filter_read_op(
- const OSDMapRef& osdmap,
- ReadOp &op,
- F&& on_erase);
-
- template <class F>
- void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
-
- void complete_read_op(ReadOp &rop);
-
- void start_read_op(
- int priority,
- std::map<hobject_t, std::set<int>> &want_to_read,
- std::map<hobject_t, read_request_t> &to_read,
- OpRequestRef op,
- bool do_redundant_reads,
- bool for_recovery,
- std::unique_ptr<ReadCompleter> on_complete);
-
- void do_read_op(ReadOp &rop);
-
- int send_all_remaining_reads(
- const hobject_t &hoid,
- ReadOp &rop);
-
- void on_change();
-
- void kick_reads();
-
- std::map<ceph_tid_t, ReadOp> tid_to_read_map;
- std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
- std::list<ClientAsyncReadStatus> in_progress_client_reads;
-
- CephContext* cct;
- ceph::ErasureCodeInterfaceRef ec_impl;
- const ECUtil::stripe_info_t& sinfo;
- // TODO: lay an interface down here
- ECListener* parent;
-
- ECListener *get_parent() const { return parent; }
- const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
- epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
- const pg_info_t &get_info() { return get_parent()->get_info(); }
-
- ReadPipeline(CephContext* cct,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t& sinfo,
- ECListener* parent)
- : cct(cct),
- ec_impl(std::move(ec_impl)),
- sinfo(sinfo),
- parent(parent) {
- }
-
- int get_remaining_shards(
- const hobject_t &hoid,
- const std::set<int> &avail,
- const std::set<int> &want,
- const read_result_t &result,
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
- bool for_recovery);
-
- void get_all_avail_shards(
- const hobject_t &hoid,
- const std::set<pg_shard_t> &error_shards,
- std::set<int> &have,
- std::map<shard_id_t, pg_shard_t> &shards,
- bool for_recovery);
-
- friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
- friend struct FinishReadOp;
-
- void get_want_to_read_shards(std::set<int> *want_to_read) const;
-
- /// Returns to_read replicas sufficient to reconstruct want
- int get_min_avail_to_read_shards(
- const hobject_t &hoid, ///< [in] object
- const std::set<int> &want, ///< [in] desired shards
- bool for_recovery, ///< [in] true if we may use non-acting replicas
- bool do_redundant_reads, ///< [in] true if we want to issue redundant reads to reduce latency
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
- ); ///< @return error code, 0 on success
- };
-
- /**
- * Client writes
- *
- * ECTransaction is responsible for generating a transaction for
- * each shard to which we need to send the write. As required
- * by the PGBackend interface, the ECBackend write mechanism
- * passes trim information with the write and last_complete back
- * with the reply.
- *
- * As with client reads, there is a possibility of out-of-order
- * completions. Thus, callbacks and completion are called in order
- * on the writing list.
- */
-
- struct RMWPipeline {
- struct Op : boost::intrusive::list_base_hook<> {
- /// From submit_transaction caller, describes operation
- hobject_t hoid;
- object_stat_sum_t delta_stats;
- eversion_t version;
- eversion_t trim_to;
- std::optional<pg_hit_set_history_t> updated_hit_set_history;
- std::vector<pg_log_entry_t> log_entries;
- ceph_tid_t tid;
- osd_reqid_t reqid;
- ZTracer::Trace trace;
-
- eversion_t roll_forward_to; /// Soon to be generated internally
-
- /// Ancillary also provided from submit_transaction caller
- std::map<hobject_t, ObjectContextRef> obc_map;
-
- /// see call_write_ordered
- std::list<std::function<void(void)> > on_write;
-
- /// Generated internally
- std::set<hobject_t> temp_added;
- std::set<hobject_t> temp_cleared;
-
- ECTransaction::WritePlan plan;
- bool requires_rmw() const { return !plan.to_read.empty(); }
- bool invalidates_cache() const { return plan.invalidates_cache; }
-
- // must be true if requires_rmw(), must be false if invalidates_cache()
- bool using_cache = true;
-
- /// In-progress read state.
- std::map<hobject_t,extent_set> pending_read; // subset already being read
- std::map<hobject_t,extent_set> remote_read; // subset we must read
- std::map<hobject_t,extent_map> remote_read_result;
- bool read_in_progress() const {
- return !remote_read.empty() && remote_read_result.empty();
- }
-
- /// In-progress write state.
- std::set<pg_shard_t> pending_commit;
- // we need pending_apply for pre-mimic peers so that we don't issue a
- // read on a remote shard before it has applied a previous write. We can
- // remove this after nautilus.
- std::set<pg_shard_t> pending_apply;
- bool write_in_progress() const {
- return !pending_commit.empty() || !pending_apply.empty();
- }
-
- /// optional, may be null, for tracking purposes
- OpRequestRef client_op;
-
- /// pin for cache
- ExtentCache::write_pin pin;
-
- /// Callbacks
- Context *on_all_commit = nullptr;
- virtual ~Op() {
- delete on_all_commit;
- }
-
- virtual void generate_transactions(
- ceph::ErasureCodeInterfaceRef &ecimpl,
- pg_t pgid,
- const ECUtil::stripe_info_t &sinfo,
- std::map<hobject_t,extent_map> *written,
- std::map<shard_id_t, ObjectStore::Transaction> *transactions,
- DoutPrefixProvider *dpp,
- const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
- };
- using OpRef = std::unique_ptr<Op>;
- using op_list = boost::intrusive::list<Op>;
- friend ostream &operator<<(ostream &lhs, const Op &rhs);
-
- ExtentCache cache;
- std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
- /**
- * We model the possible rmw states as a set of waitlists.
- * All writes at this time complete in order, so a write blocked
- * at waiting_state blocks all writes behind it as well (same for
- * other states).
- *
- * Future work: We can break this up into a per-object pipeline
- * (almost). First, provide an ordering token to submit_transaction
- * and require that all operations within a single transaction take
- * place on a subset of hobject_t space partitioned by that token
- * (the hashid seems about right to me -- it even works for temp objects
- * if you recall that a temp object created for object head foo will
- * only ever be referenced by other transactions on foo and isn't
- * reused). Next, factor this part into a class and maintain one per
- * ordering token. Next, fixup PrimaryLogPG's repop queue to be
- * partitioned by ordering token. Finally, refactor the op pipeline
- * so that the log entries passed into submit_transaction aren't
- * versioned. We can't assign versions to them until we actually
- * submit the operation. That's probably going to be the hard part.
- */
- class pipeline_state_t {
- enum {
- CACHE_VALID = 0,
- CACHE_INVALID = 1
- } pipeline_state = CACHE_VALID;
- public:
- bool caching_enabled() const {
- return pipeline_state == CACHE_VALID;
- }
- bool cache_invalid() const {
- return !caching_enabled();
- }
- void invalidate() {
- pipeline_state = CACHE_INVALID;
- }
- void clear() {
- pipeline_state = CACHE_VALID;
- }
- friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
- } pipeline_state;
-
- op_list waiting_state; /// writes waiting on pipeline_state
- op_list waiting_reads; /// writes waiting on partial stripe reads
- op_list waiting_commit; /// writes waiting on initial commit
- eversion_t completed_to;
- eversion_t committed_to;
- void start_rmw(OpRef op);
- bool try_state_to_reads();
- bool try_reads_to_commit();
- bool try_finish_rmw();
- void check_ops();
-
- void on_change();
- void call_write_ordered(std::function<void(void)> &&cb);
-
- CephContext* cct;
- ECListener *get_parent() const { return parent; }
- const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
- epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
- const pg_info_t &get_info() { return get_parent()->get_info(); }
-
- template <typename Func>
- void objects_read_async_no_cache(
- const std::map<hobject_t,extent_set> &to_read,
- Func &&on_complete
- ) {
- std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
- for (auto &&hpair: to_read) {
- auto &l = _to_read[hpair.first];
- for (auto extent: hpair.second) {
- l.emplace_back(extent.first, extent.second, 0);
- }
- }
- ec_backend.objects_read_and_reconstruct(
- _to_read,
- false,
- make_gen_lambda_context<
- std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
- std::forward<Func>(on_complete)));
- }
- void handle_sub_write(
- pg_shard_t from,
- OpRequestRef msg,
- ECSubWrite &op,
- const ZTracer::Trace &trace
- ) {
- ec_backend.handle_sub_write(from, std::move(msg), op, trace);
- }
- // end of iface
-
- ceph::ErasureCodeInterfaceRef ec_impl;
- const ECUtil::stripe_info_t& sinfo;
- ECListener* parent;
- ECCommon& ec_backend;
-
- RMWPipeline(CephContext* cct,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t& sinfo,
- ECListener* parent,
- ECCommon& ec_backend)
- : cct(cct),
- ec_impl(std::move(ec_impl)),
- sinfo(sinfo),
- parent(parent),
- ec_backend(ec_backend) {
- }
- };
-};
-
class ECBackend : public PGBackend, public ECCommon {
public:
RecoveryHandle *open_recovery_op() override;
--- /dev/null
+++ b/ECCommon.h
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/list.hpp>
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "erasure-code/ErasureCodeInterface.h"
+#include "ECUtil.h"
+#include "ECTransaction.h"
+#include "ExtentCache.h"
+
+// forward declarations
+struct ECSubWrite;
+struct ECSubWriteReply;
+struct ECSubRead;
+struct ECSubReadReply;
+
+// ECListener -- an interface decoupling the pipelines from
+// a particular implementation of ECBackend (crimson vs. classical).
+// https://stackoverflow.com/q/7872958
+struct ECListener {
+ virtual ~ECListener() = default;
+ virtual const OSDMapRef& pgb_get_osdmap() const = 0;
+ virtual epoch_t pgb_get_osdmap_epoch() const = 0;
+ virtual const pg_info_t &get_info() const = 0;
+ /**
+ * Called when a pull on soid cannot be completed due to
+ * down peers
+ */
+ // XXX
+ virtual void cancel_pull(
+ const hobject_t &soid) = 0;
+ // XXX
+ virtual void schedule_recovery_work(
+ GenContext<ThreadPool::TPHandle&> *c,
+ uint64_t cost) = 0;
+
+ virtual epoch_t get_interval_start_epoch() const = 0;
+ virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
+ virtual const std::set<pg_shard_t> &get_backfill_shards() const = 0;
+ virtual const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards()
+ const = 0;
+
+ virtual const std::map<pg_shard_t,
+ pg_missing_t> &get_shard_missing() const = 0;
+ virtual const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const = 0;
+#if 1
+ virtual const pg_missing_const_i * maybe_get_shard_missing(
+ pg_shard_t peer) const = 0;
+ virtual const pg_info_t &get_shard_info(pg_shard_t peer) const = 0;
+#endif
+ virtual ceph_tid_t get_tid() = 0;
+ virtual pg_shard_t whoami_shard() const = 0;
+#if 0
+ int whoami() const {
+ return whoami_shard().osd;
+ }
+ spg_t whoami_spg_t() const {
+ return get_info().pgid;
+ }
+#endif
+ // XXX
+ virtual void send_message_osd_cluster(
+ std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) = 0;
+
+ virtual std::ostream& gen_dbg_prefix(std::ostream& out) const = 0;
+
+ // RMWPipeline
+ virtual const pg_pool_t &get_pool() const = 0;
+ virtual const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const = 0;
+ // XXX
+ virtual bool should_send_op(
+ pg_shard_t peer,
+ const hobject_t &hoid) = 0;
+ virtual const std::map<pg_shard_t, pg_info_t> &get_shard_info() const = 0;
+ virtual spg_t primary_spg_t() const = 0;
+ virtual const PGLog &get_log() const = 0;
+ virtual DoutPrefixProvider *get_dpp() = 0;
+ // XXX
+ virtual void apply_stats(
+ const hobject_t &soid,
+ const object_stat_sum_t &delta_stats) = 0;
+};
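+
+// A minimal usage sketch (hypothetical ConcretePG, not part of this
+// header): a PG implementation derives from ECListener and fills in the
+// pure virtuals, so the pipelines below can drive either backend through
+// the same narrow interface.
+//
+//   struct ConcretePG : ECListener {
+//     const pg_info_t &get_info() const override { return info; }
+//     pg_shard_t whoami_shard() const override { return whoami; }
+//     // ... and so on for the remaining pure virtuals ...
+//   };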
+
+struct ECCommon {
+ virtual ~ECCommon() = default;
+
+ virtual void handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace
+ ) = 0;
+
+ virtual void objects_read_and_reconstruct(
+ const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func) = 0;
+
+ struct read_request_t {
+ const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+ std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
+ bool want_attrs;
+ read_request_t(
+ const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
+ const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
+ bool want_attrs)
+ : to_read(to_read), need(need), want_attrs(want_attrs) {}
+ };
+ friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+ struct ReadOp;
+ /**
+ * Low level async read mechanism
+ *
+ * To avoid duplicating the logic for requesting and waiting for
+ * multiple object shards, there is a common async read mechanism
+ * taking a std::map of hobject_t->read_request_t, with callbacks
+ * that take read_result_ts as arguments.
+ *
+ * tid_to_read_map gives open read ops. check_recovery_sources uses
+ * shard_to_read_map and ReadOp::source_to_obj to restart reads
+ * involving down osds.
+ *
+ * The user is responsible for specifying replicas on which to read
+ * and for reassembling the buffer on the other side since client
+ * reads require the original object buffer while recovery only needs
+ * the missing pieces.
+ *
+ * Rather than handling reads on the primary directly, we simply send
+ * ourselves a message. This avoids a dedicated primary path for that
+ * part.
+ */
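+ //
+ // An illustrative call, assuming hypothetical `hoid`, `off`, `len`,
+ // `flags` values and a callable `on_complete` taking the results map
+ // (objects_read_async_no_cache() in RMWPipeline below is the in-tree
+ // caller this is modelled on):
+ //
+ //   std::map<hobject_t,
+ //            std::list<boost::tuple<uint64_t, uint64_t, uint32_t>>> reads;
+ //   reads[hoid].emplace_back(off, len, flags);
+ //   objects_read_and_reconstruct(
+ //     reads,
+ //     false,  // fast_read
+ //     make_gen_lambda_context<
+ //       std::map<hobject_t, std::pair<int, extent_map>> &&,
+ //       decltype(on_complete)>(std::move(on_complete)));
+ //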
+ struct read_result_t {
+ int r;
+ std::map<pg_shard_t, int> errors;
+ std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
+ std::list<
+ boost::tuple<
+ uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
+ read_result_t() : r(0) {}
+ };
+
+ struct ReadCompleter {
+ virtual void finish_single_request(
+ const hobject_t &hoid,
+ read_result_t &res,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
+ virtual void finish(int priority) && = 0;
+
+ virtual ~ReadCompleter() = default;
+ };
+
+ friend struct CallClientContexts;
+ struct ClientAsyncReadStatus {
+ unsigned objects_to_read;
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
+ std::map<hobject_t,std::pair<int, extent_map> > results;
+ explicit ClientAsyncReadStatus(
+ unsigned objects_to_read,
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
+ : objects_to_read(objects_to_read), func(std::move(func)) {}
+ void complete_object(
+ const hobject_t &hoid,
+ int err,
+ extent_map &&buffers) {
+ ceph_assert(objects_to_read);
+ --objects_to_read;
+ ceph_assert(!results.count(hoid));
+ results.emplace(hoid, std::make_pair(err, std::move(buffers)));
+ }
+ bool is_complete() const {
+ return objects_to_read == 0;
+ }
+ void run() {
+ func.release()->complete(std::move(results));
+ }
+ };
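+ // (Sketch of the intended protocol, inferred from the members above:
+ // complete_object() is invoked once per object as its read finishes;
+ // when is_complete() holds, run() hands the accumulated results map to
+ // the stored continuation.)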
+
+ struct ReadOp {
+ int priority;
+ ceph_tid_t tid;
+ OpRequestRef op; // may be null if not on behalf of a client
+ // True if redundant reads are issued, false otherwise;
+ // this is useful to trade off some resources (redundant ops) for
+ // low-latency reads, especially on a relatively idle cluster.
+ bool do_redundant_reads;
+ // True if reading for recovery, which may read only a subset of the
+ // available shards.
+ bool for_recovery;
+ std::unique_ptr<ReadCompleter> on_complete;
+
+ ZTracer::Trace trace;
+
+ std::map<hobject_t, std::set<int>> want_to_read;
+ std::map<hobject_t, read_request_t> to_read;
+ std::map<hobject_t, read_result_t> complete;
+
+ std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
+ std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
+
+ void dump(ceph::Formatter *f) const;
+
+ std::set<pg_shard_t> in_progress;
+
+ ReadOp(
+ int priority,
+ ceph_tid_t tid,
+ bool do_redundant_reads,
+ bool for_recovery,
+ std::unique_ptr<ReadCompleter> _on_complete,
+ OpRequestRef op,
+ std::map<hobject_t, std::set<int>> &&_want_to_read,
+ std::map<hobject_t, read_request_t> &&_to_read)
+ : priority(priority),
+ tid(tid),
+ op(op),
+ do_redundant_reads(do_redundant_reads),
+ for_recovery(for_recovery),
+ on_complete(std::move(_on_complete)),
+ want_to_read(std::move(_want_to_read)),
+ to_read(std::move(_to_read)) {
+ for (auto &&hpair: to_read) {
+ auto &returned = complete[hpair.first].returned;
+ for (auto &&extent: hpair.second.to_read) {
+ returned.push_back(
+ boost::make_tuple(
+ extent.get<0>(),
+ extent.get<1>(),
+ std::map<pg_shard_t, ceph::buffer::list>()));
+ }
+ }
+ }
+ ReadOp() = delete;
+ ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
+ ReadOp(ReadOp &&) = default;
+ };
+ struct ReadPipeline {
+ void objects_read_and_reconstruct(
+ const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+ template <class F>
+ void filter_read_op(
+ const OSDMapRef& osdmap,
+ ReadOp &op,
+ F&& on_erase);
+
+ template <class F>
+ void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+
+ void complete_read_op(ReadOp &rop);
+
+ void start_read_op(
+ int priority,
+ std::map<hobject_t, std::set<int>> &want_to_read,
+ std::map<hobject_t, read_request_t> &to_read,
+ OpRequestRef op,
+ bool do_redundant_reads,
+ bool for_recovery,
+ std::unique_ptr<ReadCompleter> on_complete);
+
+ void do_read_op(ReadOp &rop);
+
+ int send_all_remaining_reads(
+ const hobject_t &hoid,
+ ReadOp &rop);
+
+ void on_change();
+
+ void kick_reads();
+
+ std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+ std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+ std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+ CephContext* cct;
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t& sinfo;
+ // TODO: lay an interface down here
+ ECListener* parent;
+
+ ECListener *get_parent() const { return parent; }
+ const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ ReadPipeline(CephContext* cct,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t& sinfo,
+ ECListener* parent)
+ : cct(cct),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ parent(parent) {
+ }
+
+ int get_remaining_shards(
+ const hobject_t &hoid,
+ const std::set<int> &avail,
+ const std::set<int> &want,
+ const read_result_t &result,
+ std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+ bool for_recovery);
+
+ void get_all_avail_shards(
+ const hobject_t &hoid,
+ const std::set<pg_shard_t> &error_shards,
+ std::set<int> &have,
+ std::map<shard_id_t, pg_shard_t> &shards,
+ bool for_recovery);
+
+ friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+ friend struct FinishReadOp;
+
+ void get_want_to_read_shards(std::set<int> *want_to_read) const;
+
+ /// Returns to_read replicas sufficient to reconstruct want
+ int get_min_avail_to_read_shards(
+ const hobject_t &hoid, ///< [in] object
+ const std::set<int> &want, ///< [in] desired shards
+ bool for_recovery, ///< [in] true if we may use non-acting replicas
+ bool do_redundant_reads, ///< [in] true if we want to issue redundant reads to reduce latency
+ std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
+ ); ///< @return error code, 0 on success
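+
+ // (Sketch of the flow through this pipeline, inferred from the
+ // declarations above: start_read_op() registers a ReadOp in
+ // tid_to_read_map and calls do_read_op() to send the per-shard reads;
+ // replies accumulate in ReadOp::complete, and complete_read_op() hands
+ // each object's read_result_t to the ReadCompleter. on_change() drops
+ // in-flight state on interval change; kick_reads() presumably restarts
+ // any queued in_progress_client_reads.)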
+ };
+
+ /**
+ * Client writes
+ *
+ * ECTransaction is responsible for generating a transaction for
+ * each shard to which we need to send the write. As required
+ * by the PGBackend interface, the ECBackend write mechanism
+ * passes trim information with the write and last_complete back
+ * with the reply.
+ *
+ * As with client reads, there is a possibility of out-of-order
+ * completions. Thus, callbacks and completion are called in order
+ * on the writing list.
+ */
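+ //
+ // A rough life cycle of one write, inferred from the RMWPipeline
+ // members below (a sketch, not a normative description):
+ //
+ //   start_rmw(op)          // op queued on waiting_state
+ //   try_state_to_reads()   // issue partial-stripe reads -> waiting_reads
+ //   try_reads_to_commit()  // build and send shard txns -> waiting_commit
+ //   try_finish_rmw()       // all shards committed; complete in order
+ //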
+
+ struct RMWPipeline {
+ struct Op : boost::intrusive::list_base_hook<> {
+ /// From submit_transaction caller, describes operation
+ hobject_t hoid;
+ object_stat_sum_t delta_stats;
+ eversion_t version;
+ eversion_t trim_to;
+ std::optional<pg_hit_set_history_t> updated_hit_set_history;
+ std::vector<pg_log_entry_t> log_entries;
+ ceph_tid_t tid;
+ osd_reqid_t reqid;
+ ZTracer::Trace trace;
+
+ eversion_t roll_forward_to; /// Soon to be generated internally
+
+ /// Ancillary also provided from submit_transaction caller
+ std::map<hobject_t, ObjectContextRef> obc_map;
+
+ /// see call_write_ordered
+ std::list<std::function<void(void)> > on_write;
+
+ /// Generated internally
+ std::set<hobject_t> temp_added;
+ std::set<hobject_t> temp_cleared;
+
+ ECTransaction::WritePlan plan;
+ bool requires_rmw() const { return !plan.to_read.empty(); }
+ bool invalidates_cache() const { return plan.invalidates_cache; }
+
+ // must be true if requires_rmw(), must be false if invalidates_cache()
+ bool using_cache = true;
+
+ /// In-progress read state.
+ std::map<hobject_t,extent_set> pending_read; // subset already being read
+ std::map<hobject_t,extent_set> remote_read; // subset we must read
+ std::map<hobject_t,extent_map> remote_read_result;
+ bool read_in_progress() const {
+ return !remote_read.empty() && remote_read_result.empty();
+ }
+
+ /// In-progress write state.
+ std::set<pg_shard_t> pending_commit;
+ // we need pending_apply for pre-mimic peers so that we don't issue a
+ // read on a remote shard before it has applied a previous write. We can
+ // remove this after nautilus.
+ std::set<pg_shard_t> pending_apply;
+ bool write_in_progress() const {
+ return !pending_commit.empty() || !pending_apply.empty();
+ }
+
+ /// optional, may be null, for tracking purposes
+ OpRequestRef client_op;
+
+ /// pin for cache
+ ExtentCache::write_pin pin;
+
+ /// Callbacks
+ Context *on_all_commit = nullptr;
+ virtual ~Op() {
+ delete on_all_commit;
+ }
+
+ virtual void generate_transactions(
+ ceph::ErasureCodeInterfaceRef &ecimpl,
+ pg_t pgid,
+ const ECUtil::stripe_info_t &sinfo,
+ std::map<hobject_t,extent_map> *written,
+ std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+ DoutPrefixProvider *dpp,
+ const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
+ };
+ using OpRef = std::unique_ptr<Op>;
+ using op_list = boost::intrusive::list<Op>;
+ friend ostream &operator<<(ostream &lhs, const Op &rhs);
+
+ ExtentCache cache;
+ std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
+ /**
+ * We model the possible rmw states as a set of waitlists.
+ * All writes at this time complete in order, so a write blocked
+ * at waiting_state blocks all writes behind it as well (same for
+ * other states).
+ *
+ * Future work: We can break this up into a per-object pipeline
+ * (almost). First, provide an ordering token to submit_transaction
+ * and require that all operations within a single transaction take
+ * place on a subset of hobject_t space partitioned by that token
+ * (the hashid seems about right to me -- it even works for temp objects
+ * if you recall that a temp object created for object head foo will
+ * only ever be referenced by other transactions on foo and isn't
+ * reused). Next, factor this part into a class and maintain one per
+ * ordering token. Next, fixup PrimaryLogPG's repop queue to be
+ * partitioned by ordering token. Finally, refactor the op pipeline
+ * so that the log entries passed into submit_transaction aren't
+ * versioned. We can't assign versions to them until we actually
+ * submit the operation. That's probably going to be the hard part.
+ */
+ class pipeline_state_t {
+ enum {
+ CACHE_VALID = 0,
+ CACHE_INVALID = 1
+ } pipeline_state = CACHE_VALID;
+ public:
+ bool caching_enabled() const {
+ return pipeline_state == CACHE_VALID;
+ }
+ bool cache_invalid() const {
+ return !caching_enabled();
+ }
+ void invalidate() {
+ pipeline_state = CACHE_INVALID;
+ }
+ void clear() {
+ pipeline_state = CACHE_VALID;
+ }
+ friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+ } pipeline_state;
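+ // (Usage note, inferred from Op above: a write with invalidates_cache()
+ // must run with using_cache == false and is expected to invalidate()
+ // this state; clear() restores CACHE_VALID, presumably once no
+ // cache-invalidating work remains in flight.)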
+
+ op_list waiting_state; /// writes waiting on pipeline_state
+ op_list waiting_reads; /// writes waiting on partial stripe reads
+ op_list waiting_commit; /// writes waiting on initial commit
+ eversion_t completed_to;
+ eversion_t committed_to;
+ void start_rmw(OpRef op);
+ bool try_state_to_reads();
+ bool try_reads_to_commit();
+ bool try_finish_rmw();
+ void check_ops();
+
+ void on_change();
+ void call_write_ordered(std::function<void(void)> &&cb);
+
+ CephContext* cct;
+ ECListener *get_parent() const { return parent; }
+ const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ template <typename Func>
+ void objects_read_async_no_cache(
+ const std::map<hobject_t,extent_set> &to_read,
+ Func &&on_complete
+ ) {
+ std::map<hobject_t,std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+ for (auto &&hpair: to_read) {
+ auto &l = _to_read[hpair.first];
+ for (auto extent: hpair.second) {
+ l.emplace_back(extent.first, extent.second, 0);
+ }
+ }
+ ec_backend.objects_read_and_reconstruct(
+ _to_read,
+ false,
+ make_gen_lambda_context<
+ std::map<hobject_t,std::pair<int, extent_map> > &&, Func>(
+ std::forward<Func>(on_complete)));
+ }
+ void handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace
+ ) {
+ ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+ }
+ // end of iface
+
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t& sinfo;
+ ECListener* parent;
+ ECCommon& ec_backend;
+
+ RMWPipeline(CephContext* cct,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t& sinfo,
+ ECListener* parent,
+ ECCommon& ec_backend)
+ : cct(cct),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ parent(parent),
+ ec_backend(ec_backend) {
+ }
+ };
+};
+