static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
return pgb->get_parent()->gen_dbg_prefix(*_dout);
}
-static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) {
- return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
-static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) {
- return read_pipeline->get_parent()->gen_dbg_prefix(*_dout);
-}
struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
list<ECBackend::RecoveryOp> ops;
return lhs << "]";
}
-static ostream &operator<<(
- ostream &lhs,
- const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
-{
- return lhs << "(" << rhs.get<0>() << ", "
- << rhs.get<1>() << ", " << rhs.get<2>() << ")";
-}
-
ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
{
return lhs << "RecoveryOp("
on_complete)));
}
-struct ClientReadCompleter : ECCommon::ReadCompleter {
- ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
- ECCommon::ClientAsyncReadStatus *status)
- : read_pipeline(read_pipeline),
- status(status) {}
-
- void finish_single_request(
- const hobject_t &hoid,
- ECCommon::read_result_t &res,
- list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
- {
- extent_map result;
- if (res.r != 0)
- goto out;
- ceph_assert(res.returned.size() == to_read.size());
- ceph_assert(res.errors.empty());
- for (auto &&read: to_read) {
- pair<uint64_t, uint64_t> adjusted =
- read_pipeline.sinfo.offset_len_to_stripe_bounds(
- make_pair(read.get<0>(), read.get<1>()));
- ceph_assert(res.returned.front().get<0>() == adjusted.first);
- ceph_assert(res.returned.front().get<1>() == adjusted.second);
- map<int, bufferlist> to_decode;
- bufferlist bl;
- for (map<pg_shard_t, bufferlist>::iterator j =
- res.returned.front().get<2>().begin();
- j != res.returned.front().get<2>().end();
- ++j) {
- to_decode[j->first.shard] = std::move(j->second);
- }
- int r = ECUtil::decode(
- read_pipeline.sinfo,
- read_pipeline.ec_impl,
- to_decode,
- &bl);
- if (r < 0) {
- res.r = r;
- goto out;
- }
- bufferlist trimmed;
- trimmed.substr_of(
- bl,
- read.get<0>() - adjusted.first,
- std::min(read.get<1>(),
- bl.length() - (read.get<0>() - adjusted.first)));
- result.insert(
- read.get<0>(), trimmed.length(), std::move(trimmed));
- res.returned.pop_front();
- }
-out:
- status->complete_object(hoid, res.r, std::move(result));
- read_pipeline.kick_reads();
- }
-
- void finish(int priority) && override
- {
- // NOP
- }
-
- ECCommon::ReadPipeline &read_pipeline;
- ECCommon::ClientAsyncReadStatus *status;
-};
-
-
void ECBackend::objects_read_and_reconstruct(
const map<hobject_t,
std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
reads, fast_read, std::move(func));
}
-void ECCommon::ReadPipeline::get_want_to_read_shards(
- std::set<int> *want_to_read) const
-{
- const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
- for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
- int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
- want_to_read->insert(chunk);
- }
-}
-
-void ECCommon::ReadPipeline::objects_read_and_reconstruct(
- const map<hobject_t,
- std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
- > &reads,
- bool fast_read,
- GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
-{
- in_progress_client_reads.emplace_back(
- reads.size(), std::move(func));
- if (!reads.size()) {
- kick_reads();
- return;
- }
-
- map<hobject_t, set<int>> obj_want_to_read;
- set<int> want_to_read;
- get_want_to_read_shards(&want_to_read);
-
- map<hobject_t, read_request_t> for_read_op;
- for (auto &&to_read: reads) {
- map<pg_shard_t, vector<pair<int, int>>> shards;
- int r = get_min_avail_to_read_shards(
- to_read.first,
- want_to_read,
- false,
- fast_read,
- &shards);
- ceph_assert(r == 0);
-
- for_read_op.insert(
- make_pair(
- to_read.first,
- read_request_t(
- to_read.second,
- shards,
- false)));
- obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
- }
-
- start_read_op(
- CEPH_MSG_PRIO_DEFAULT,
- obj_want_to_read,
- for_read_op,
- OpRequestRef(),
- fast_read,
- false,
- std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
-}
-
-
-int ECCommon::ReadPipeline::send_all_remaining_reads(
- const hobject_t &hoid,
- ReadOp &rop)
-{
- set<int> already_read;
- const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
- for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
- already_read.insert(i->shard);
- dout(10) << __func__ << " have/error shards=" << already_read << dendl;
- map<pg_shard_t, vector<pair<int, int>>> shards;
- int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
- rop.complete[hoid], &shards, rop.for_recovery);
- if (r)
- return r;
-
- list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
- rop.to_read.find(hoid)->second.to_read;
-
- // (Note cuixf) If we need to read attrs and we read failed, try to read again.
- bool want_attrs =
- rop.to_read.find(hoid)->second.want_attrs &&
- (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
- if (want_attrs) {
- dout(10) << __func__ << " want attrs again" << dendl;
- }
-
- rop.to_read.erase(hoid);
- rop.to_read.insert(make_pair(
- hoid,
- read_request_t(
- offsets,
- shards,
- want_attrs)));
- return 0;
-}
-
-void ECCommon::ReadPipeline::kick_reads()
-{
- while (in_progress_client_reads.size() &&
- in_progress_client_reads.front().is_complete()) {
- in_progress_client_reads.front().run();
- in_progress_client_reads.pop_front();
- }
-}
-
void ECBackend::kick_reads() {
read_pipeline.kick_reads();
}
dout(10) << __func__ << ": started " << op << dendl;
}
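+// Collect the shard ids that hold data chunks, honouring the erasure-code
+// plugin's chunk mapping when one is provided.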
+void ECCommon::ReadPipeline::get_want_to_read_shards(
+ std::set<int> *want_to_read) const
+{
+ const std::vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();
+ for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
+ int chunk = (int)chunk_mapping.size() > i ? chunk_mapping[i] : i;
+ want_to_read->insert(chunk);
+ }
+}
+
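+// Completer for client-initiated async reads: turns the raw per-shard
+// buffers of each finished read into the decoded, trimmed extent_map the
+// caller asked for, then hands it to the matching ClientAsyncReadStatus.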
+struct ClientReadCompleter : ECCommon::ReadCompleter {
+ ClientReadCompleter(ECCommon::ReadPipeline &read_pipeline,
+ ECCommon::ClientAsyncReadStatus *status)
+ : read_pipeline(read_pipeline),
+ status(status) {}
+
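+ // Decode one object's read result. Each requested (offset, length) was
+ // widened to stripe bounds for the shard reads, so the decoded buffer is
+ // trimmed back to the original extent before being added to the result.
+ // Any error is propagated to the caller through res.r.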
+ void finish_single_request(
+ const hobject_t &hoid,
+ ECCommon::read_result_t &res,
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
+ {
+ extent_map result;
+ if (res.r != 0)
+ goto out;
+ ceph_assert(res.returned.size() == to_read.size());
+ ceph_assert(res.errors.empty());
+ for (auto &&read: to_read) {
+ pair<uint64_t, uint64_t> adjusted =
+ read_pipeline.sinfo.offset_len_to_stripe_bounds(
+ make_pair(read.get<0>(), read.get<1>()));
+ ceph_assert(res.returned.front().get<0>() == adjusted.first);
+ ceph_assert(res.returned.front().get<1>() == adjusted.second);
+ map<int, bufferlist> to_decode;
+ bufferlist bl;
+ for (map<pg_shard_t, bufferlist>::iterator j =
+ res.returned.front().get<2>().begin();
+ j != res.returned.front().get<2>().end();
+ ++j) {
+ to_decode[j->first.shard] = std::move(j->second);
+ }
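+ // Reconstruct the stripe-aligned data range from the collected shard chunks.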
+ int r = ECUtil::decode(
+ read_pipeline.sinfo,
+ read_pipeline.ec_impl,
+ to_decode,
+ &bl);
+ if (r < 0) {
+ res.r = r;
+ goto out;
+ }
+ bufferlist trimmed;
+ trimmed.substr_of(
+ bl,
+ read.get<0>() - adjusted.first,
+ std::min(read.get<1>(),
+ bl.length() - (read.get<0>() - adjusted.first)));
+ result.insert(
+ read.get<0>(), trimmed.length(), std::move(trimmed));
+ res.returned.pop_front();
+ }
+out:
+ status->complete_object(hoid, res.r, std::move(result));
+ read_pipeline.kick_reads();
+ }
+
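+ // Nothing left to do here; each request already completed its status
+ // in finish_single_request().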
+ void finish(int priority) && override
+ {
+ // NOP
+ }
+
+ ECCommon::ReadPipeline &read_pipeline;
+ ECCommon::ClientAsyncReadStatus *status;
+};
+
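+// Issue client reads: record an in-progress status for the batch, choose the
+// minimal set of available shards for every object, and start the read op,
+// completing through ClientReadCompleter above.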
+void ECCommon::ReadPipeline::objects_read_and_reconstruct(
+ const map<hobject_t,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+{
+ in_progress_client_reads.emplace_back(
+ reads.size(), std::move(func));
+ if (!reads.size()) {
+ kick_reads();
+ return;
+ }
+
+ map<hobject_t, set<int>> obj_want_to_read;
+ set<int> want_to_read;
+ get_want_to_read_shards(&want_to_read);
+
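+ // Build one read_request_t per object from the minimal available shard set.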
+ map<hobject_t, read_request_t> for_read_op;
+ for (auto &&to_read: reads) {
+ map<pg_shard_t, vector<pair<int, int>>> shards;
+ int r = get_min_avail_to_read_shards(
+ to_read.first,
+ want_to_read,
+ false,
+ fast_read,
+ &shards);
+ ceph_assert(r == 0);
+
+ for_read_op.insert(
+ make_pair(
+ to_read.first,
+ read_request_t(
+ to_read.second,
+ shards,
+ false)));
+ obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
+ }
+
+ start_read_op(
+ CEPH_MSG_PRIO_DEFAULT,
+ obj_want_to_read,
+ for_read_op,
+ OpRequestRef(),
+ fast_read,
+ false,
+ std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
+}
+
+
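+// Retry path: re-issue the read for hoid against the shards that have not
+// been tried yet, re-requesting attrs if the earlier attempt failed to
+// return them.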
+int ECCommon::ReadPipeline::send_all_remaining_reads(
+ const hobject_t &hoid,
+ ReadOp &rop)
+{
+ set<int> already_read;
+ const set<pg_shard_t>& ots = rop.obj_to_source[hoid];
+ for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+ already_read.insert(i->shard);
+ dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+ map<pg_shard_t, vector<pair<int, int>>> shards;
+ int r = get_remaining_shards(hoid, already_read, rop.want_to_read[hoid],
+ rop.complete[hoid], &shards, rop.for_recovery);
+ if (r)
+ return r;
+
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+ rop.to_read.find(hoid)->second.to_read;
+
+ // (Note cuixf) If we needed to read attrs and the failed read did not return them, request them again.
+ bool want_attrs =
+ rop.to_read.find(hoid)->second.want_attrs &&
+ (!rop.complete[hoid].attrs || rop.complete[hoid].attrs->empty());
+ if (want_attrs) {
+ dout(10) << __func__ << " want attrs again" << dendl;
+ }
+
+ rop.to_read.erase(hoid);
+ rop.to_read.insert(make_pair(
+ hoid,
+ read_request_t(
+ offsets,
+ shards,
+ want_attrs)));
+ return 0;
+}
+
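+// Complete finished client reads strictly in submission order.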
+void ECCommon::ReadPipeline::kick_reads()
+{
+ while (in_progress_client_reads.size() &&
+ in_progress_client_reads.front().is_complete()) {
+ in_progress_client_reads.front().run();
+ in_progress_client_reads.pop_front();
+ }
+}
+
+
void ECCommon::RMWPipeline::start_rmw(OpRef op)
{
ceph_assert(op);