uint64_t stripe_width,
bool fast_read,
bool allows_ecoverwrites,
- DoutPrefixProvider &dpp)
+ DoutPrefixProvider &dpp,
+ ECListener &eclistener)
: PGBackend{whoami, coll, shard_services, dpp},
ec_impl{create_ec_impl(ec_profile)},
sinfo(ec_impl, stripe_width),
fast_read{fast_read},
- allows_ecoverwrites{allows_ecoverwrites}
+ allows_ecoverwrites{allows_ecoverwrites},
+ read_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener},
+ rmw_pipeline{shard_services.get_cct(), ec_impl, sinfo, &eclistener, *this}
{
}
ECBackend::handle_sub_write(
pg_shard_t from,
ECSubWrite &&op,
- crimson::osd::PG& pg)
+ ECListener& pg)
{
LOG_PREFIX(ECBackend::handle_sub_write);
logger().info("{} from {}", __func__, from);
});
}
+void ECBackend::handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace,
+ ECListener& eclistener)
+{
+ std::ignore = handle_sub_write(from, std::move(op), eclistener);
+}
+
ECBackend::write_iertr::future<>
ECBackend::handle_rep_write_op(
Ref<MOSDECSubOpWrite> m,
}
}
+void ECBackend::objects_read_and_reconstruct(
+ const std::map<hobject_t, std::list<ec_align_t>> &reads,
+ bool fast_read,
+ GenContextURef<ec_extents_t &&> &&func)
+{
+ // TODO XXX FIXME
+}
+
ECBackend::ll_read_ierrorator::future<>
ECBackend::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
{
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
+#include "osd/ECCommon.h"
#include "osd/ECUtil.h"
#include "osd/osd_types.h"
#include "pg_backend.h"
class PG;
-class ECBackend : public PGBackend
+class ECBackend : public PGBackend,
+ public ECCommon
{
static ceph::ErasureCodeInterfaceRef create_ec_impl(
const ec_profile_t& ec_profile);
uint64_t stripe_width,
bool fast_read,
bool allows_ecoverwrites,
- DoutPrefixProvider &dpp);
+ DoutPrefixProvider &dpp,
+ ECListener &eclistener);
seastar::future<> stop() final {
return seastar::now();
}
write_iertr::future<> handle_sub_write(
pg_shard_t from,
ECSubWrite&& op,
- crimson::osd::PG& pg);
+ ECListener& pg);
+
+ void handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ const ZTracer::Trace &trace,
+ ECListener& eclistener) override;
bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
std::uint64_t size,
std::uint32_t flags);
+ void objects_read_and_reconstruct(
+ const std::map<hobject_t, std::list<ec_align_t>> &reads,
+ bool fast_read,
+ GenContextURef<ec_extents_t &&> &&func) override;
+
ceph::ErasureCodeInterfaceRef ec_impl;
const ECUtil::stripe_info_t sinfo;
const bool fast_read;
const bool allows_ecoverwrites;
+
+ ECCommon::ReadPipeline read_pipeline;
+ ECCommon::RMWPipeline rmw_pipeline;
};
}
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDOpReply.h"
#include "os/Transaction.h"
+#include "osd/ECCommon.h"
#include "osd/osd_types.h"
#include "osd/osd_types_fmt.h"
#include "crimson/osd/object_context.h"
class PGBackend;
class ReplicatedBackend;
-class PG : public boost::intrusive_ref_counter<
- PG,
- boost::thread_unsafe_counter>,
+class PG
+: public boost::intrusive_ref_counter<PG, boost::thread_unsafe_counter>,
public PGRecoveryListener,
+ public ECListener,
PeeringState::PeeringListener,
DoutPrefixProvider
{
~PG();
+ // ECListener begins
+ const OSDMapRef& pgb_get_osdmap() const override final {
+ return peering_state.get_osdmap();
+ }
+ epoch_t pgb_get_osdmap_epoch() const override final {
+ return get_osdmap_epoch();
+ }
+ void cancel_pull(const hobject_t &soid) override {
+ // TODO
+ }
+ const std::set<pg_shard_t> &get_acting_shards() const override {
+ return get_actingset();
+ }
+ const std::set<pg_shard_t> &get_backfill_shards() const override {
+ return peering_state.get_backfill_targets();
+ }
+ const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+ return peering_state.get_peer_info();
+ }
+ const pg_info_t &get_shard_info(pg_shard_t peer) const override {
+ if (peer == get_primary()) {
+ return get_info();
+ } else {
+ std::map<pg_shard_t, pg_info_t>::const_iterator i =
+ get_shard_info().find(peer);
+ ceph_assert(i != get_shard_info().end());
+ return i->second;
+ }
+ }
+ ceph_tid_t get_tid() override final {
+ return shard_services.get_tid();
+ }
+ pg_shard_t whoami_shard() const override {
+ return get_pg_whoami();
+ }
+ void send_message_osd_cluster(std::vector<std::pair<int, Message*>>& messages,
+ epoch_t from_epoch) override final {
+ std::ignore = seastar::do_with(std::move(messages),
+ [this, from_epoch](auto&& messages) {
+ return seastar::do_for_each(messages, [this, from_epoch] (auto&& im) {
+ auto& [osd_id, msg] = im;
+ return shard_services.send_to_osd(osd_id, MessageURef{msg}, from_epoch);
+ });
+ });
+ }
+ std::ostream& gen_dbg_prefix(std::ostream& out) const override final {
+ return gen_prefix(out);
+ }
+ const pg_pool_t &get_pool() const override {
+ return peering_state.get_pgpool().info;
+ }
+ const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+ return get_acting_recovery_backfill();
+ }
+ bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override {
+ if (peer == get_primary()) {
+ // TODO XXX FIXME
+ assert(peer == get_primary());
+ return true;
+ }
+ abort();
+ }
+ spg_t primary_spg_t() const override {
+ return spg_t(get_info().pgid.pgid, get_primary().shard);
+ }
+ const PGLog &get_log() const override {
+ return peering_state.get_pg_log();
+ }
+ DoutPrefixProvider *get_dpp() override {
+ return this;
+ }
+ // ECListener ends
+
const pg_shard_t& get_pg_whoami() const final {
return pg_whoami;
}
public:
- cached_map_t get_osdmap() { return peering_state.get_osdmap(); }
+ cached_map_t get_osdmap() const { return peering_state.get_osdmap(); }
eversion_t get_next_version() {
return eversion_t(get_osdmap_epoch(),
projected_last_update.version + 1);
pg_stat_t get_stats() const;
void apply_stats(
const hobject_t &soid,
- const object_stat_sum_t &delta_stats);
+ const object_stat_sum_t &delta_stats) final;
private:
std::optional<pg_stat_t> pg_stats;
epoch_t get_interval_start_epoch() const {
return get_info().history.same_interval_since;
}
- const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
- if (shard == pg_whoami)
+ const pg_missing_const_i* maybe_get_shard_missing(pg_shard_t shard) const {
+ if (shard == pg_whoami) {
return &get_local_missing();
- else {
+ } else {
auto it = peering_state.get_peer_missing().find(shard);
- if (it == peering_state.get_peer_missing().end())
+ if (it == peering_state.get_peer_missing().end()) {
return nullptr;
- else
+ } else {
return &it->second;
+ }
}
}
+ const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const override {
+ auto m = maybe_get_shard_missing(peer);
+ assert(m);
+ return *m;
+ }
struct complete_op_t {
const version_t user_version;
bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
bool can_discard_op(const MOSDOp& m) const;
void context_registry_on_change();
- bool is_missing_object(const hobject_t& soid) const {
+ bool is_missing_object(const hobject_t& soid) const final {
return get_local_missing().is_missing(soid);
}
bool is_unreadable_object(const hobject_t &oid,
const std::set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
}
- void add_local_next_event(const pg_log_entry_t& e) {
+ void add_local_next_event(const pg_log_entry_t& e) override final {
peering_state.add_local_next_event(e);
}
- void op_applied(const eversion_t &applied_version);
+ void op_applied(const eversion_t &applied_version) override final;
private:
friend class IOInterruptCondition;