f->close_section();
}
-seastar::future<> SnapTrimEvent::start()
+seastar::future<seastar::stop_iteration> SnapTrimEvent::start()
{
logger().debug("{}: {}", *this, __func__);
return with_pg(
return pg->request_pg_pipeline;
}
-seastar::future<> SnapTrimEvent::with_pg(
+seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
ShardServices &shard_services, Ref<PG> _pg)
{
return interruptor::with_interruption([&shard_services, this] {
return enter_stage<interruptor>(
pp().process);
}).then_interruptible([&shard_services, this] {
- std::vector<hobject_t> to_trim;
- assert(!to_trim.empty());
- for (const auto& object : to_trim) {
- logger().debug("{}: trimming {}", object);
- // TODO: start subop and add to subop blcoker
- }
- return seastar::now();
+ return interruptor::async([this] {
+ std::vector<hobject_t> to_trim;
+ using crimson::common::local_conf;
+ const auto max =
+ local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
+ // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
+ // the ENOENT below and erase snapid.
+ int r = snap_mapper.get_next_objects_to_trim(
+ snapid,
+ max,
+ &to_trim);
+ if (r == -ENOENT) {
+ to_trim.clear(); // paranoia
+ return to_trim;
+ } else if (r != 0) {
+ logger().error("{}: get_next_objects_to_trim returned {}",
+ *this, cpp_strerror(r));
+ ceph_abort_msg("get_next_objects_to_trim returned an invalid code");
+ } else {
+ assert(!to_trim.empty());
+ }
+ logger().debug("{}: async almost done line {}", *this, __LINE__);
+ return to_trim;
+ }).then_interruptible([&shard_services, this] (const auto& to_trim) {
+ if (to_trim.empty()) {
+ // ENOENT
+ return interruptor::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }
+ for (const auto& object : to_trim) {
+ logger().debug("{}: trimming {}", *this, object);
+ }
+ return subop_blocker.wait_completion().then([] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ });
});
}, [this](std::exception_ptr eptr) {
// TODO: better debug output
logger().debug("{}: interrupted {}", *this, eptr);
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
}, pg);
}
class Formatter;
}
+class SnapMapper;
+
namespace crimson::osd {
class OSD;
class ShardServices;
class PG;
+// trim up to `max` objects for snapshot `snapid
class SnapTrimEvent final : public PhasedOperationT<SnapTrimEvent> {
public:
static constexpr OperationTypeCode type = OperationTypeCode::snaptrim_event;
- SnapTrimEvent(Ref<PG> pg, snapid_t snapid)
+ SnapTrimEvent(Ref<PG> pg, SnapMapper& snap_mapper, snapid_t snapid)
: pg(std::move(pg)),
+ snap_mapper(snap_mapper),
snapid(snapid) {}
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
- seastar::future<> start();
- seastar::future<> with_pg(
+ seastar::future<seastar::stop_iteration> start();
+ seastar::future<seastar::stop_iteration> with_pg(
ShardServices &shard_services, Ref<PG> pg);
private:
} subop_blocker;
PipelineHandle handle;
Ref<PG> pg;
+ SnapMapper& snap_mapper;
const snapid_t snapid;
public: