#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
+#include <seastar/core/sleep.hh>
#include "common/dout.h"
#include "crimson/net/Fwd.h"
// will be needed for unblocking IO operations/peering
}
+ template <typename T>
+ void start_peering_event_operation(T &&evt) {
+ shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ shard_services,
+ pg_whoami,
+ pgid,
+ std::forward<T>(evt));
+ }
+
void schedule_event_after(
PGPeeringEventRef event,
float delay) final {
- ceph_assert(0 == "Not implemented yet");
+ // TODO: this is kind of a hack -- once the start_operation call
+ // happens, the operation will be registered, but during the delay
+ // it's just a dangling future. It would be nice for the
+ // operation machinery to have something to take care of this.
+ (void)seastar::sleep(std::chrono::milliseconds(std::lround(delay*1000))).then(
+ [this, event=std::move(event)]() {
+ start_peering_event_operation(std::move(*event));
+ });
}
void request_local_background_io_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
- ceph_assert(0 == "Not implemented yet");
+ shard_services.local_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
}
void update_local_background_io_priority(
unsigned priority) final {
- ceph_assert(0 == "Not implemented yet");
+ shard_services.local_reserver.update_priority(
+ pgid,
+ priority);
}
void cancel_local_background_io_reservation() final {
- // Not implemented yet, but gets called on exit() from some states
+ shard_services.local_reserver.cancel_reservation(
+ pgid);
}
void request_remote_recovery_reservation(
unsigned priority,
PGPeeringEventRef on_grant,
PGPeeringEventRef on_preempt) final {
- ceph_assert(0 == "Not implemented yet");
+ shard_services.remote_reserver.request_reservation(
+ pgid,
+ on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
+ start_peering_event_operation(std::move(*on_grant));
+ }) : nullptr,
+ priority,
+ on_preempt ? make_lambda_context(
+ [this, on_preempt=std::move(on_preempt)] (int) {
+ start_peering_event_operation(std::move(*on_preempt));
+ }) : nullptr);
}
void cancel_remote_recovery_reservation() final {
- // Not implemented yet, but gets called on exit() from some states
+ shard_services.remote_reserver.cancel_reservation(
+ pgid);
}
void schedule_event_on_commit(
ceph::os::Transaction &t,
PGPeeringEventRef on_commit) final {
t.register_on_commit(
- new LambdaContext(
- [this, on_commit=std::move(on_commit)](int r){
- shard_services.start_operation<LocalPeeringEvent>(
- this,
- shard_services,
- pg_whoami,
- pgid,
- std::move(*on_commit));
+ make_lambda_context(
+ [this, on_commit=std::move(on_commit)](int) {
+ start_peering_event_operation(std::move(*on_commit));
}));
}
#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
+#include "crimson/common/config_proxy.h"
#include "crimson/mgr/client.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Messenger.h"
monc(monc),
mgrc(mgrc),
store(store),
- obc_registry(crimson::common::local_conf())
+ obc_registry(crimson::common::local_conf()),
+ local_reserver(
+ &cct,
+ &finisher,
+ crimson::common::local_conf()->osd_max_backfills,
+ crimson::common::local_conf()->osd_min_recovery_priority),
+ remote_reserver(
+ &cct,
+ &finisher,
+ crimson::common::local_conf()->osd_max_backfills,
+ crimson::common::local_conf()->osd_min_recovery_priority)
{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
recoverystate_perf = build_recoverystate_perf(&cct);
cct.get_perfcounters_collection()->add(recoverystate_perf);
+
+ crimson::common::local_conf().add_observer(this);
+}
+
+const char** ShardServices::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_max_backfills",
+ "osd_min_recovery_priority",
+ nullptr
+ };
+ return KEYS;
+}
+
+void ShardServices::handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed)
+{
+ if (changed.count("osd_max_backfills")) {
+ local_reserver.set_max(conf->osd_max_backfills);
+ remote_reserver.set_max(conf->osd_max_backfills);
+ }
+ if (changed.count("osd_min_recovery_priority")) {
+ local_reserver.set_min_priority(conf->osd_min_recovery_priority);
+ remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+ }
}
seastar::future<> ShardServices::send_to_osd(
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/object_context.h"
+#include "common/AsyncReserver.h"
namespace crimson::net {
class Messenger;
/**
* Represents services available to each PG
*/
-class ShardServices {
+class ShardServices : public md_config_obs_t {
using cached_map_t = boost::local_shared_ptr<const OSDMap>;
OSDMapService &osdmap_service;
crimson::net::Messenger &cluster_msgr;
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed) final;
public:
ShardServices(
OSDMapService &osdmap_service,
crimson::osd::ObjectContextRegistry obc_registry;
+ // Async Reservers
private:
unsigned num_pgs = 0;
+
+ struct DirectFinisher {
+ void queue(Context *c) {
+ c->complete(0);
+ }
+ } finisher;
+public:
+ AsyncReserver<spg_t, DirectFinisher> local_reserver;
+ AsyncReserver<spg_t, DirectFinisher> remote_reserver;
};
}