ECSwitch *s,
ECExtentCache::LRU &ec_extent_cache_lru)
: parent(pg), cct(cct), switcher(s),
+#ifdef WITH_CRIMSON
+ read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener(), *this),
+#else
read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener()),
+#endif
rmw_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener(),
*this, ec_extent_cache_lru),
recovery_backend(cct, switcher->coll, ec_impl, this->sinfo, read_pipeline,
reply->tid = op.tid;
}
// Execute the sub-read in `op` synchronously on this OSD and feed the
// resulting reply straight into handle_sub_read_reply(), skipping the
// messenger round-trip used for remote shards. The crimson path routes
// reads addressed to our own shard through here (see the local_read_op
// handling in the read-dispatch code).
+void ECBackend::handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &trace)
+{
+ ECSubReadReply reply;
+ handle_sub_read(from, op, &reply, trace);
+ handle_sub_read_reply(from, reply, trace);
+}
+
void ECBackend::handle_sub_write_reply(
pg_shard_t from,
const ECSubWriteReply &ec_write_reply_op,
ECSubReadReply *reply,
const ZTracer::Trace &trace
);
// Perform a sub-read locally and process its reply in place (no messenger
// round-trip). Declared `override` only under crimson, where the ECCommon
// base class exposes the matching pure virtual; classic builds declare it
// as a plain member function.
+ void handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &trace
+#ifdef WITH_CRIMSON
+ ) override;
+#else
+ );
+#endif
void handle_sub_write_reply(
pg_shard_t from,
const ECSubWriteReply &op,
ceph_assert(reads_sent);
}
// Dispatch per-shard reads: remote shards get an MOSDECSubOpRead over the
// messenger; under crimson, the read addressed to this OSD's own shard is
// held back in local_read_op and executed synchronously below via
// handle_sub_read_n_reply().
+ std::optional<ECSubRead> local_read_op;
std::vector<std::pair<int, Message*>> m;
m.reserve(messages.size());
for (auto &&[pg_shard, read]: messages) {
rop.in_progress.insert(pg_shard);
shard_to_read_map[pg_shard].insert(rop.tid);
read.tid = tid;
+#ifdef WITH_CRIMSON // crimson only
+ if (pg_shard == get_parent()->whoami_shard()) {
+ local_read_op = std::move(read);
+ continue;
+ }
+#endif
auto *msg = new MOSDECSubOpRead;
msg->set_priority(priority);
msg->pgid = spg_t(get_info().pgid.pgid, pg_shard.shard);
msg->trace.keyval("shard", pg_shard.shard.id);
}
m.push_back(std::make_pair(pg_shard.osd, msg));
+ dout(10) << __func__ << ": will send msg " << *msg
+ << " to osd." << pg_shard << dendl;
}
if (!m.empty()) {
get_parent()->send_message_osd_cluster(m, get_osdmap_epoch());
}
// Fix: was `#if WITH_CRIMSON`, inconsistent with the `#ifdef WITH_CRIMSON`
// guard that captured local_read_op above. With WITH_CRIMSON defined empty,
// `#if` is a preprocessing error; with it defined as 0, the `#ifdef` above
// would still skip sending to the local shard while this block would never
// issue the local read, leaving the read op stuck. The two guards must
// activate together.
+#ifdef WITH_CRIMSON
+ if (local_read_op) {
+ dout(10) << __func__ << ": doing local read for " << rop << dendl;
+ handle_sub_read_n_reply(
+ get_parent()->whoami_shard(),
+ *local_read_op,
+ rop.trace);
+ }
+#endif
dout(10) << __func__ << ": started " << rop << dendl;
}
//forward declaration
struct ECBackend;
struct ECSubWrite;
+struct ECSubRead;
struct PGLog;
struct RecoveryMessages;
const ZTracer::Trace &trace,
ECListener &eclistener) = 0;
+#ifdef WITH_CRIMSON
// Execute a sub-read locally and consume the reply without a messenger
// hop; used for reads addressed to this OSD's own shard. Implemented by
// ECBackend; ReadPipeline forwards to it through its ec_backend reference.
+ virtual void handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &trace
+ ) = 0;
+#endif
+
virtual void objects_read_and_reconstruct(
const std::map<hobject_t, std::list<ec_align_t>> &reads,
bool fast_read,
const ECUtil::stripe_info_t &sinfo;
// TODO: lay an interface down here
ECListener *parent;
+#ifdef WITH_CRIMSON
// Owning backend; lets the pipeline hand reads for the local shard back to
// the backend's handle_sub_read_n_reply() instead of using the messenger.
+ ECCommon &ec_backend;
+#endif
ECListener *get_parent() const { return parent; }
// Under crimson the constructor additionally wires in the owning ECCommon
// backend (stored in ec_backend) so local reads can bypass the messenger;
// classic builds keep the original four-argument form.
ReadPipeline(CephContext *cct,
ceph::ErasureCodeInterfaceRef ec_impl,
const ECUtil::stripe_info_t &sinfo,
+#ifdef WITH_CRIMSON
+ ECListener *parent,
+ ECCommon &ec_backend)
+#else
ECListener *parent)
+#endif
: cct(cct),
ec_impl(std::move(ec_impl)),
sinfo(sinfo),
+#ifdef WITH_CRIMSON
+ parent(parent),
+ ec_backend(ec_backend) {}
+#else
parent(parent) {}
+#endif
/**
* While get_want_to_read_shards creates a want_to_read based on the EC
const std::optional<std::set<pg_shard_t>> &error_shards = std::nullopt
//< [in] Shards where reads have failed (optional)
); ///< @return error code, 0 on success
+
+#ifdef WITH_CRIMSON
// Crimson-only: delegate a local sub-read to the owning backend, which
// executes it and processes the reply without a messenger round-trip.
+ void handle_sub_read_n_reply(
+ pg_shard_t from,
+ ECSubRead &op,
+ const ZTracer::Trace &trace) {
+ ec_backend.handle_sub_read_n_reply(from, op, trace);
+ }
+#endif
};
/**