SnapTrimEvent::start()
{
  // Drive one pass of snap trimming for `snapid` through the operation
  // pipeline: wait_for_active -> recover_missing (currently a no-op) ->
  // get_obc -> process.  In the process stage we collect up to
  // osd_pg_max_concurrent_snap_trims objects still referencing `snapid`,
  // spawn one SnapTrimObjSubEvent per object, wait for them all, then
  // optionally sleep (osd_snap_trim_sleep) before telling the caller's
  // repeat-loop whether to stop (queue drained) or continue.
  ShardServices &shard_services = pg->get_shard_services();
  return enter_stage<interruptor>(
    client_pp().wait_for_active
  ).then_interruptible([this] {
    return with_blocking_event<PGActivationBlocker::BlockingEvent,
                               interruptor>([this] (auto&& trigger) {
      return pg->wait_for_active_blocker.wait(std::move(trigger));
    });
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      client_pp().recover_missing);
  }).then_interruptible([] {
    // TODO: recovery of the target object set is not wired up yet.
    //return do_recover_missing(pg, get_target_oid());
    return seastar::now();
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      client_pp().get_obc);
  }).then_interruptible([this] {
    // Serialize with other background work on this PG.  Released on the
    // empty-trim early exit below or in the finally() after the subops.
    return pg->background_process_lock.lock_with_op(*this);
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      client_pp().process);
  }).then_interruptible([&shard_services, this] {
    return interruptor::async([this] {
      using crimson::common::local_conf;
      const auto max =
        local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
      // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
      // the nullopt below and erase snapid.
      auto to_trim = snap_mapper.get_next_objects_to_trim(
        snapid,
        max);
      if (!to_trim.has_value()) {
        // Normalize "no more objects" to an empty vector; the next
        // continuation turns that into stop_iteration::yes.
        return std::vector<hobject_t>{};
      }
      return std::move(*to_trim);
    }).then_interruptible([&shard_services, this] (const auto& to_trim) {
      if (to_trim.empty()) {
        // the legit ENOENT -> done
        logger().debug("{}: to_trim is empty! Stopping iteration", *this);
        pg->background_process_lock.unlock();
        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
          seastar::stop_iteration::yes);
      }
      // Immediately-invoked lambda: spawn one SnapTrimObjSubEvent per
      // object and register each with subop_blocker before waiting on
      // the whole batch in the wait_subop stage.
      return [&shard_services, this](const auto &to_trim) {
        for (const auto& object : to_trim) {
          logger().debug("{}: trimming {}", *this, object);
          subop_blocker.emplace_back(
            shard_services.start_operation_may_interrupt<
              interruptor, SnapTrimObjSubEvent>(
                pg,
                object,
                snapid));
        }
        return interruptor::now();
      }(to_trim).then_interruptible([this] {
        return enter_stage<interruptor>(wait_subop);
      }).then_interruptible([this] {
        logger().debug("{}: awaiting completion", *this);
        return subop_blocker.interruptible_wait_completion();
      }).finally([this] {
        pg->background_process_lock.unlock();
      }).si_then([this] {
        if (!needs_pause) {
          return interruptor::now();
        }
        // let's know operators we're waiting
        return enter_stage<interruptor>(
          wait_trim_timer
        ).then_interruptible([this] {
          using crimson::common::local_conf;
          const auto time_to_sleep =
            local_conf().template get_val<double>("osd_snap_trim_sleep");
          logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
          // TODO: this logic should be more sophisticated and distinguish
          // between SSDs, HDDs and the hybrid case
          return seastar::sleep(
            std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
        });
      }).si_then([this] {
        logger().debug("{}: all completed", *this);
        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
          seastar::stop_iteration::no);
      });
    }).si_then([this](auto stop) {
      // Leave the pipeline cleanly before propagating the verdict.
      return handle.complete().then([stop] {
        return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
      });
    });
  }).finally([this] {
    // This SnapTrimEvent op lifetime is maintained within
    // PerShardState::start_operation() implementation.
    logger().debug("{}: exit", *this);
  });
}
+PG::interruptible_future<seastar::stop_iteration> PG::trim_snap(
+ snapid_t to_trim,
+ bool needs_pause)
+{
+ return interruptor::repeat([this, to_trim, needs_pause] {
+ logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
+ *this, to_trim);
+ return shard_services.start_operation_may_interrupt<
+ interruptor, SnapTrimEvent>(
+ this,
+ snap_mapper,
+ to_trim,
+ needs_pause
+ ).second.handle_error_interruptible(
+ crimson::ct_error::enoent::handle([this] {
+ logger().error("{}: ENOENT saw, trimming stopped", *this);
+ peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
+ publish_stats_to_osd();
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ })
+ );
+ }).then_interruptible([this, trimmed=to_trim] {
+ logger().debug("{}: trimmed snap={}", *this, trimmed);
+ snap_trimq.erase(trimmed);
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+}
+
void PG::on_active_actmap()
{
logger().debug("{}: {} snap_trimq={}", *this, __func__, snap_trimq);
peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
if (peering_state.is_active() && peering_state.is_clean()) {
+ if (peering_state.state_test(PG_STATE_SNAPTRIM)) {
+ logger().debug("{}: {} already trimming.", *this, __func__);
+ return;
+ }
// loops until snap_trimq is empty or SNAPTRIM_ERROR.
Ref<PG> pg_ref = this;
- std::ignore = seastar::do_until(
- [this] { return snap_trimq.empty()
- || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR);
- },
- [this] {
- peering_state.state_set(PG_STATE_SNAPTRIM);
+ std::ignore = interruptor::with_interruption([this] {
+ return interruptor::repeat(
+ [this]() -> interruptible_future<seastar::stop_iteration> {
+ if (snap_trimq.empty()
+ || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR)) {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ }
+ peering_state.state_set(PG_STATE_SNAPTRIM);
+ publish_stats_to_osd();
+ const auto to_trim = snap_trimq.range_start();
+ const auto needs_pause = !snap_trimq.empty();
+ return trim_snap(to_trim, needs_pause);
+ }
+ ).finally([this] {
+ logger().debug("{}: PG::on_active_actmap() finished trimming",
+ *this);
+ peering_state.state_clear(PG_STATE_SNAPTRIM);
+ peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
publish_stats_to_osd();
- const auto to_trim = snap_trimq.range_start();
- snap_trimq.erase(to_trim);
- const auto needs_pause = !snap_trimq.empty();
- return seastar::repeat([to_trim, needs_pause, this] {
- logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
- *this, to_trim);
- return shard_services.start_operation<SnapTrimEvent>(
- this,
- snap_mapper,
- to_trim,
- needs_pause
- ).second.handle_error(
- crimson::ct_error::enoent::handle([this] {
- logger().error("{}: ENOENT saw, trimming stopped", *this);
- peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
- publish_stats_to_osd();
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- }), crimson::ct_error::eagain::handle([this] {
- logger().info("{}: EAGAIN saw, trimming restarted", *this);
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
- })
- );
- }).then([this, trimmed=to_trim] {
- logger().debug("{}: trimmed snap={}", *this, trimmed);
- });
- }
- ).finally([this, pg_ref=std::move(pg_ref)] {
- logger().debug("{}: PG::on_active_actmap() finished trimming",
- *this);
+ });
+ }, [this](std::exception_ptr eptr) {
+ logger().debug("{}: snap trimming interrupted", *this);
peering_state.state_clear(PG_STATE_SNAPTRIM);
- peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
- publish_stats_to_osd();
- });
+ }, pg_ref);
+ } else {
+ logger().debug("{}: pg not clean, skipping snap trim");
+ assert(!peering_state.state_test(PG_STATE_SNAPTRIM));
}
}