static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
}
-static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) {
+static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
}
<< rhs.get<1>() << ", " << rhs.get<2>() << ")";
}
-ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::read_request_t &rhs)
{
return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
<< ", need=" << rhs.need
<< ")";
}
-ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::read_result_t &rhs)
{
lhs << "read_result_t(r=" << rhs.r
<< ", errors=" << rhs.errors;
return lhs << ", returned=" << rhs.returned << ")";
}
-ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
+ostream &operator<<(ostream &lhs, const ECCommon::ReadOp &rhs)
{
lhs << "ReadOp(tid=" << rhs.tid;
if (rhs.op && rhs.op->get_req()) {
<< ", in_progress=" << rhs.in_progress << ")";
}
-void ECBackend::ReadOp::dump(Formatter *f) const
+void ECCommon::ReadOp::dump(Formatter *f) const
{
f->dump_unsigned("tid", tid);
if (op && op->get_req()) {
return new ECRecoveryHandle;
}
-void ECBackend::_failed_push(const hobject_t &hoid, ECBackend::read_result_t &res)
+void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
{
dout(10) << __func__ << ": Read error " << hoid << " r="
<< res.r << " errors=" << res.errors << dendl;
struct RecoveryMessages {
map<hobject_t,
- ECBackend::read_request_t> recovery_reads;
+ ECCommon::read_request_t> recovery_reads;
map<hobject_t, set<int>> want_to_read;
void recovery_read(
recovery_reads.insert(
make_pair(
hoid,
- ECBackend::read_request_t(
+ ECCommon::read_request_t(
to_read,
need,
attrs)));
}
};
-struct RecoveryReadCompleter : ECBackend::ReadCompleter {
+struct RecoveryReadCompleter : ECCommon::ReadCompleter {
RecoveryReadCompleter(ECBackend& backend)
: backend(backend) {}
void finish_single_request(
const hobject_t &hoid,
- ECBackend::read_result_t &res,
+ ECCommon::read_result_t &res,
list<boost::tuple<uint64_t, uint64_t, uint32_t> >) override
{
if (!(res.r == 0 && res.errors.empty())) {
}
}
-void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
+void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop)
{
map<hobject_t, read_request_t>::iterator reqiter =
rop.to_read.begin();
}
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
- ECBackend::ReadPipeline& read_pipeline;
+ ECCommon::ReadPipeline& read_pipeline;
ceph_tid_t tid;
- FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid)
+ FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
: read_pipeline(read_pipeline), tid(tid) {}
void finish(ThreadPool::TPHandle&) override {
auto ropiter = read_pipeline.tid_to_read_map.find(tid);
};
template <class F>
-void ECBackend::ReadPipeline::filter_read_op(
+void ECCommon::ReadPipeline::filter_read_op(
const OSDMapRef& osdmap,
ReadOp &op,
F&& on_erase)
}
template <class F>
-void ECBackend::ReadPipeline::check_recovery_sources(
+void ECCommon::ReadPipeline::check_recovery_sources(
const OSDMapRef& osdmap,
F&& on_erase)
{
});
}
-void ECBackend::ReadPipeline::on_change()
+void ECCommon::ReadPipeline::on_change()
{
for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
i != tid_to_read_map.end();
}
}
-void ECBackend::ReadPipeline::get_all_avail_shards(
+void ECCommon::ReadPipeline::get_all_avail_shards(
const hobject_t &hoid,
const set<pg_shard_t> &error_shards,
set<int> &have,
return 0;
}
-int ECBackend::ReadPipeline::get_remaining_shards(
+int ECCommon::ReadPipeline::get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
const set<int> &want,
return 0;
}
-void ECBackend::ReadPipeline::start_read_op(
+void ECCommon::ReadPipeline::start_read_op(
int priority,
map<hobject_t, set<int>> &want_to_read,
map<hobject_t, read_request_t> &to_read,
OpRequestRef _op,
bool do_redundant_reads,
bool for_recovery,
- std::unique_ptr<ECBackend::ReadCompleter> on_complete)
+ std::unique_ptr<ECCommon::ReadCompleter> on_complete)
{
ceph_tid_t tid = get_parent()->get_tid();
ceph_assert(!tid_to_read_map.count(tid));
do_read_op(op);
}
-void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
+void ECCommon::ReadPipeline::do_read_op(ReadOp &op)
{
int priority = op.priority;
ceph_tid_t tid = op.tid;
on_complete)));
}
-struct ClientReadCompleter : ECBackend::ReadCompleter {
- ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline,
- ECBackend::ClientAsyncReadStatus *status)
+struct ClientReadCompleter : ECCommon::ReadCompleter {
+ ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
+ ECCommon::ClientAsyncReadStatus *status)
: read_pipeline(read_pipeline),
status(status) {}
void finish_single_request(
const hobject_t &hoid,
- ECBackend::read_result_t &res,
+ ECCommon::read_result_t &res,
list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
{
extent_map result;
// NOP
}
- ECBackend::ReadPipeline &read_pipeline;
- ECBackend::ClientAsyncReadStatus *status;
+ ECCommon::ReadPipeline &read_pipeline;
+ ECCommon::ClientAsyncReadStatus *status;
};
*this, reads, fast_read, std::move(func));
}
-void ECBackend::ReadPipeline::objects_read_and_reconstruct(
+void ECCommon::ReadPipeline::objects_read_and_reconstruct(
ECBackend& ec_backend,
const map<hobject_t,
std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
}
-int ECBackend::ReadPipeline::send_all_remaining_reads(
+int ECCommon::ReadPipeline::send_all_remaining_reads(
const hobject_t &hoid,
ReadOp &rop)
{
return 0;
}
-void ECBackend::ReadPipeline::kick_reads()
+void ECCommon::ReadPipeline::kick_reads()
{
while (in_progress_client_reads.size() &&
in_progress_client_reads.front().is_complete()) {
const hobject_t &soid,
const object_stat_sum_t &delta_stats) = 0;
};
-class ECBackend : public PGBackend {
+
+struct ECBackend;
+struct ECCommon {
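+  /*
+   * A read request for a single object: `to_read` lists the requested
+   * (offset, length, op-flags) extents, `need` maps each shard to the
+   * sub-chunk ranges to fetch from it, and `want_attrs` additionally
+   * requests the object's attrs. (Field meanings are inferred from
+   * their usage elsewhere in this file, not spelled out here.)
+   */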
+ struct read_request_t {
+ const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
+ std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
+ bool want_attrs;
+ read_request_t(
+ const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
+ const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
+ bool want_attrs)
+ : to_read(to_read), need(need), want_attrs(want_attrs) {}
+ };
+ friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+ struct ReadOp;
+ /**
+ * Low level async read mechanism
+ *
+ * To avoid duplicating the logic for requesting and waiting for
+ * multiple object shards, there is a common async read mechanism
+   * taking a std::map of hobject_t->read_request_t, plus completion
+   * callbacks that receive the corresponding read_result_ts.
+ *
+ * tid_to_read_map gives open read ops. check_recovery_sources uses
+ * shard_to_read_map and ReadOp::source_to_obj to restart reads
+ * involving down osds.
+ *
+   * The user is responsible for specifying the replicas to read from
+   * and for reassembling the buffer on the other side, since client
+   * reads require the original object buffer while recovery only needs
+   * the missing pieces.
+ *
+ * Rather than handling reads on the primary directly, we simply send
+ * ourselves a message. This avoids a dedicated primary path for that
+ * part.
+ */
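+  /*
+   * A rough sketch of the flow described above, inferred from the
+   * declarations below rather than normative: the caller hands a
+   * std::map of hobject_t->read_request_t plus a ReadCompleter to
+   * ReadPipeline::start_read_op(), which registers a ReadOp in
+   * tid_to_read_map and issues the shard reads via do_read_op(); once
+   * the replies are in, complete_read_op() runs finish_single_request()
+   * for each object and then the completer's rvalue-qualified
+   * finish(priority) as the terminal call.
+   */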
+ struct read_result_t {
+ int r;
+ std::map<pg_shard_t, int> errors;
+ std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
+ std::list<
+ boost::tuple<
+ uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
+ read_result_t() : r(0) {}
+ };
+
+ struct ReadCompleter {
+ virtual void finish_single_request(
+ const hobject_t &hoid,
+ read_result_t &res,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
+ virtual void finish(int priority) && = 0;
+
+ virtual ~ReadCompleter() = default;
+ };
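+  // Note: finish() being rvalue-qualified suggests a single, consuming
+  // terminal call (e.g. std::move(*on_complete).finish(priority)); this
+  // is an inference from the signature rather than a stated contract.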
+
+ friend struct CallClientContexts;
+ struct ClientAsyncReadStatus {
+ unsigned objects_to_read;
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
+ std::map<hobject_t,std::pair<int, extent_map> > results;
+ explicit ClientAsyncReadStatus(
+ unsigned objects_to_read,
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
+ : objects_to_read(objects_to_read), func(std::move(func)) {}
+ void complete_object(
+ const hobject_t &hoid,
+ int err,
+ extent_map &&buffers) {
+ ceph_assert(objects_to_read);
+ --objects_to_read;
+ ceph_assert(!results.count(hoid));
+ results.emplace(hoid, std::make_pair(err, std::move(buffers)));
+ }
+ bool is_complete() const {
+ return objects_to_read == 0;
+ }
+ void run() {
+ func.release()->complete(std::move(results));
+ }
+ };
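+  // Presumed lifecycle: one ClientAsyncReadStatus is queued per
+  // multi-object client read, complete_object() fires as each object
+  // finishes, and kick_reads() (below) pops completed entries off the
+  // front of in_progress_client_reads and calls run(), delivering
+  // results in submission order.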
+
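+  /*
+   * State for one in-flight logical read, possibly spanning several
+   * objects and shards. Note that the constructor pre-seeds `complete`
+   * with an empty `returned` entry per requested extent, presumably so
+   * per-shard replies can be filled in as they arrive.
+   */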
+ struct ReadOp {
+ int priority;
+ ceph_tid_t tid;
+ OpRequestRef op; // may be null if not on behalf of a client
+    // True if redundant reads are issued, false otherwise. Issuing
+    // redundant reads trades some resources (extra ops) for lower read
+    // latency, which helps especially on a relatively idle cluster.
+ bool do_redundant_reads;
+    // True if reading for recovery, which may read only a subset of the
+    // available shards.
+ bool for_recovery;
+ std::unique_ptr<ReadCompleter> on_complete;
+
+ ZTracer::Trace trace;
+
+ std::map<hobject_t, std::set<int>> want_to_read;
+ std::map<hobject_t, read_request_t> to_read;
+ std::map<hobject_t, read_result_t> complete;
+
+ std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
+ std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
+
+ void dump(ceph::Formatter *f) const;
+
+ std::set<pg_shard_t> in_progress;
+
+ ReadOp(
+ int priority,
+ ceph_tid_t tid,
+ bool do_redundant_reads,
+ bool for_recovery,
+ std::unique_ptr<ReadCompleter> _on_complete,
+ OpRequestRef op,
+ std::map<hobject_t, std::set<int>> &&_want_to_read,
+ std::map<hobject_t, read_request_t> &&_to_read)
+ : priority(priority),
+ tid(tid),
+ op(op),
+ do_redundant_reads(do_redundant_reads),
+ for_recovery(for_recovery),
+ on_complete(std::move(_on_complete)),
+ want_to_read(std::move(_want_to_read)),
+ to_read(std::move(_to_read)) {
+ for (auto &&hpair: to_read) {
+ auto &returned = complete[hpair.first].returned;
+ for (auto &&extent: hpair.second.to_read) {
+ returned.push_back(
+ boost::make_tuple(
+ extent.get<0>(),
+ extent.get<1>(),
+ std::map<pg_shard_t, ceph::buffer::list>()));
+ }
+ }
+ }
+ ReadOp() = delete;
+ ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
+ ReadOp(ReadOp &&) = default;
+ };
+ struct ReadPipeline {
+ void objects_read_and_reconstruct(
+ ECBackend& ecbackend,
+ const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+ template <class F>
+ void filter_read_op(
+ const OSDMapRef& osdmap,
+ ReadOp &op,
+ F&& on_erase);
+
+ template <class F>
+ void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+
+ void complete_read_op(ReadOp &rop);
+
+ void start_read_op(
+ int priority,
+ std::map<hobject_t, std::set<int>> &want_to_read,
+ std::map<hobject_t, read_request_t> &to_read,
+ OpRequestRef op,
+ bool do_redundant_reads,
+ bool for_recovery,
+ std::unique_ptr<ReadCompleter> on_complete);
+
+ void do_read_op(ReadOp &rop);
+
+ int send_all_remaining_reads(
+ const hobject_t &hoid,
+ ReadOp &rop);
+
+ void on_change();
+
+ void kick_reads();
+
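+    // tid_to_read_map holds all in-flight ReadOps; shard_to_read_map
+    // records which tids touch a given shard (used to restart reads
+    // when an OSD goes down); in_progress_client_reads is the FIFO
+    // drained by kick_reads().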
+ std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+ std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+ std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+ CephContext* cct;
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t& sinfo;
+ // TODO: lay an interface down here
+ ECListener* parent;
+
+ ECListener *get_parent() const { return parent; }
+ const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ ReadPipeline(CephContext* cct,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t& sinfo,
+ ECListener* parent)
+ : cct(cct),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ parent(parent) {
+ }
+
+ int get_remaining_shards(
+ const hobject_t &hoid,
+ const std::set<int> &avail,
+ const std::set<int> &want,
+ const read_result_t &result,
+ std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+ bool for_recovery);
+
+ void get_all_avail_shards(
+ const hobject_t &hoid,
+ const std::set<pg_shard_t> &error_shards,
+ std::set<int> &have,
+ std::map<shard_id_t, pg_shard_t> &shards,
+ bool for_recovery);
+
+ friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+ friend struct FinishReadOp;
+ };
+};
+
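+/*
+ * ECBackend pulls the shared read-path types and ReadPipeline in by
+ * inheriting from ECCommon above; the concrete pipeline instance is
+ * the read_pipeline member declared further below.
+ */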
+class ECBackend : public PGBackend, public ECCommon {
public:
RecoveryHandle *open_recovery_op() override;
bool fast_read,
GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
- friend struct CallClientContexts;
- struct ClientAsyncReadStatus {
- unsigned objects_to_read;
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> func;
- std::map<hobject_t,std::pair<int, extent_map> > results;
- explicit ClientAsyncReadStatus(
- unsigned objects_to_read,
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func)
- : objects_to_read(objects_to_read), func(std::move(func)) {}
- void complete_object(
- const hobject_t &hoid,
- int err,
- extent_map &&buffers) {
- ceph_assert(objects_to_read);
- --objects_to_read;
- ceph_assert(!results.count(hoid));
- results.emplace(hoid, std::make_pair(err, std::move(buffers)));
- }
- bool is_complete() const {
- return objects_to_read == 0;
- }
- void run() {
- func.release()->complete(std::move(results));
- }
- };
void objects_read_async(
const hobject_t &hoid,
const std::list<std::pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
sinfo.get_stripe_width());
}
+public:
void get_want_to_read_shards(std::set<int> *want_to_read) const {
const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
want_to_read->insert(chunk);
}
}
+private:
/**
* Recovery
RecoveryMessages *m);
public:
- /**
- * Low level async read mechanism
- *
- * To avoid duplicating the logic for requesting and waiting for
- * multiple object shards, there is a common async read mechanism
- * taking a std::map of hobject_t->read_request_t which defines callbacks
- * taking read_result_ts as arguments.
- *
- * tid_to_read_map gives open read ops. check_recovery_sources uses
- * shard_to_read_map and ReadOp::source_to_obj to restart reads
- * involving down osds.
- *
- * The user is responsible for specifying replicas on which to read
- * and for reassembling the buffer on the other side since client
- * reads require the original object buffer while recovery only needs
- * the missing pieces.
- *
- * Rather than handling reads on the primary directly, we simply send
- * ourselves a message. This avoids a dedicated primary path for that
- * part.
- */
- struct read_result_t {
- int r;
- std::map<pg_shard_t, int> errors;
- std::optional<std::map<std::string, ceph::buffer::list, std::less<>> > attrs;
- std::list<
- boost::tuple<
- uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > > returned;
- read_result_t() : r(0) {}
- };
- struct read_request_t {
- const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
- bool want_attrs;
- read_request_t(
- const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
- const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
- bool want_attrs)
- : to_read(to_read), need(need), want_attrs(want_attrs) {}
- };
- friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
-
- struct ReadCompleter {
- virtual void finish_single_request(
- const hobject_t &hoid,
- ECBackend::read_result_t &res,
- std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
-
- virtual void finish(int priority) && = 0;
-
- virtual ~ReadCompleter() = default;
- };
-
- struct ReadOp {
- int priority;
- ceph_tid_t tid;
- OpRequestRef op; // may be null if not on behalf of a client
- // True if redundant reads are issued, false otherwise,
- // this is useful to tradeoff some resources (redundant ops) for
- // low latency read, especially on relatively idle cluster
- bool do_redundant_reads;
- // True if reading for recovery which could possibly reading only a subset
- // of the available shards.
- bool for_recovery;
- std::unique_ptr<ReadCompleter> on_complete;
-
- ZTracer::Trace trace;
-
- std::map<hobject_t, std::set<int>> want_to_read;
- std::map<hobject_t, read_request_t> to_read;
- std::map<hobject_t, read_result_t> complete;
-
- std::map<hobject_t, std::set<pg_shard_t>> obj_to_source;
- std::map<pg_shard_t, std::set<hobject_t> > source_to_obj;
-
- void dump(ceph::Formatter *f) const;
-
- std::set<pg_shard_t> in_progress;
-
- ReadOp(
- int priority,
- ceph_tid_t tid,
- bool do_redundant_reads,
- bool for_recovery,
- std::unique_ptr<ReadCompleter> _on_complete,
- OpRequestRef op,
- std::map<hobject_t, std::set<int>> &&_want_to_read,
- std::map<hobject_t, read_request_t> &&_to_read)
- : priority(priority),
- tid(tid),
- op(op),
- do_redundant_reads(do_redundant_reads),
- for_recovery(for_recovery),
- on_complete(std::move(_on_complete)),
- want_to_read(std::move(_want_to_read)),
- to_read(std::move(_to_read)) {
- for (auto &&hpair: to_read) {
- auto &returned = complete[hpair.first].returned;
- for (auto &&extent: hpair.second.to_read) {
- returned.push_back(
- boost::make_tuple(
- extent.get<0>(),
- extent.get<1>(),
- std::map<pg_shard_t, ceph::buffer::list>()));
- }
- }
- }
- ReadOp() = delete;
- ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
- ReadOp(ReadOp &&) = default;
- };
-
- struct ReadPipeline {
- void objects_read_and_reconstruct(
- ECBackend& ecbackend,
- const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
- > &reads,
- bool fast_read,
- GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
-
- template <class F>
- void filter_read_op(
- const OSDMapRef& osdmap,
- ReadOp &op,
- F&& on_erase);
-
- template <class F>
- void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
-
- void complete_read_op(ReadOp &rop);
-
- void start_read_op(
- int priority,
- std::map<hobject_t, std::set<int>> &want_to_read,
- std::map<hobject_t, read_request_t> &to_read,
- OpRequestRef op,
- bool do_redundant_reads,
- bool for_recovery,
- std::unique_ptr<ReadCompleter> on_complete);
-
- void do_read_op(ReadOp &rop);
-
- int send_all_remaining_reads(
- const hobject_t &hoid,
- ReadOp &rop);
-
- void on_change();
-
- void kick_reads();
-
- std::map<ceph_tid_t, ReadOp> tid_to_read_map;
- std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
- std::list<ClientAsyncReadStatus> in_progress_client_reads;
-
- CephContext* cct;
- ceph::ErasureCodeInterfaceRef ec_impl;
- const ECUtil::stripe_info_t& sinfo;
- // TODO: lay an interface down here
- ECListener* parent;
-
- ECListener *get_parent() const { return parent; }
- const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
- epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
- const pg_info_t &get_info() { return get_parent()->get_info(); }
-
- ReadPipeline(CephContext* cct,
- ceph::ErasureCodeInterfaceRef ec_impl,
- const ECUtil::stripe_info_t& sinfo,
- ECListener* parent)
- : cct(cct),
- ec_impl(std::move(ec_impl)),
- sinfo(sinfo),
- parent(parent) {
- }
-
- int get_remaining_shards(
- const hobject_t &hoid,
- const std::set<int> &avail,
- const std::set<int> &want,
- const read_result_t &result,
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
- bool for_recovery);
-
- void get_all_avail_shards(
- const hobject_t &hoid,
- const std::set<pg_shard_t> &error_shards,
- std::set<int> &have,
- std::map<shard_id_t, pg_shard_t> &shards,
- bool for_recovery);
-
- friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
- friend struct FinishReadOp;
- } read_pipeline;
+ struct ReadPipeline read_pipeline;
/**