std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
const std::vector<OSDOp>& ops)
{
+ // let's ensure we don't need to inform SnapMapper about this particular
+ // entry.
+ assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP);
std::vector<pg_log_entry_t> log_entries;
log_entries.emplace_back(
obc->obs.exists ?
return log_entries;
}
-void OpsExecuter::snap_map_remove(
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove(
const hobject_t& soid,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn)
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn)
{
logger().debug("{}: soid {}", __func__, soid);
- const auto r = snap_mapper.remove_oid(soid, &txn);
- if (r) {
- logger().error("{}: remove_oid {} failed with {}",
- __func__, soid, r);
- }
- // On removal tolerate missing key corruption
- assert(r == 0 || r == -ENOENT);
+ return interruptor::async([soid, &snap_mapper,
+ _t=osdriver.get_transaction(&txn)]() mutable {
+ const auto r = snap_mapper.remove_oid(soid, &_t);
+ if (r) {
+ logger().error("{}: remove_oid {} failed with {}",
+ __func__, soid, r);
+ }
+ // On removal tolerate missing key corruption
+ assert(r == 0 || r == -ENOENT);
+ });
}
-void OpsExecuter::snap_map_modify(
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify(
const hobject_t& soid,
const std::set<snapid_t>& snaps,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn)
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn)
{
logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
- assert(std::size(snaps) > 0);
- [[maybe_unused]] const auto r = snap_mapper.update_snaps(
- soid, snaps, 0, &txn);
- assert(r == 0);
+ return interruptor::async([soid, snaps, &snap_mapper,
+ _t=osdriver.get_transaction(&txn)]() mutable {
+ assert(std::size(snaps) > 0);
+ [[maybe_unused]] const auto r = snap_mapper.update_snaps(
+ soid, snaps, 0, &_t);
+ assert(r == 0);
+ });
}
-void OpsExecuter::snap_map_clone(
+OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone(
const hobject_t& soid,
const std::set<snapid_t>& snaps,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn)
-{
- logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
- assert(std::size(snaps) > 0);
- snap_mapper.add_oid(soid, snaps, &txn);
-}
-
-OpsExecuter::interruptible_future<> OpsExecuter::flush_snap_map(
- const std::vector<pg_log_entry_t>& log_entries,
- SnapMapper& snap_mapper,
OSDriver& osdriver,
ceph::os::Transaction& txn)
{
- logger().debug("{} log_entries.size()={}",
- __func__, std::size(log_entries));
- for (const auto& le : log_entries) {
- if (le.soid.snap >= CEPH_MAXSNAP) {
- logger().debug("{} {} >= CEPH_MAXSNAP",
- __func__, le.soid);
- continue;
- }
- return interruptor::async([_t=osdriver.get_transaction(&txn),
- &le, &snap_mapper]() mutable {
- if (le.is_delete()) {
- logger().debug("flush_snap_map: is_delete()");
- snap_mapper.remove_oid(
- le.soid,
- &_t);
- } else if (le.is_update()) {
- assert(le.snaps.length() > 0);
- std::vector<snapid_t> snaps;
- ceph::bufferlist snapbl = le.snaps;
- auto p = snapbl.cbegin();
- try {
- decode(snaps, p);
- } catch (...) {
- logger().error("flush_snap_map: decode snaps failure on {}", le);
- snaps.clear();
- }
- std::set<snapid_t> _snaps(snaps.begin(), snaps.end());
- if (le.is_clone() || le.is_promote()) {
- logger().debug("flush_snap_map: le.is_clone() || le.is_promote()");
- snap_mapper.add_oid(
- le.soid,
- _snaps,
- &_t);
- } else if (le.is_modify()) {
- logger().debug("flush_snap_map: is_modify()");
- int r = snap_mapper.update_snaps(
- le.soid,
- _snaps,
- 0,
- &_t);
- assert(r == 0);
- } else {
- assert(le.is_clean());
- logger().debug("flush_snap_map: is_clean()");
- }
- }
- });
- }
- return seastar::now();
+ logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
+ return interruptor::async([soid, snaps, &snap_mapper,
+ _t=osdriver.get_transaction(&txn)]() mutable {
+ assert(std::size(snaps) > 0);
+ snap_mapper.add_oid(soid, snaps, &_t);
+ });
}
// Defined here because there is a circular dependency between OpsExecuter and PG
processed_obc.ssc->snapset = std::move(new_snapset);
}
-void OpsExecuter::flush_clone_metadata(
- std::vector<pg_log_entry_t>& log_entries)
+OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
+OpsExecuter::flush_clone_metadata(
+ std::vector<pg_log_entry_t>&& log_entries,
+ SnapMapper& snap_mapper,
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn)
{
assert(!txn.empty());
+ auto maybe_snap_mapped = interruptor::now();
if (cloning_ctx) {
osd_op_params->at_version = pg->next_version();
- std::move(*cloning_ctx).apply_to(osd_op_params->at_version,
- log_entries,
- *obc);
+ std::move(*cloning_ctx).apply_to(
+ osd_op_params->at_version,
+ log_entries,
+ *obc);
+ const auto& coid = log_entries.back().soid;
+ const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
+ maybe_snap_mapped = snap_map_clone(
+ coid,
+ std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
+ snap_mapper,
+ osdriver,
+ txn);
}
if (snapc.seq > obc->ssc->snapset.seq) {
// update snapset with latest snap context
}
logger().debug("{} done, initial snapset={}, new snapset={}",
__func__, obc->obs.oi.soid, obc->ssc->snapset);
+ return std::move(
+ maybe_snap_mapped
+ ).then_interruptible([log_entries=std::move(log_entries)]() mutable {
+ return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
+ std::move(log_entries));
+ });
}
// TODO: make this static
struct OSDOp;
class OSDriver;
class SnapMapper;
-using SnapMapperTransaction =
- MapCacher::Transaction<std::string, ceph::buffer::list>;
namespace crimson::osd {
class PG;
&& snapc.snaps[0] > initial_obc.ssc->snapset.seq; // existing obj is old
}
- void flush_clone_metadata(
- std::vector<pg_log_entry_t>& log_entries);
-
- interruptible_future<> flush_snap_map(
- const std::vector<pg_log_entry_t>& log_entries,
+ interruptible_future<std::vector<pg_log_entry_t>> flush_clone_metadata(
+ std::vector<pg_log_entry_t>&& log_entries,
SnapMapper& snap_mapper,
OSDriver& osdriver,
ceph::os::Transaction& txn);
- static void snap_map_remove(
+ static interruptible_future<> snap_map_remove(
const hobject_t& soid,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn);
- static void snap_map_modify(
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+ static interruptible_future<> snap_map_modify(
const hobject_t& soid,
const std::set<snapid_t>& snaps,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn);
- static void snap_map_clone(
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
+ static interruptible_future<> snap_map_clone(
const hobject_t& soid,
const std::set<snapid_t>& snaps,
SnapMapper& snap_mapper,
- SnapMapperTransaction& txn);
+ OSDriver& osdriver,
+ ceph::os::Transaction& txn);
// this gizmo could be wrapped in std::optional for the sake of lazy
// initialization. we don't need it for ops that doesn't have effect
if (user_modify) {
osd_op_params->user_at_version = osd_op_params->at_version.version;
}
- auto log_entries = prepare_transaction(ops);
- flush_clone_metadata(log_entries);
- auto maybe_snap_mapped = flush_snap_map(std::as_const(log_entries),
- snap_mapper,
- osdriver,
- txn);
- apply_stats();
- maybe_mutated = maybe_snap_mapped.then_interruptible([mut_func=std::move(mut_func),
- log_entries=std::move(log_entries),
- this]() mutable {
+ maybe_mutated = flush_clone_metadata(
+ prepare_transaction(ops),
+ snap_mapper,
+ osdriver,
+ txn
+ ).then_interruptible([mut_func=std::move(mut_func),
+ this](auto&& log_entries) mutable {
+ apply_stats();
auto [submitted, all_completed] =
std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
return maybe_mutated;
} else {
return maybe_mutated.then_unpack_interruptible(
- [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable {
+ // need extra ref pg due to apply_stats() which can be executed after
+ // informing snap mapper
+ [this, pg=this->pg](auto&& submitted, auto&& all_completed) mutable {
return interruptor::make_ready_future<rep_op_fut_tuple>(
std::move(submitted),
all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
#include "crimson/osd/osd_operations/snaptrim_event.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/pg.h"
-#include "include/expected.hpp"
namespace {
seastar::logger& logger() {
});
}
-tl::expected<SnapTrimObjSubEvent::remove_or_update_ret_t, int>
+SnapTrimObjSubEvent::interruptible_future<
+ SnapTrimObjSubEvent::remove_or_update_ret_t>
SnapTrimObjSubEvent::remove_or_update(
ObjectContextRef obc,
ObjectContextRef head_obc)
{
- ceph::os::Transaction txn{};
- std::vector<pg_log_entry_t> log_entries{};
-
- SnapSet& snapset = obc->ssc->snapset;
- auto citer = snapset.clone_snaps.find(coid.snap);
- if (citer == snapset.clone_snaps.end()) {
+ auto citer = obc->ssc->snapset.clone_snaps.find(coid.snap);
+ if (citer == obc->ssc->snapset.clone_snaps.end()) {
logger().error("{}: No clone_snaps in snapset {} for object {}",
*this, snapset, coid);
- return tl::unexpected{-ENOENT};
}
const auto& old_snaps = citer->second;
if (old_snaps.empty()) {
logger().error("{}: no object info snaps for object {}",
*this, coid);
- return tl::unexpected{-ENOENT};
}
if (snapset.seq == 0) {
logger().error("{}: no snapset.seq for object {}",
*this, coid);
- return tl::unexpected{-ENOENT};
}
const OSDMapRef& osdmap = pg->get_osdmap();
std::set<snapid_t> new_snaps;
if (p == snapset.clones.end()) {
logger().error("{}: Snap {} not in clones",
*this, coid.snap);
- return tl::unexpected{-ENOENT};
}
}
+
+ return seastar::do_with(ceph::os::Transaction{}, [=, this](auto&& txn) {
+ std::vector<pg_log_entry_t> log_entries{};
+
int64_t num_objects_before_trim = delta_stats.num_objects;
osd_op_p.at_version = pg->next_version();
object_info_t &coi = obc->obs.oi;
+ auto ret = interruptor::now();
if (new_snaps.empty()) {
// remove clone
logger().info("{}: {} snaps {} -> {} ... deleting",
coi = object_info_t(coid);
- auto smtxn = pg->osdriver.get_transaction(&txn);
- OpsExecuter::snap_map_remove(coid, pg->snap_mapper, smtxn);
+ ret = OpsExecuter::snap_map_remove(coid, pg->snap_mapper, pg->osdriver, txn);
} else {
// save adjusted snaps for this object
logger().info("{}: {} snaps {} -> {}",
coi.mtime,
0}
);
- auto smtxn = pg->osdriver.get_transaction(&txn);
- OpsExecuter::snap_map_modify(coid, new_snaps, pg->snap_mapper, smtxn);
+ ret = OpsExecuter::snap_map_modify(coid, new_snaps, pg->snap_mapper, pg->osdriver, txn);
}
-
- osd_op_p.at_version = pg->next_version();
-
- // save head snapset
- logger().debug("{}: {} new snapset {} on {}",
- *this, coid, snapset, head_obc->obs.oi);
- const auto head_oid = coid.get_head();
- if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
- // NOTE: this arguably constitutes minor interference with the
- // tiering agent if this is a cache tier since a snap trim event
- // is effectively evicting a whiteout we might otherwise want to
- // keep around.
- logger().info("{}: {} removing {}",
- *this, coid, head_oid);
- log_entries.emplace_back(
- pg_log_entry_t{
- pg_log_entry_t::DELETE,
- head_oid,
- osd_op_p.at_version,
- head_obc->obs.oi.version,
- 0,
- osd_reqid_t(),
- coi.mtime, // will be replaced in `apply_to()`
- 0}
- );
- logger().info("{}: remove snap head", *this);
- object_info_t& oi = head_obc->obs.oi;
- delta_stats.num_objects--;
- if (oi.is_dirty()) {
- delta_stats.num_objects_dirty--;
+ return std::move(ret).then_interruptible(
+ [&txn, &snapset, &coi, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this] mutable {
+ osd_op_p.at_version = pg->next_version();
+
+ // save head snapset
+ logger().debug("{}: {} new snapset {} on {}",
+ *this, coid, snapset, head_obc->obs.oi);
+ const auto head_oid = coid.get_head();
+ if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
+ // NOTE: this arguably constitutes minor interference with the
+ // tiering agent if this is a cache tier since a snap trim event
+ // is effectively evicting a whiteout we might otherwise want to
+ // keep around.
+ logger().info("{}: {} removing {}",
+ *this, coid, head_oid);
+ log_entries.emplace_back(
+ pg_log_entry_t{
+ pg_log_entry_t::DELETE,
+ head_oid,
+ osd_op_p.at_version,
+ head_obc->obs.oi.version,
+ 0,
+ osd_reqid_t(),
+ coi.mtime, // will be replaced in `apply_to()`
+ 0}
+ );
+ logger().info("{}: remove snap head", *this);
+ object_info_t& oi = head_obc->obs.oi;
+ delta_stats.num_objects--;
+ if (oi.is_dirty()) {
+ delta_stats.num_objects_dirty--;
+ }
+ if (oi.is_omap()) {
+ delta_stats.num_objects_omap--;
+ }
+ if (oi.is_whiteout()) {
+ logger().debug("{}: trimming whiteout on {}",
+ *this, oi.soid);
+ delta_stats.num_whiteouts--;
+ }
+ head_obc->obs.exists = false;
+ head_obc->obs.oi = object_info_t(head_oid);
+ txn.remove(pg->get_collection_ref()->get_cid(),
+ ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
+ } else {
+ snapset.snaps.clear();
+ logger().info("{}: writing updated snapset on {}, snapset is {}",
+ *this, head_oid, snapset);
+ log_entries.emplace_back(
+ pg_log_entry_t{
+ pg_log_entry_t::MODIFY,
+ head_oid,
+ osd_op_p.at_version,
+ head_obc->obs.oi.version,
+ 0,
+ osd_reqid_t(),
+ coi.mtime,
+ 0}
+ );
+
+ head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
+ head_obc->obs.oi.version = osd_op_p.at_version;
+
+ std::map<std::string, ceph::bufferlist, std::less<>> attrs;
+ ceph::bufferlist bl;
+ encode(snapset, bl);
+ attrs[SS_ATTR] = std::move(bl);
+
+ bl.clear();
+ encode(head_obc->obs.oi, bl,
+ pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+ attrs[OI_ATTR] = std::move(bl);
+ txn.setattrs(
+ pg->get_collection_ref()->get_cid(),
+ ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
+ attrs);
}
- if (oi.is_omap()) {
- delta_stats.num_objects_omap--;
- }
- if (oi.is_whiteout()) {
- logger().debug("{}: trimming whiteout on {}",
- *this, oi.soid);
- delta_stats.num_whiteouts--;
- }
- head_obc->obs.exists = false;
- head_obc->obs.oi = object_info_t(head_oid);
- txn.remove(pg->get_collection_ref()->get_cid(),
- ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
- } else {
- snapset.snaps.clear();
- logger().info("{}: writing updated snapset on {}, snapset is {}",
- *this, head_oid, snapset);
- log_entries.emplace_back(
- pg_log_entry_t{
- pg_log_entry_t::MODIFY,
- head_oid,
- osd_op_p.at_version,
- head_obc->obs.oi.version,
- 0,
- osd_reqid_t(),
- coi.mtime,
- 0}
- );
-
- head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
- head_obc->obs.oi.version = osd_op_p.at_version;
-
- std::map<std::string, ceph::bufferlist, std::less<>> attrs;
- ceph::bufferlist bl;
- encode(snapset, bl);
- attrs[SS_ATTR] = std::move(bl);
-
- bl.clear();
- encode(head_obc->obs.oi, bl,
- pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
- attrs[OI_ATTR] = std::move(bl);
- txn.setattrs(
- pg->get_collection_ref()->get_cid(),
- ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
- attrs);
- }
- // Stats reporting - Set number of objects trimmed
- if (num_objects_before_trim > delta_stats.num_objects) {
- int64_t num_objects_trimmed =
- num_objects_before_trim - delta_stats.num_objects;
- //add_objects_trimmed_count(num_objects_trimmed);
- }
- return std::make_pair(std::move(txn), std::move(log_entries));
+ // Stats reporting - Set number of objects trimmed
+ if (num_objects_before_trim > delta_stats.num_objects) {
+ //int64_t num_objects_trimmed =
+ // num_objects_before_trim - delta_stats.num_objects;
+ //add_objects_trimmed_count(num_objects_trimmed);
+ }
+ }).then_interruptible(
+ [txn=std::move(txn), log_entries=std::move(log_entries)] () mutable {
+ return interruptor::make_ready_future<remove_or_update_ret_t>(
+ std::make_pair(std::move(txn), std::move(log_entries)));
+ });
+ });
}
seastar::future<> SnapTrimObjSubEvent::with_pg(
pp().process
).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
logger().debug("{}: processing clone_obc={}", *this, clone_obc);
- auto head_obc = clone_obc->head;
- return interruptor::async([=, this]() mutable {
- if (auto ret = remove_or_update(clone_obc, head_obc);
- !ret.has_value()) {
- logger().error("{}: trimmig error {}",
- *this, ret.error());
- //pg->state_set(PG_STATE_SNAPTRIM_ERROR);
- } else {
- auto [txn, log_entries] = std::move(ret).value();
- auto [submitted, all_completed] = pg->submit_transaction(
- std::move(clone_obc),
- std::move(txn),
- std::move(osd_op_p),
- std::move(log_entries));
- submitted.get();
- all_completed.get();
- }
- }).then_interruptible([this] {
- return enter_stage<interruptor>(
- wait_repop
- );
- }).then_interruptible([this, clone_obc=std::move(clone_obc)] {
- return PG::load_obc_iertr::now();
+ return remove_or_update(
+ clone_obc, clone_obc->head
+ ).then_unpack_interruptible([clone_obc, this]
+ (auto&& txn, auto&& log_entries) mutable {
+ auto [submitted, all_completed] = pg->submit_transaction(
+ std::move(clone_obc),
+ std::move(txn),
+ std::move(osd_op_p),
+ std::move(log_entries));
+ return submitted.then_interruptible(
+ [all_completed=std::move(all_completed), this] () mutable {
+ return enter_stage<interruptor>(
+ wait_repop
+ ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
+ return std::move(all_completed);
+ });
+ });
});
});
}).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {