// dout prefix helper (RMWPipeline overload): routes debug output through the
// owning backend's Listener so log lines carry the PG's debug prefix.
static ostream& _prefix(std::ostream *_dout, ECBackend::RMWPipeline *rmw_pipeline) {
  return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
}
+// dout prefix helper (ReadPipeline overload): same routing as the RMWPipeline
+// variant, via the parent Listener's gen_dbg_prefix.
+static ostream& _prefix(std::ostream *_dout, ECBackend::ReadPipeline *read_pipeline) {
+  return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
+}
struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
list<ECBackend::RecoveryOp> ops;
ErasureCodeInterfaceRef ec_impl,
uint64_t stripe_width)
: PGBackend(cct, pg, store, coll, ch),
+ read_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
rmw_pipeline(cct, ec_impl, this->sinfo, get_parent(), *this),
ec_impl(ec_impl),
sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
if (m.recovery_reads.empty())
return;
- start_read_op(
+ read_pipeline.start_read_op(
priority,
m.want_to_read,
m.recovery_reads,
{
trace.event("ec sub read reply");
dout(10) << __func__ << ": reply " << op << dendl;
- map<ceph_tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
- if (iter == tid_to_read_map.end()) {
+ map<ceph_tid_t, ReadOp>::iterator iter = read_pipeline.tid_to_read_map.find(op.tid);
+ if (iter == read_pipeline.tid_to_read_map.end()) {
//canceled
dout(20) << __func__ << ": dropped " << op << dendl;
return;
}
map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
- shard_to_read_map.find(from);
- ceph_assert(siter != shard_to_read_map.end());
+ read_pipeline.shard_to_read_map.find(from);
+ ceph_assert(siter != read_pipeline.shard_to_read_map.end());
ceph_assert(siter->second.count(op.tid));
siter->second.erase(op.tid);
// During recovery there may be multiple osds with copies of the same shard,
// so getting EIO from one may result in multiple passes through this code path.
if (!rop.do_redundant_reads) {
- int r = send_all_remaining_reads(iter->first, rop);
+ int r = read_pipeline.send_all_remaining_reads(iter->first, rop);
if (r == 0) {
// We changed the rop's to_read and not incrementing is_complete
need_resend = true;
}
}
if (need_resend) {
- do_read_op(rop);
+ read_pipeline.do_read_op(rop);
} else if (rop.in_progress.empty() ||
is_complete == rop.complete.size()) {
dout(20) << __func__ << " Complete: " << rop << dendl;
rop.trace.event("ec read complete");
- complete_read_op(rop);
+ read_pipeline.complete_read_op(rop);
} else {
dout(10) << __func__ << " readop not complete: " << rop << dendl;
}
}
-void ECBackend::complete_read_op(ReadOp &rop)
+void ECBackend::ReadPipeline::complete_read_op(ReadOp &rop)
{
map<hobject_t, read_request_t>::iterator reqiter =
rop.to_read.begin();
}
// Thread-pool completion context for a ReadOp: when scheduled, resolves the
// op by tid inside the owning ReadPipeline and completes it there.
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
-  ECBackend *ec;
+  // Non-owning back-reference; the pipeline outlives queued contexts.
+  ECBackend::ReadPipeline& read_pipeline;
  ceph_tid_t tid;
-  FinishReadOp(ECBackend *ec, ceph_tid_t tid) : ec(ec), tid(tid) {}
+  FinishReadOp(ECBackend::ReadPipeline& read_pipeline, ceph_tid_t tid)
+    : read_pipeline(read_pipeline), tid(tid) {}
  void finish(ThreadPool::TPHandle&) override {
-    auto ropiter = ec->tid_to_read_map.find(tid);
-    ceph_assert(ropiter != ec->tid_to_read_map.end());
-    ec->complete_read_op(ropiter->second);
+    // The tid must still be tracked (asserted); complete it in-place.
+    auto ropiter = read_pipeline.tid_to_read_map.find(tid);
+    ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
+    read_pipeline.complete_read_op(ropiter->second);
  }
};
-void ECBackend::filter_read_op(
+void ECBackend::ReadPipeline::filter_read_op(
const OSDMapRef& osdmap,
ReadOp &op)
{
op.to_read.erase(*i);
op.complete.erase(*i);
- recovery_ops.erase(*i);
+ // TODO: meh, this doesn't look like a part of the read pipeline
+ //recovery_ops.erase(*i);
}
if (op.in_progress.empty()) {
*/
get_parent()->schedule_recovery_work(
get_parent()->bless_unlocked_gencontext(
- new FinishReadOp(this, op.tid)),
+ new FinishReadOp(*this, op.tid)),
1);
}
}
void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
+ // TODO: dissect into ReadPipeline
set<ceph_tid_t> tids_to_filter;
for (map<pg_shard_t, set<ceph_tid_t> >::iterator
- i = shard_to_read_map.begin();
- i != shard_to_read_map.end();
+ i = read_pipeline.shard_to_read_map.begin();
+ i != read_pipeline.shard_to_read_map.end();
) {
if (osdmap->is_down(i->first.osd)) {
tids_to_filter.insert(i->second.begin(), i->second.end());
- shard_to_read_map.erase(i++);
+ read_pipeline.shard_to_read_map.erase(i++);
} else {
++i;
}
for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
i != tids_to_filter.end();
++i) {
- map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
- ceph_assert(j != tid_to_read_map.end());
- filter_read_op(osdmap, j->second);
+ map<ceph_tid_t, ReadOp>::iterator j = read_pipeline.tid_to_read_map.find(*i);
+ ceph_assert(j != read_pipeline.tid_to_read_map.end());
+ read_pipeline.filter_read_op(osdmap, j->second);
}
}
+// Cancel every in-flight read (interval change): release each pending
+// request's callback, then drop all read-tracking state.
+void ECBackend::ReadPipeline::on_change()
+{
+  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
+       i != tid_to_read_map.end();
+       ++i) {
+    dout(10) << __func__ << ": cancelling " << i->second << dendl;
+    for (map<hobject_t, read_request_t>::iterator j =
+	   i->second.to_read.begin();
+	 j != i->second.to_read.end();
+	 ++j) {
+      // Free the never-invoked completion callback and null the pointer so
+      // the cancelled op can neither fire it nor double-free it.
+      delete j->second.cb;
+      j->second.cb = nullptr;
+    }
+  }
+  tid_to_read_map.clear();
+  shard_to_read_map.clear();
+  in_progress_client_reads.clear();
+}
+
void ECBackend::RMWPipeline::on_change()
{
dout(10) << __func__ << dendl;
// Interval change: each pipeline cleans up its own in-flight state; the
// read-cancellation loop moves from here into ReadPipeline::on_change().
void ECBackend::on_change()
{
  rmw_pipeline.on_change();
-  for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
-       i != tid_to_read_map.end();
-       ++i) {
-    dout(10) << __func__ << ": cancelling " << i->second << dendl;
-    for (map<hobject_t, read_request_t>::iterator j =
-	   i->second.to_read.begin();
-	 j != i->second.to_read.end();
-	 ++j) {
-      delete j->second.cb;
-      j->second.cb = nullptr;
-    }
-  }
-  tid_to_read_map.clear();
-  shard_to_read_map.clear();
-  in_progress_client_reads.clear();
+  read_pipeline.on_change();
  clear_recovery_state();
}
}
f->close_section();
f->open_array_section("read_ops");
- for (map<ceph_tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
- i != tid_to_read_map.end();
+ for (map<ceph_tid_t, ReadOp>::const_iterator i = read_pipeline.tid_to_read_map.begin();
+ i != read_pipeline.tid_to_read_map.end();
++i) {
f->open_object_section("read_op");
i->second.dump(f);
}
}
-void ECBackend::get_all_avail_shards(
+void ECBackend::ReadPipeline::get_all_avail_shards(
const hobject_t &hoid,
const set<pg_shard_t> &error_shards,
set<int> &have,
map<shard_id_t, pg_shard_t> shards;
set<pg_shard_t> error_shards;
- get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
+ read_pipeline.get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);
map<int, vector<pair<int, int>>> need;
int r = ec_impl->minimum_to_decode(want, have, &need);
return 0;
}
-int ECBackend::get_remaining_shards(
+int ECBackend::ReadPipeline::get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
const set<int> &want,
return 0;
}
-void ECBackend::start_read_op(
+void ECBackend::ReadPipeline::start_read_op(
int priority,
map<hobject_t, set<int>> &want_to_read,
map<hobject_t, read_request_t> &to_read,
do_read_op(op);
}
-void ECBackend::do_read_op(ReadOp &op)
+void ECBackend::ReadPipeline::do_read_op(ReadOp &op)
{
int priority = op.priority;
ceph_tid_t tid = op.tid;
struct CallClientContexts :
public GenContext<ECBackend::read_result_t&> {
hobject_t hoid;
- ECBackend *ec;
+ ECBackend::ReadPipeline &read_pipeline;
ECBackend::ClientAsyncReadStatus *status;
list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
CallClientContexts(
hobject_t hoid,
- ECBackend *ec,
+ ECBackend::ReadPipeline &read_pipeline,
ECBackend::ClientAsyncReadStatus *status,
const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
- : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
+ : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {}
void finish(ECBackend::read_result_t &res) override {
extent_map result;
if (res.r != 0)
ceph_assert(res.errors.empty());
for (auto &&read: to_read) {
pair<uint64_t, uint64_t> adjusted =
- ec->sinfo.offset_len_to_stripe_bounds(
+ read_pipeline.sinfo.offset_len_to_stripe_bounds(
make_pair(read.get<0>(), read.get<1>()));
ceph_assert(res.returned.front().get<0>() == adjusted.first);
ceph_assert(res.returned.front().get<1>() == adjusted.second);
to_decode[j->first.shard] = std::move(j->second);
}
int r = ECUtil::decode(
- ec->sinfo,
- ec->ec_impl,
+ read_pipeline.sinfo,
+ read_pipeline.ec_impl,
to_decode,
&bl);
if (r < 0) {
}
out:
status->complete_object(hoid, res.r, std::move(result));
- ec->kick_reads();
+ read_pipeline.kick_reads();
}
};
bool fast_read,
GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
{
+ return read_pipeline.objects_read_and_reconstruct(
+ *this, reads, fast_read, std::move(func));
+}
+
+void ECBackend::ReadPipeline::objects_read_and_reconstruct(
+ ECBackend& ecbackend,
+ const map<hobject_t,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+ return [this,
+ kick_reads=[this] (auto...) { return this->kick_reads();},
+ get_want_to_read_shards=[&ecbackend] (auto&&... args) {
+ return ecbackend.get_want_to_read_shards(std::forward<decltype(args)>(args)...);
+ },
+ get_min_avail_to_read_shards=[&ecbackend] (auto&&... args) {
+ return ecbackend.get_min_avail_to_read_shards(std::forward<decltype(args)>(args)...);
+ },
+ cct=(CephContext*)nullptr,
+ // params
+ &reads,
+ fast_read,
+ func=std::move(func)
+ ]() mutable {
in_progress_client_reads.emplace_back(
reads.size(), std::move(func));
if (!reads.size()) {
CallClientContexts *c = new CallClientContexts(
to_read.first,
- this,
+ *this,
&(in_progress_client_reads.back()),
to_read.second);
for_read_op.insert(
for_read_op,
OpRequestRef(),
fast_read, false, nullptr);
+ }();
return;
}
-int ECBackend::send_all_remaining_reads(
+int ECBackend::ReadPipeline::send_all_remaining_reads(
const hobject_t &hoid,
ReadOp &rop)
{
return 0;
}
+// Run queued client reads in submission order; stops at the first entry that
+// is not yet complete so completions stay FIFO.
+void ECBackend::ReadPipeline::kick_reads()
+{
+  while (in_progress_client_reads.size() &&
+	 in_progress_client_reads.front().is_complete()) {
+    in_progress_client_reads.front().run();
+    in_progress_client_reads.pop_front();
+  }
+}
+
+// Forwarder kept on ECBackend for existing callers; the client-read queue
+// now lives in ReadPipeline.
+void ECBackend::kick_reads() {
+  read_pipeline.kick_reads();
+}
+
+
int ECBackend::objects_get_attrs(
const hobject_t &hoid,
map<string, bufferlist, less<>> *out)
func.release()->complete(std::move(results));
}
};
- std::list<ClientAsyncReadStatus> in_progress_client_reads;
void objects_read_async(
const hobject_t &hoid,
const std::list<std::pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
Context *on_complete,
bool fast_read = false) override;
- void kick_reads() {
- while (in_progress_client_reads.size() &&
- in_progress_client_reads.front().is_complete()) {
- in_progress_client_reads.front().run();
- in_progress_client_reads.pop_front();
- }
- }
+ void kick_reads();
private:
friend struct ECRecoveryHandle;
const PushReplyOp &op,
pg_shard_t from,
RecoveryMessages *m);
- void get_all_avail_shards(
- const hobject_t &hoid,
- const std::set<pg_shard_t> &error_shards,
- std::set<int> &have,
- std::map<shard_id_t, pg_shard_t> &shards,
- bool for_recovery);
public:
/**
ReadOp(const ReadOp &) = default;
ReadOp(ReadOp &&) = default;
};
- friend struct FinishReadOp;
- void filter_read_op(
- const OSDMapRef& osdmap,
- ReadOp &op);
- void complete_read_op(ReadOp &rop);
- friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
-
- std::map<ceph_tid_t, ReadOp> tid_to_read_map;
- std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
- void start_read_op(
- int priority,
- std::map<hobject_t, std::set<int>> &want_to_read,
- std::map<hobject_t, read_request_t> &to_read,
- OpRequestRef op,
- bool do_redundant_reads,
- bool for_recovery,
- GenContext<int> *on_complete);
-
- void do_read_op(ReadOp &rop);
- int send_all_remaining_reads(
- const hobject_t &hoid,
- ReadOp &rop);
+
+  // Read-side machinery split out of ECBackend: tracks in-flight ReadOps,
+  // per-shard outstanding tids, and queued client reads.
+  struct ReadPipeline {
+    // Entry point for client reads; reconstructs objects from shard reads.
+    // `ecbackend` is passed explicitly for the shard-selection helpers that
+    // still live on ECBackend.
+    void objects_read_and_reconstruct(
+      ECBackend& ecbackend,
+      const std::map<hobject_t, std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+      > &reads,
+      bool fast_read,
+      GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
+
+    // Prune the parts of an in-flight ReadOp whose sources are gone per the
+    // new osdmap (driven by ECBackend::check_recovery_sources).
+    void filter_read_op(
+      const OSDMapRef& osdmap,
+      ReadOp &op);
+
+    void complete_read_op(ReadOp &rop);
+
+    // Register a new ReadOp in tid_to_read_map and dispatch it.
+    void start_read_op(
+      int priority,
+      std::map<hobject_t, std::set<int>> &want_to_read,
+      std::map<hobject_t, read_request_t> &to_read,
+      OpRequestRef op,
+      bool do_redundant_reads,
+      bool for_recovery,
+      GenContext<int> *on_complete);
+
+    void do_read_op(ReadOp &rop);
+
+    int send_all_remaining_reads(
+      const hobject_t &hoid,
+      ReadOp &rop);
+
+    // Cancel all in-flight reads and clear tracking state (interval change).
+    void on_change();
+
+    // Complete finished queued client reads in FIFO order.
+    void kick_reads();
+
+    // In-flight ReadOps keyed by transaction id.
+    std::map<ceph_tid_t, ReadOp> tid_to_read_map;
+    // Outstanding read tids per shard; consulted when a shard's OSD goes down.
+    std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
+    // Client reads awaiting completion; drained by kick_reads().
+    std::list<ClientAsyncReadStatus> in_progress_client_reads;
+
+    CephContext* cct;
+    ceph::ErasureCodeInterfaceRef ec_impl;
+    const ECUtil::stripe_info_t& sinfo;
+    PGBackend::Listener* parent;
+    // TODO: lay an interface down here
+    ECBackend& ec_backend;
+
+    PGBackend::Listener *get_parent() const { return parent; }
+    const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+    epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+    const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+    ReadPipeline(CephContext* cct,
+                 ceph::ErasureCodeInterfaceRef ec_impl,
+                 const ECUtil::stripe_info_t& sinfo,
+                 PGBackend::Listener* parent,
+                 ECBackend& ec_backend)
+      : cct(cct),
+        ec_impl(std::move(ec_impl)),
+        sinfo(sinfo),
+        parent(parent),
+        ec_backend(ec_backend) {
+    }
+
+    int get_remaining_shards(
+      const hobject_t &hoid,
+      const std::set<int> &avail,
+      const std::set<int> &want,
+      const read_result_t &result,
+      std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
+      bool for_recovery);
+
+    void get_all_avail_shards(
+      const hobject_t &hoid,
+      const std::set<pg_shard_t> &error_shards,
+      std::set<int> &have,
+      std::map<shard_id_t, pg_shard_t> &shards,
+      bool for_recovery);
+
+    friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+    friend struct FinishReadOp;
+  } read_pipeline;
/**
std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
); ///< @return error code, 0 on success
- int get_remaining_shards(
- const hobject_t &hoid,
- const std::set<int> &avail,
- const std::set<int> &want,
- const read_result_t &result,
- std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read,
- bool for_recovery);
-
int objects_get_attrs(
const hobject_t &hoid,
std::map<std::string, ceph::buffer::list, std::less<>> *out) override;