std::vector<pg_log_entry_t>&& log_entries)
{
// todo
- return {seastar::now(),
- seastar::make_ready_future<crimson::osd::acked_peers_t>()};
+ return make_ready_future<rep_op_ret_t>(seastar::now(),
+ seastar::make_ready_future<crimson::osd::acked_peers_t>());
}
update_clone_overlap();
if (cloning_ctx) {
std::move(*cloning_ctx).apply_to(log_entries, *obc);
- const auto& coid = log_entries.front().soid;
- const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
- maybe_snap_mapped = snap_map_clone(
- coid,
- std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
- snap_mapper,
- osdriver,
- txn);
}
if (snapc.seq > obc->ssc->snapset.seq) {
// update snapset with latest snap context
OSDriver& osdriver,
ceph::os::Transaction& txn);
+public:
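+  // made public so that PG::update_snap_map() can reuse this snap-mapping
+  // helper directly (see the PG change further down)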
static interruptible_future<> snap_map_remove(
const hobject_t& soid,
SnapMapper& snap_mapper,
OSDriver& osdriver,
ceph::os::Transaction& txn);
+private:
// this gizmo could be wrapped in std::optional for the sake of lazy
// initialization. we don't need it for ops that don't have any effects
// TODO: verify the init overhead of chunked_fifo
if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) {
ceph_assert(log_rit->version == osd_op_params->at_version);
}
- auto [submitted, all_completed] =
- std::forward<MutFunc>(mut_func)(std::move(txn),
- std::move(obc),
- std::move(*osd_op_params),
- std::move(log_entries));
- return interruptor::make_ready_future<rep_op_fut_tuple>(
- std::move(submitted),
- osd_op_ierrorator::future<>(std::move(all_completed)));
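+  // mut_func (e.g. PG::submit_transaction) now hands back a future that
+  // resolves to the <submitted, all_completed> pair, so unwrap it
+  // asynchronously and re-wrap all_completed as an errorated future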
+ return std::forward<MutFunc>(mut_func)(std::move(txn),
+ std::move(obc),
+ std::move(*osd_op_params),
+ std::move(log_entries)
+ ).then_interruptible([](auto p) {
+ auto &submitted = std::get<0>(p);
+ auto &all_completed = std::get<1>(p);
+ return interruptor::make_ready_future<rep_op_fut_tuple>(
+ std::move(submitted),
+ osd_op_ierrorator::future<>(std::move(all_completed)));
+ });
});
}
apply_stats();
logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
return remove_or_update(
clone_obc, head_obc
- ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable {
- auto [submitted, all_completed] = pg->submit_transaction(
+ ).safe_then_interruptible(
+ [clone_obc, this](auto&& txn) mutable {
+ return pg->submit_transaction(
std::move(clone_obc),
std::move(txn),
std::move(osd_op_p),
- std::move(log_entries));
- return submitted.then_interruptible(
- [this, all_completed=std::move(all_completed)]() mutable {
- return enter_stage<interruptor>(
- client_pp().wait_repop
- ).then_interruptible([all_completed=std::move(all_completed)]() mutable{
- return std::move(all_completed);
+ std::move(log_entries)
+ ).then_interruptible([this](auto p) {
+ auto &submitted = std::get<0>(p);
+ auto &all_completed = std::get<1>(p);
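+        // wait until the repop has been submitted, then park in wait_repop
+        // until every replica has acked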
+ return submitted.then_interruptible(
+ [this, all_completed=std::move(all_completed)]() mutable {
+ return enter_stage<interruptor>(
+ client_pp().wait_repop
+ ).then_interruptible([all_completed=std::move(all_completed)]() mutable{
+ return std::move(all_completed);
+ });
});
- });
+ });
});
});
},
}
}
-std::tuple<PG::interruptible_future<>,
- PG::interruptible_future<>>
+PG::interruptible_future<
+ std::tuple<PG::interruptible_future<>,
+ PG::interruptible_future<>>>
PG::submit_transaction(
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
osd_op_params_t&& osd_op_p,
std::vector<pg_log_entry_t>&& log_entries)
{
if (__builtin_expect(stopping, false)) {
- return {seastar::make_exception_future<>(
- crimson::common::system_shutdown_exception()),
- seastar::now()};
+ co_return std::make_tuple(
+ interruptor::make_interruptible(seastar::make_exception_future<>(
+ crimson::common::system_shutdown_exception())),
+ interruptor::now());
}
epoch_t map_epoch = get_osdmap_epoch();
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
projected_last_update = log_entries.rbegin()->version;
- auto [submitted, all_completed] = backend->submit_transaction(
+ auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
std::move(txn),
peering_state.get_last_peering_reset(),
map_epoch,
std::move(log_entries));
- return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
- [this, last_complete=peering_state.get_info().last_complete,
+ co_return std::make_tuple(
+ std::move(submitted),
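+    // the second element resolves once all replicas have acked: record each
+    // peer's last_complete_ondisk and mark the write complete locally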
+ all_completed.then_interruptible(
+ [this, last_complete=peering_state.get_info().last_complete,
at_version=osd_op_p.at_version](auto acked) {
- for (const auto& peer : acked) {
- peering_state.update_peer_last_complete_ondisk(
- peer.shard, peer.last_complete_ondisk);
- }
- peering_state.complete_write(at_version, last_complete);
- return seastar::now();
- }));
+ for (const auto& peer : acked) {
+ peering_state.update_peer_last_complete_ondisk(
+ peer.shard, peer.last_complete_ondisk);
+ }
+ peering_state.complete_write(at_version, last_complete);
+ return seastar::now();
+ })
+ );
}
PG::interruptible_future<> PG::repair_object(
std::vector<pg_log_entry_t> log_entries;
decode(log_entries, p);
update_stats(req->pg_stats);
+
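+  // replicas replay the snap information carried in the log entries so the
+  // local SnapMapper stays in sync with the primary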
+ co_await update_snap_map(
+ log_entries,
+ txn);
+
log_operation(std::move(log_entries),
req->pg_trim_to,
req->version,
);
co_return;
}
+
+PG::interruptible_future<> PG::update_snap_map(
+ const std::vector<pg_log_entry_t> &log_entries,
+ ObjectStore::Transaction& t)
+{
+ LOG_PREFIX(PG::update_snap_map);
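+  // mirror each log entry's snap information into the SnapMapper: new clones
+  // register their snap set, modifications update it and deletions drop it;
+  // head/snapdir objects (snap >= CEPH_MAXSNAP) need no mapping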
+ for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
+ if (i->soid.snap < CEPH_MAXSNAP) {
+ if (i->is_delete()) {
+ co_await OpsExecuter::snap_map_remove(
+ i->soid,
+ snap_mapper,
+ osdriver,
+ t);
+ } else if (i->is_update()) {
+ ceph_assert(i->snaps.length() > 0);
+ vector<snapid_t> snaps;
+ bufferlist snapbl = i->snaps;
+ auto p = snapbl.cbegin();
+ try {
+ decode(snaps, p);
+ } catch (...) {
+ ERRORDPP("Failed to decode snaps on {}", *this, *i);
+ snaps.clear();
+ }
+ set<snapid_t> _snaps(snaps.begin(), snaps.end());
+
+ if (i->is_clone() || i->is_promote()) {
+ co_await OpsExecuter::snap_map_clone(
+ i->soid,
+ _snaps,
+ snap_mapper,
+ osdriver,
+ t);
+ } else if (i->is_modify()) {
+ co_await OpsExecuter::snap_map_modify(
+ i->soid,
+ _snaps,
+ snap_mapper,
+ osdriver,
+ t);
+ } else {
+ ceph_assert(i->is_clean());
+ }
+ }
+ }
+ }
+}
void PG::log_operation(
interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
void update_stats(const pg_stat_t &stat);
+ interruptible_future<> update_snap_map(
+ const std::vector<pg_log_entry_t> &log_entries,
+ ObjectStore::Transaction& t);
void log_operation(
std::vector<pg_log_entry_t>&& logv,
const eversion_t &trim_to,
SuccessFunc&& success_func,
FailureFunc&& failure_func);
interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
- std::tuple<interruptible_future<>, interruptible_future<>>
+ interruptible_future<
+ std::tuple<interruptible_future<>, interruptible_future<>>>
submit_transaction(
ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
using interruptible_future =
::crimson::interruptible::interruptible_future<
::crimson::osd::IOInterruptCondition, T>;
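+  // rep_op_ret_t separates the two stages of a replicated write:
+  // <submitted (requests dispatched), all peers acked>; rep_op_fut_t resolves
+  // to that pair once the transaction has been prepared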
- using rep_op_fut_t =
+ using rep_op_ret_t =
std::tuple<interruptible_future<>,
interruptible_future<crimson::osd::acked_peers_t>>;
+ using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
PGBackend(shard_id_t shard, CollectionRef coll,
crimson::osd::ShardServices &shard_services,
DoutPrefixProvider &dpp);
#include "messages/MOSDRepOpReply.h"
+#include "crimson/common/coroutine.h"
#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/os/futurized_store.h"
ReplicatedBackend::rep_op_fut_t
ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
const hobject_t& hoid,
- ceph::os::Transaction&& txn,
- osd_op_params_t&& osd_op_p,
+ ceph::os::Transaction&& t,
+ osd_op_params_t&& opp,
epoch_t min_epoch, epoch_t map_epoch,
- std::vector<pg_log_entry_t>&& log_entries)
+ std::vector<pg_log_entry_t>&& logv)
{
LOG_PREFIX(ReplicatedBackend::submit_transaction);
DEBUGDPP("object {}", dpp, hoid);
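+  // this function is now a coroutine: reference parameters may dangle across
+  // suspension points, so move the rvalue-ref arguments into coroutine-frame
+  // locals before the first co_await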
+ auto log_entries = std::move(logv);
+ auto txn = std::move(t);
+ auto osd_op_p = std::move(opp);
const ceph_tid_t tid = shard_services.get_tid();
auto pending_txn =
}
}
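+  // record the snap mappings for this op in the same transaction, before it
+  // is logged and handed to the store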
+ co_await pg.update_snap_map(log_entries, txn);
+
pg.log_operation(
std::move(log_entries),
osd_op_p.pg_trim_to,
false);
auto all_completed = interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(txn))
- ).then_interruptible([FNAME, this,
+ shard_services.get_store().do_transaction(coll, std::move(txn))
+ ).then_interruptible([FNAME, this,
peers=pending_txn->second.weak_from_this()] {
if (!peers) {
// for now, only actingset_changed can cause peers
}).then_interruptible([pending_txn, this] {
auto acked_peers = std::move(pending_txn->second.acked_peers);
pending_trans.erase(pending_txn);
- return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
+ return seastar::make_ready_future<
+ crimson::osd::acked_peers_t>(std::move(acked_peers));
});
auto sends_complete = seastar::when_all_succeed(
sends->begin(), sends->end()
).finally([sends=std::move(sends)] {});
- return {std::move(sends_complete), std::move(all_completed)};
+ co_return std::make_tuple(std::move(sends_complete), std::move(all_completed));
}
void ReplicatedBackend::on_actingset_changed(bool same_primary)