#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
-#include "crimson/os/futurized_collection.h"
-#include "os/Transaction.h"
#include "crimson/os/cyan_store.h"
-
+#include "crimson/os/futurized_collection.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
}
}
+namespace std::chrono {
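+// print a signedspan as "<seconds>[.<9-digit nanoseconds>]s" for log messages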
+std::ostream& operator<<(std::ostream& out, const signedspan& d)
+{
+  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
+  auto ns = std::abs((d % 1s).count());
+  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
+  return out;
+}
+}
+
namespace ceph::osd {
using ceph::common::local_conf;
return false;
}
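+// arrange for a CheckReadable peering event to be delivered to this PG
+// once `delay` has elapsed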
+void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
+{
+  // run in the background; the returned future is deliberately discarded
+  (void) seastar::sleep(delay).then([last_peering_reset, this] {
+    shard_services.start_operation<LocalPeeringEvent>(
+      this,
+      shard_services,
+      pg_whoami,
+      pgid,
+      last_peering_reset,
+      last_peering_reset,
+      PeeringState::CheckReadable{});
+  });
+}
+
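+// re-evaluate PG_STATE_WAIT and PG_STATE_LAGGY against the current monotonic
+// clock, and publish updated stats if either state could be cleared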
+void PG::recheck_readable()
+{
+  bool changed = false;
+  const auto mnow = shard_services.get_mnow();
+  // WAIT: reads are blocked until the prior interval's read lease
+  // (prior_readable_until_ub) is known to have expired
+  if (peering_state.state_test(PG_STATE_WAIT)) {
+    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
+    if (mnow < prior_readable_until_ub) {
+      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
+                    __func__, mnow, prior_readable_until_ub);
+    } else {
+      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
+                    __func__, mnow, prior_readable_until_ub);
+      peering_state.state_clear(PG_STATE_WAIT);
+      peering_state.clear_prior_readable_until_ub();
+      changed = true;
+    }
+  }
+  // LAGGY: our read lease has expired (or was never established); clear the
+  // state once a renewed lease pushes readable_until past mnow
+  if (peering_state.state_test(PG_STATE_LAGGY)) {
+    auto readable_until = peering_state.get_readable_until();
+    if (readable_until == readable_until.zero()) {
+      logger().info("{} still laggy (mnow {}, readable_until zero)",
+                    __func__, mnow);
+    } else if (mnow >= readable_until) {
+      logger().info("{} still laggy (mnow {} >= readable_until {})",
+                    __func__, mnow, readable_until);
+    } else {
+      logger().info("{} no longer laggy (mnow {} < readable_until {})",
+                    __func__, mnow, readable_until);
+      peering_state.state_clear(PG_STATE_LAGGY);
+      changed = true;
+    }
+  }
+  if (changed) {
+    publish_stats_to_osd();
+    if (!peering_state.state_test(PG_STATE_WAIT) &&
+        !peering_state.state_test(PG_STATE_LAGGY)) {
+      // TODO: requeue ops waiting for readable
+    }
+  }
+}
+
void PG::on_activate(interval_set<snapid_t>)
{
projected_last_update = peering_state.get_info().last_update;