From b031ecda4479cf00abce1a4fc3639f1730434bba Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 30 Sep 2019 01:11:42 +0800 Subject: [PATCH] crimson/osd/pg: implement queue_check_readable() and recheck_readable() see also 7aec060e0aff4588ff51f744dd194d1c0f7793c2 Signed-off-by: Kefu Chai --- src/crimson/osd/pg.cc | 68 +++++++++++++++++++++++++++++++++++++++++-- src/crimson/osd/pg.h | 9 ++---- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index c6b4331721e..6de58af4e44 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -30,10 +30,8 @@ #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" -#include "crimson/os/futurized_collection.h" -#include "os/Transaction.h" #include "crimson/os/cyan_store.h" - +#include "crimson/os/futurized_collection.h" #include "crimson/osd/exceptions.h" #include "crimson/osd/pg_meta.h" #include "crimson/osd/pg_backend.h" @@ -46,6 +44,15 @@ namespace { } } +namespace std::chrono { +std::ostream& operator<<(std::ostream& out, const signedspan& d) +{ + auto s = std::chrono::duration_cast(d).count(); + auto ns = std::abs((d % 1s).count()); + fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : ""); +} +} + namespace ceph::osd { using ceph::common::local_conf; @@ -129,6 +136,61 @@ bool PG::try_flush_or_schedule_async() { return false; } +void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay) +{ + seastar::sleep(delay).then([last_peering_reset, this] { + shard_services.start_operation( + this, + shard_services, + pg_whoami, + pgid, + last_peering_reset, + last_peering_reset, + PeeringState::CheckReadable{}); + }); +} + +void PG::recheck_readable() +{ + bool changed = false; + const auto mnow = shard_services.get_mnow(); + if (peering_state.state_test(PG_STATE_WAIT)) { + auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub(); + if (mnow < prior_readable_until_ub) { + logger().info("{} will wait (mnow {} < prior_readable_until_ub {})", + __func__, mnow, prior_readable_until_ub); + } else { + logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})", + __func__, mnow, prior_readable_until_ub); + peering_state.state_clear(PG_STATE_WAIT); + peering_state.clear_prior_readable_until_ub(); + changed = true; + } + } + if (peering_state.state_test(PG_STATE_LAGGY)) { + auto readable_until = peering_state.get_readable_until(); + if (readable_until == readable_until.zero()) { + logger().info("{} still laggy (mnow {}, readable_until zero)", + __func__, mnow); + } else if (mnow >= readable_until) { + logger().info("{} still laggy (mnow {} >= readable_until {})", + __func__, mnow, readable_until); + } else { + logger().info("{} no longer laggy (mnow {} < readable_until {})", + __func__, mnow, readable_until); + peering_state.state_clear(PG_STATE_LAGGY); + changed = true; + } + } + if (changed) { + publish_stats_to_osd(); + if (!peering_state.state_test(PG_STATE_WAIT) && + !peering_state.state_test(PG_STATE_LAGGY)) { + // TODO: requeue ops waiting for readable + } + } +} + void PG::on_activate(interval_set) { projected_last_update = peering_state.get_info().last_update; diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index b25c2f911fc..6a87b701c1d 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -258,12 +258,9 @@ public: // Not needed yet } - void queue_check_readable(epoch_t lpr, ceph::timespan delay) final { -#warning write me - } - void recheck_readable() final { -#warning write me - } + void queue_check_readable(epoch_t last_peering_reset, + ceph::timespan delay) final; + void recheck_readable() final; void on_pool_change() final { // Not needed yet -- 2.39.5