#include "crimson/osd/pg.h"
#include "crimson/osd/watch.h"
#include "osd/ClassHandler.h"
+#include "osd/SnapMapper.h"
namespace {
seastar::logger& logger() {
return log_entries;
}
+void OpsExecuter::snap_map_remove(
+ const hobject_t& soid,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn)
+{
+ logger().debug("{}: soid {}", __func__, soid);
+ const auto r = snap_mapper.remove_oid(soid, &txn);
+ if (r) {
+ logger().error("{}: remove_oid {} failed with {}",
+ __func__, soid, r);
+ }
+ // On removal tolerate missing key corruption
+ assert(r == 0 || r == -ENOENT);
+}
+
+void OpsExecuter::snap_map_modify(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn)
+{
+ logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+ assert(std::size(snaps) > 0);
+ [[maybe_unused]] const auto r = snap_mapper.update_snaps(
+ soid, snaps, 0, &txn);
+ assert(r == 0);
+}
+
+void OpsExecuter::snap_map_clone(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn)
+{
+ logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+ assert(std::size(snaps) > 0);
+ snap_mapper.add_oid(soid, snaps, &txn);
+}
+
+OpsExecuter::interruptible_future<> OpsExecuter::flush_snap_map(
+ const std::vector<pg_log_entry_t>& log_entries,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn)
+{
+ logger().debug("{} log_entries.size()={}",
+ __func__, std::size(log_entries));
+ for (const auto& le : log_entries) {
+ if (le.soid.snap >= CEPH_MAXSNAP) {
+ logger().debug("{} {} >= CEPH_MAXSNAP",
+ __func__, le.soid);
+ continue;
+ }
+ return interruptor::async([_t=osdriver.get_transaction(&txn),
+ &le, &snap_mapper]() mutable {
+ if (le.is_delete()) {
+ logger().debug("flush_snap_map: is_delete()");
+ snap_mapper.remove_oid(
+ le.soid,
+ &_t);
+ } else if (le.is_update()) {
+ assert(le.snaps.length() > 0);
+ std::vector<snapid_t> snaps;
+ ceph::bufferlist snapbl = le.snaps;
+ auto p = snapbl.cbegin();
+ try {
+ decode(snaps, p);
+ } catch (...) {
+ logger().error("flush_snap_map: decode snaps failure on {}", le);
+ snaps.clear();
+ }
+ std::set<snapid_t> _snaps(snaps.begin(), snaps.end());
+ if (le.is_clone() || le.is_promote()) {
+ logger().debug("flush_snap_map: le.is_clone() || le.is_promote()");
+ snap_mapper.add_oid(
+ le.soid,
+ _snaps,
+ &_t);
+ } else if (le.is_modify()) {
+ logger().debug("flush_snap_map: is_modify()");
+ int r = snap_mapper.update_snaps(
+ le.soid,
+ _snaps,
+ 0,
+ &_t);
+ assert(r == 0);
+ } else {
+ assert(le.is_clean());
+ logger().debug("flush_snap_map: is_clean()");
+ }
+ }
+ });
+ }
+ return seastar::now();
+}
+
// Defined here because there is a circular dependency between OpsExecuter and PG
uint32_t OpsExecuter::get_pool_stripe_width() const {
return pg->get_pgpool().info.get_stripe_width();
#include <seastar/core/shared_ptr.hh>
#include "common/dout.h"
+#include "common/map_cacher.hpp"
#include "common/static_ptr.h"
#include "messages/MOSDOp.h"
#include "os/Transaction.h"
struct ObjectState;
struct OSDOp;
+class OSDriver;
+class SnapMapper;
+using SnapMapperTransaction =
+ MapCacher::Transaction<std::string, ceph::buffer::list>;
namespace crimson::osd {
class PG;
void flush_clone_metadata(
std::vector<pg_log_entry_t>& log_entries);
+ interruptible_future<> flush_snap_map(
+ const std::vector<pg_log_entry_t>& log_entries,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+
+ static void snap_map_remove(
+ const hobject_t& soid,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn);
+ static void snap_map_modify(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn);
+ static void snap_map_clone(
+ const hobject_t& soid,
+ const std::set<snapid_t>& snaps,
+ SnapMapper& snap_mapper,
+ SnapMapperTransaction& txn);
+
// this gizmo could be wrapped in std::optional for the sake of lazy
// initialization. we don't need it for ops that doesn't have effect
// TODO: verify the init overhead of chunked_fifo
using rep_op_fut_t =
interruptible_future<rep_op_fut_tuple>;
template <typename MutFunc>
- rep_op_fut_t flush_changes_n_do_ops_effects(const std::vector<OSDOp>& ops,
+ rep_op_fut_t flush_changes_n_do_ops_effects(
+ const std::vector<OSDOp>& ops,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
MutFunc&& mut_func) &&;
std::vector<pg_log_entry_t> prepare_transaction(
const std::vector<OSDOp>& ops);
OpsExecuter::rep_op_fut_t
OpsExecuter::flush_changes_n_do_ops_effects(
const std::vector<OSDOp>& ops,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
MutFunc&& mut_func) &&
{
const bool want_mutate = !txn.empty();
}
auto log_entries = prepare_transaction(ops);
flush_clone_metadata(log_entries);
- auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
- std::move(obc),
- std::move(*osd_op_params),
- std::move(log_entries));
- maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
+ auto maybe_snap_mapped = flush_snap_map(std::as_const(log_entries),
+ snap_mapper,
+ osdriver,
+ txn);
+ apply_stats();
+ maybe_mutated = maybe_snap_mapped.then_interruptible([mut_func=std::move(mut_func),
+ log_entries=std::move(log_entries),
+ this]() mutable {
+ auto [submitted, all_completed] =
+ std::forward<MutFunc>(mut_func)(std::move(txn),
+ std::move(obc),
+ std::move(*osd_op_params),
+ std::move(log_entries));
+ return interruptor::make_ready_future<rep_op_fut_tuple>(
std::move(submitted),
osd_op_ierrorator::future<>(std::move(all_completed)));
+ });
}
apply_stats();