#include <boost/iterator/counting_iterator.hpp>
#include "crimson/common/log.h"
+#include "crimson/osd/pg.h"
#include "crimson/osd/shard_services.h"
#include "ec_backend.h"
}
ECBackend::write_iertr::future<>
-ECBackend::handle_rep_write_op(Ref<MOSDECSubOpWrite>)
+ECBackend::handle_sub_write(
+ pg_shard_t from,
+ ECSubWrite &&op,
+ crimson::osd::PG& pg)
{
- return write_iertr::now();
+ // Apply an EC sub-write sent by `from`: stage/clean temp objects, record
+ // the op's log entries with the PG, append the shipped transaction, and
+ // commit everything to the local object store as a single Transaction.
+ LOG_PREFIX(ECBackend::handle_sub_write);
+ logger().info("{} from {}", __func__, from);
+ // Remember temp objects this sub-write creates so later ops can clear them.
+ if (!op.temp_added.empty()) {
+ add_temp_obj(std::begin(op.temp_added), std::end(op.temp_added));
+ }
+ ceph::os::Transaction txn;
+ // During backfill/async recovery the removal transaction is not shipped
+ // (see log message below), so delete the removed temp objects locally.
+ if (op.backfill_or_async_recovery) {
+ for (const auto& obj : op.temp_removed) {
+ logger().info("{}: removing object {} since we won't get the transaction",
+ __func__, obj);
+ txn.remove(coll->get_cid(),
+ ghobject_t{obj, ghobject_t::NO_GEN, get_shard()});
+ }
+ }
+ clear_temp_objs(op.temp_removed);
+ // NOTE(review): second format argument is always "" -- the dump of the
+ // local missing set appears stubbed out here; confirm intent.
+ logger().debug("{}: missing before {}", __func__, "");
+
+ // flag set to true during async recovery
+ bool async = false;
+ if (pg.is_missing_object(op.soid)) {
+ async = true;
+ logger().debug("{}: {} is missing", __func__, op.soid);
+ // Feed this op's log entries into the PG so recovery learns about them.
+ for (const auto& e: op.log_entries) {
+ logger().debug("{}: add_next_event entry {}, is_delete {}",
+ __func__, e, e.is_delete());
+ pg.add_local_next_event(e);
+ }
+ }
+ // NOTE(review): op.pg_committed_to is passed for two consecutive
+ // parameters -- verify against PG::log_operation's parameter list.
+ pg.log_operation(
+ std::move(op.log_entries),
+ op.updated_hit_set_history,
+ op.trim_to,
+ op.pg_committed_to,
+ op.pg_committed_to,
+ !op.backfill_or_async_recovery,
+ txn,
+ async);
+ // Append the primary's shipped transaction after the local log updates.
+ txn.append(op.t); // hack warn
+ logger().debug("{}:{}", __func__, __LINE__);
+ if (op.at_version != eversion_t()) {
+ // dummy rollforward transaction doesn't get at_version
+ // (and doesn't advance it)
+ pg.op_applied(op.at_version);
+ }
+ logger().debug("{}:{}", __func__, __LINE__);
+ return store->do_transaction(coll, std::move(txn)).then([FNAME] {
+ DEBUG("transaction commited!");
+ return write_iertr::now();
+ });
+}
+
+// Entry point for an MOSDECSubOpWrite received from the primary: forwards
+// the embedded ECSubWrite to handle_sub_write, then asserts this shard is
+// not acting as primary for the write.
+ECBackend::write_iertr::future<>
+ECBackend::handle_rep_write_op(
+ Ref<MOSDECSubOpWrite> m,
+ crimson::osd::PG& pg)
+{
+ LOG_PREFIX(ECBackend::handle_rep_write_op);
+ // Read tid before m->op is moved-from below.
+ const auto tid = m->op.tid;
+ DEBUG("tid {} from {}", tid, m->op.from);
+ return handle_sub_write(
+ m->op.from, std::move(m->op), pg
+ ).si_then([&pg] {
+ assert(!pg.pgb_is_primary());
+ return write_iertr::now();
+ }, crimson::ct_error::assert_all{});
}
ECBackend::write_iertr::future<>
namespace crimson::osd {
+class PG;
+
class ECBackend : public PGBackend
{
static ceph::ErasureCodeInterfaceRef create_ec_impl(
}
void on_actingset_changed(bool same_primary) final {}
- write_iertr::future<> handle_rep_write_op(Ref<MOSDECSubOpWrite>);
+ write_iertr::future<> handle_rep_write_op(
+ Ref<MOSDECSubOpWrite>,
+ crimson::osd::PG& pg);
write_iertr::future<> handle_rep_write_reply(Ref<MOSDECSubOpWriteReply>);
ll_read_ierrorator::future<> handle_rep_read_op(Ref<MOSDECSubOpRead>);
ll_read_ierrorator::future<> handle_rep_read_reply(Ref<MOSDECSubOpReadReply>);
osd_op_params_t&& req,
epoch_t min_epoch, epoch_t max_epoch,
std::vector<pg_log_entry_t>&& log_entries) final;
- CollectionRef coll;
seastar::future<> request_committed(const osd_reqid_t& reqid,
const eversion_t& version) final {
return seastar::now();
}
+ write_iertr::future<> handle_sub_write(
+ pg_shard_t from,
+ ECSubWrite&& op,
+ crimson::osd::PG& pg);
+
bool is_single_chunk(const hobject_t& obj, const ECSubRead& op);
ll_read_errorator::future<ceph::bufferlist> maybe_chunked_read(
PG::interruptible_future<> PG::handle_rep_write_op(Ref<MOSDECSubOpWrite> m)
{
+ // Replica-side handler for an EC sub-write: apply it via the EC backend,
+ // then acknowledge the primary with an MOSDECSubOpWriteReply.
+ logger().debug("{}", __func__);
+ // Non-primary shards adopt the stats the primary shipped with the op.
+ if (!is_primary()) {
+ peering_state.update_stats([&new_stats=m->op.stats](auto&, auto &stats) {
+ stats = new_stats;
+ return false;
+ });
+ }
auto* ec_backend=dynamic_cast<::ECBackend*>(&get_backend());
assert(ec_backend);
+ // Capture tid (and last_complete in the lambda init-capture below) before
+ // std::move(m) invalidates m.
+ const auto tid = m->op.tid;
return ec_backend->handle_rep_write_op(
- std::move(m)
- ).handle_error_interruptible(crimson::ct_error::assert_all{});
+ std::move(m),
+ *this
+ ).si_then([this, then_lcod=peering_state.get_info().last_complete, tid] {
+ logger().debug("{} sending response", "handle_rep_write_op");
+ peering_state.update_last_complete_ondisk(then_lcod);
+ // Build the ack for the primary; committed and applied are reported
+ // together since the backend's transaction has already completed here.
+ auto r = crimson::make_message<MOSDECSubOpWriteReply>();
+ r->pgid = spg_t(peering_state.get_info().pgid.pgid, get_primary().shard);
+ r->map_epoch = get_osdmap_epoch();
+ r->min_epoch = peering_state.get_info().history.same_interval_since;
+ r->op.tid = tid;
+ r->op.last_complete = then_lcod;
+ r->op.committed = true;
+ r->op.applied = true;
+ r->op.from = pg_whoami;
+ r->set_priority(CEPH_MSG_PRIO_HIGH);
+ return shard_services.send_to_osd(get_primary().osd, std::move(r), get_osdmap_epoch());
+ }).handle_error_interruptible(crimson::ct_error::assert_all{});
}
PG::interruptible_future<> PG::handle_rep_read_op(Ref<MOSDECSubOpRead> m)
// by crimson yet
}
+// Record that a local write has been applied up to `applied_version`,
+// advancing the peering state's applied pointer.
+void PG::op_applied(const eversion_t &applied_version)
+{
+ // NOTE(review): message prints "op_applied" twice (once via __func__).
+ logger().info("{}: op_applied version {}", __func__, applied_version);
+ assert(applied_version != eversion_t());
+ assert(applied_version <= peering_state.get_info().last_update);
+ peering_state.local_write_applied(applied_version);
+
+#if 0
+ // Disabled scrubber hook -- not wired up in this code path yet.
+ if (is_primary() && m_scrubber) {
+ // if there's a scrub operation waiting for the selected chunk to be fully updated -
+ // allow it to continue
+ m_scrubber->on_applied_when_primary(recovery_state.get_last_update_applied());
+ }
+#endif
+}
+
PG::interruptible_future<std::optional<PG::complete_op_t>>
PG::already_complete(const osd_reqid_t& reqid)
{
class MQuery;
class OSDMap;
+class ECBackend;
class PGPeeringEvent;
class osd_op_params_t;
friend class WatchTimeoutRequest;
friend class SnapTrimEvent;
friend class SnapTrimObjSubEvent;
+ friend ECBackend;
private:
void enqueue_push_for_backfill(
const std::set<pg_shard_t> &get_actingset() const {
return peering_state.get_actingset();
}
+ // Forwards a sub-write's log entry into the peering state; called by
+ // ECBackend::handle_sub_write when the object is locally missing.
+ void add_local_next_event(const pg_log_entry_t& e) {
+ peering_state.add_local_next_event(e);
+ }
+ // Notifies peering state that writes up to applied_version are applied.
+ void op_applied(const eversion_t &applied_version);
private:
friend class IOInterruptCondition;
#include "os/Transaction.h"
#include "common/Checksummer.h"
#include "common/Clock.h"
+#include "erasure-code/ErasureCodeInterface.h"
+#include "erasure-code/ErasureCodePlugin.h"
#include "crimson/common/coroutine.h"
#include "crimson/common/exception.h"
shard_services{shard_services},
dpp{dpp},
store{&shard_services.get_store()}
-{}
+{
+ logger().info("initialized PGBackend::store with {}", (void*)this->store);
+}
PGBackend::load_metadata_iertr::future
<PGBackend::loaded_object_md_t::ref>
PushOp* push_op)
{
LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op);
- DEBUGDPP("{}", pg, oid);
+ DEBUGDPP("{} progress.first {}", pg, oid, progress.first);
if (!progress.first) {
return seastar::make_ready_future<eversion_t>(ver);
}