}
}
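+// Push a single object to the given peers outside the regular
+// backfill scan: record it in the progress tracker, then let the
+// backfill listener (PGRecovery) send the actual push.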
+void BackfillState::enqueue_standalone_push(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers) {
+ progress_tracker->enqueue_push(obj);
+ backfill_machine.backfill_listener.enqueue_push(obj, v, peers);
+}
+
} // namespace crimson::osd
backfill_machine.process_event(*std::move(evt));
}
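+  // Push obj at version v to peers immediately, bypassing the
+  // regular backfill queue. Used for objects (e.g. newly created
+  // clones) that the backfill scan would otherwise miss.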
+ void enqueue_standalone_push(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers);
+
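+  // True while the state machine is dispatching an event
+  // (triggering_event() is non-null only during event processing).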
+ bool is_triggered() const {
+ return backfill_machine.triggering_event() != nullptr;
+ }
+
hobject_t get_last_backfill_started() const {
return last_backfill_started;
}
ECBackend::rep_op_fut_t
ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
+ crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
rep_op_fut_t
submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
+ crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& req,
epoch_t min_epoch, epoch_t max_epoch,
};
encode(cloned_snaps, cloning_ctx->log_entry.snaps);
cloning_ctx->log_entry.clean_regions.mark_data_region_dirty(0, initial_obs.oi.size);
+ cloning_ctx->clone_obc = clone_obc;
return cloning_ctx;
}
void OpsExecuter::CloningContext::apply_to(
std::vector<pg_log_entry_t>& log_entries,
- ObjectContext& processed_obc) &&
+ ObjectContext& processed_obc)
{
log_entry.mtime = processed_obc.obs.oi.mtime;
log_entries.insert(log_entries.begin(), std::move(log_entry));
assert(!txn.empty());
update_clone_overlap();
if (cloning_ctx) {
- std::move(*cloning_ctx).apply_to(log_entries, *obc);
+ cloning_ctx->apply_to(log_entries, *obc);
}
if (snapc.seq > obc->ssc->snapset.seq) {
// update snapset with latest snap context
struct CloningContext {
SnapSet new_snapset;
pg_log_entry_t log_entry;
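+    // obc of the freshly created clone, carried through to the
+    // backend so it can be pushed to backfill peers if needed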
+ ObjectContextRef clone_obc;
void apply_to(
std::vector<pg_log_entry_t>& log_entries,
- ObjectContext& processed_obc) &&;
+ ObjectContext& processed_obc);
};
std::unique_ptr<CloningContext> cloning_ctx;
std::move(txn),
std::move(obc),
std::move(*osd_op_params),
- std::move(log_entries));
+ std::move(log_entries),
+ cloning_ctx
+ ? std::move(cloning_ctx->clone_obc)
+ : nullptr);
submitted = std::move(_submitted);
all_completed = std::move(_all_completed);
auto [submitted, all_completed] = co_await pg->submit_transaction(
std::move(clone_obc),
+ nullptr,
std::move(txn),
std::move(osd_op_p),
std::move(log_entries)
}
}
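+// Called by ReplicatedBackend when a transaction creates a clone
+// whose head is still missing on some backfill peers; the clone
+// must be pushed explicitly or backfill would skip it.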
+void PG::enqueue_push_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers)
+{
+ assert(recovery_handler);
+ assert(recovery_handler->backfill_state);
+ auto backfill_state = recovery_handler->backfill_state.get();
+ backfill_state->enqueue_standalone_push(obj, v, peers);
+}
+
PG::interruptible_future<
std::tuple<PG::interruptible_future<>,
PG::interruptible_future<>>>
PG::submit_transaction(
ObjectContextRef&& obc,
+ ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
std::vector<pg_log_entry_t>&& log_entries)
auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
+ std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
peering_state.get_last_peering_reset(),
co_return std::make_tuple(
std::move(submitted),
all_completed.then_interruptible(
- [this, at_version,
- last_complete=peering_state.get_info().last_complete](auto acked) {
+ [this, last_complete=peering_state.get_info().last_complete, at_version]
+ (auto acked) {
for (const auto& peer : acked) {
peering_state.update_peer_last_complete_ondisk(
peer.shard, peer.last_complete_ondisk);
[FNAME, this](auto&& txn,
auto&& obc,
auto&& osd_op_p,
- auto&& log_entries) {
+ auto&& log_entries,
+ auto&& new_clone) {
DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
mutate_object(obc, txn, osd_op_p);
return submit_transaction(
std::move(obc),
+ std::move(new_clone),
std::move(txn),
std::move(osd_op_p),
std::move(log_entries));
class MQuery;
class OSDMap;
class PGBackend;
+class ReplicatedBackend;
class PGPeeringEvent;
class osd_op_params_t;
std::tuple<interruptible_future<>, interruptible_future<>>>
submit_transaction(
ObjectContextRef&& obc,
+ ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& oop,
std::vector<pg_log_entry_t>&& log_entries);
friend class SnapTrimObjSubEvent;
private:
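+  // only ReplicatedBackend (a friend, see below) calls this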
+ void enqueue_push_for_backfill(
+ const hobject_t &obj,
+ const eversion_t &v,
+ const std::vector<pg_shard_t> &peers);
void mutate_object(
ObjectContextRef& obc,
ceph::os::Transaction& txn,
private:
friend class IOInterruptCondition;
+ friend class ::ReplicatedBackend;
struct log_update_t {
std::set<pg_shard_t> waiting_on;
seastar::shared_promise<> all_committed;
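+  // new_clone: obc of a clone created by this transaction, or
+  // nullptr; lets the backend push the clone to backfill peers
+  // that are still missing the head.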
virtual rep_op_fut_t
submit_transaction(const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
+ crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
seastar::future<> stop() { return seastar::now(); }
void on_pg_clean();
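+  // BackfillListener: push obj at version v to the given peers.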
+ void enqueue_push(
+ const hobject_t& obj,
+ const eversion_t& v,
+ const std::vector<pg_shard_t> &peers) final;
private:
PGRecoveryListener* pg;
size_t start_primary_recovery_ops(
const hobject_t& end) final;
void request_primary_scan(
const hobject_t& begin) final;
- void enqueue_push(
- const hobject_t& obj,
- const eversion_t& v,
- const std::vector<pg_shard_t> &peers) final;
void enqueue_drop(
const pg_shard_t& target,
const hobject_t& obj,
epoch_t min_epoch,
epoch_t map_epoch,
const std::vector<pg_log_entry_t> &log_entries,
+ bool send_op,
ceph_tid_t tid)
{
ceph_assert(pg_shard != whoami);
min_epoch,
tid,
osd_op_p.at_version);
- if (pg.should_send_op(pg_shard, hoid)) {
+ if (send_op) {
m->set_data(encoded_txn);
} else {
ceph::os::Transaction t;
}
ReplicatedBackend::rep_op_fut_t
-ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
- const hobject_t& hoid,
- ceph::os::Transaction&& t,
- osd_op_params_t&& opp,
- epoch_t min_epoch, epoch_t map_epoch,
- std::vector<pg_log_entry_t>&& logv)
+ReplicatedBackend::submit_transaction(
+ const std::set<pg_shard_t> &pg_shards,
+ const hobject_t& hoid,
+ crimson::osd::ObjectContextRef &&new_clone,
+ ceph::os::Transaction&& t,
+ osd_op_params_t&& opp,
+ epoch_t min_epoch, epoch_t map_epoch,
+ std::vector<pg_log_entry_t>&& logv)
{
LOG_PREFIX(ReplicatedBackend::submit_transaction);
DEBUGDPP("object {}", dpp, hoid);
auto log_entries = std::move(logv);
auto txn = std::move(t);
auto osd_op_p = std::move(opp);
+ auto _new_clone = std::move(new_clone);
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
le.mark_unrollbackable();
}
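+  // backfill peers that will additionally need the new clone
+  // pushed (filled while iterating the shards below)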
+ std::vector<pg_shard_t> to_push_clone;
auto sends = std::make_unique<std::vector<seastar::future<>>>();
- for (auto pg_shard : pg_shards) {
- if (pg_shard != whoami) {
- auto m = new_repop_msg(
+ for (auto &pg_shard : pg_shards) {
+ if (pg_shard == whoami) {
+ continue;
+ }
+ MURef<MOSDRepOp> m;
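+    // should_send_op() is false for peers that haven't been
+    // backfilled up to this object yet; they get an empty
+    // transaction that only records the log entries.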
+ if (pg.should_send_op(pg_shard, hoid)) {
+ m = new_repop_msg(
+ pg_shard, hoid, encoded_txn, osd_op_p,
+ min_epoch, map_epoch, log_entries, true, tid);
+ } else {
+ m = new_repop_msg(
pg_shard, hoid, encoded_txn, osd_op_p,
- min_epoch, map_epoch, log_entries, tid);
- pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
- // TODO: set more stuff. e.g., pg_states
- sends->emplace_back(
- shard_services.send_to_osd(
- pg_shard.osd, std::move(m), map_epoch));
+ min_epoch, map_epoch, log_entries, false, tid);
+ if (_new_clone && pg.is_missing_on_peer(pg_shard, hoid)) {
+      // The head is still missing on this peer: it is in the
+      // backfill push queue but hasn't been pushed yet. The
+      // backfill scan has already passed the clone's position,
+      // so the new clone must be pushed explicitly or it would
+      // be skipped.
+ // See: https://tracker.ceph.com/issues/68808
+ to_push_clone.push_back(pg_shard);
+ }
}
+ pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
+ // TODO: set more stuff. e.g., pg_states
+ sends->emplace_back(
+ shard_services.send_to_osd(
+ pg_shard.osd, std::move(m), map_epoch));
}
co_await pg.update_snap_map(log_entries, txn);
return seastar::now();
}
return peers->all_committed.get_shared_future();
- }).then_interruptible([pending_txn, this] {
+ }).then_interruptible([pending_txn, this, _new_clone,
+ to_push_clone=std::move(to_push_clone)] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
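+    // every replica has committed; now schedule the standalone
+    // push of the new clone to the peers recorded above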
+ if (_new_clone && !to_push_clone.empty()) {
+ pg.enqueue_push_for_backfill(
+ _new_clone->obs.oi.soid,
+ _new_clone->obs.oi.version,
+ to_push_clone);
+ }
return seastar::make_ready_future<
crimson::osd::acked_peers_t>(std::move(acked_peers));
});
rep_op_fut_t submit_transaction(
const std::set<pg_shard_t> &pg_shards,
const hobject_t& hoid,
+ crimson::osd::ObjectContextRef&& new_clone,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
epoch_t min_epoch, epoch_t max_epoch,
epoch_t min_epoch,
epoch_t map_epoch,
const std::vector<pg_log_entry_t> &log_entries,
+ bool send_op,
ceph_tid_t tid);
seastar::future<> request_committed(