unsigned priority,
PGPeeringEventURef on_grant,
PGPeeringEventURef on_preempt) final {
- shard_services.local_request_reservation(
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.local_request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
void update_local_background_io_priority(
unsigned priority) final {
- shard_services.local_update_priority(
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.local_update_priority(
pgid,
priority);
}
void cancel_local_background_io_reservation() final {
- shard_services.local_cancel_reservation(
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.local_cancel_reservation(
pgid);
}
unsigned priority,
PGPeeringEventURef on_grant,
PGPeeringEventURef on_preempt) final {
- shard_services.remote_request_reservation(
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.remote_request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}
void cancel_remote_recovery_reservation() final {
- shard_services.remote_cancel_reservation(
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.remote_cancel_reservation(
pgid);
}
// Not needed yet
}
void queue_want_pg_temp(const std::vector<int> &wanted) final {
- shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.queue_want_pg_temp(pgid.pgid, wanted);
}
void clear_want_pg_temp() final {
- shard_services.remove_want_pg_temp(pgid.pgid);
+ // TODO -- we probably want to add a mechanism for blocking on this
+ // after handling the peering event
+ std::ignore = shard_services.remove_want_pg_temp(pgid.pgid);
}
void check_recovery_sources(const OSDMapRef& newmap) final {
// Not needed yet
crimson::mgr::Client &mgrc,
crimson::os::FuturizedStore &store)
{
- osd_singleton_state.reset(
- new OSDSingletonState(whoami, cluster_msgr, public_msgr,
- monc, mgrc));
- shard_services.reset(
- new ShardServices(*osd_singleton_state, whoami, store));
- return seastar::now();
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
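+ // The singleton must be up before ShardServices: every per-shard
+ // instance holds a reference to the sharded singleton.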
+ return osd_singleton_state.start_single(
+ whoami, std::ref(cluster_msgr), std::ref(public_msgr),
+ std::ref(monc), std::ref(mgrc)
+ ).then([this, whoami, &store] {
+ ceph::mono_time startup_time = ceph::mono_clock::now();
+ return shard_services.start(
+ std::ref(osd_singleton_state), whoami, startup_time, std::ref(store));
+ });
}
seastar::future<> PGShardManager::stop()
{
- shard_services.reset();
- osd_singleton_state.reset();
- return seastar::now();
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
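+ // Tear down in reverse order of start().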
+ return shard_services.stop(
+ ).then([this] {
+ return osd_singleton_state.stop();
+ });
}
seastar::future<> PGShardManager::load_pgs()
seastar::future<> PGShardManager::stop_pgs()
{
- return get_local_state().stop_pgs();
+ return shard_services.invoke_on_all([](auto &local_service) {
+ return local_service.local_state.stop_pgs();
+ });
}
seastar::future<std::map<pg_t, pg_stat_t>>
PGShardManager::get_pg_stats() const
{
- return seastar::make_ready_future<std::map<pg_t, pg_stat_t>>(
- get_local_state().get_pg_stats());
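+ // PG stats are now per-shard; gather each shard's map and merge them.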
+ return shard_services.map_reduce0(
+ [](auto &local) {
+ return local.local_state.get_pg_stats();
+ },
+ std::map<pg_t, pg_stat_t>(),
+ [](auto &&left, auto &&right) {
+ left.merge(std::move(right));
+ return std::move(left);
+ });
}
seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
{
- return get_local_state().broadcast_map_to_pgs(
- get_shard_services(), epoch
- ).then([this, epoch] {
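+ // Deliver the map to the PGs on every shard before advancing the
+ // singleton's osdmap_gate.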
+ return shard_services.invoke_on_all([epoch](auto &local_service) {
+ return local_service.local_state.broadcast_map_to_pgs(
+ local_service, epoch
+ );
+ }).then([this, epoch] {
get_osd_singleton_state().osdmap_gate.got_map(epoch);
return seastar::now();
});
}
seastar::future<> PGShardManager::set_up_epoch(epoch_t e) {
- local_state.set_up_epoch(e);
- return seastar::now();
+ return shard_services.invoke_on_all(
+ seastar::smp_submit_to_options{},
+ [e](auto &local_service) {
+ local_service.local_state.set_up_epoch(e);
+ return seastar::now();
+ });
}
}
* etc)
*/
class PGShardManager {
- std::unique_ptr<OSDSingletonState> osd_singleton_state;
- std::unique_ptr<ShardServices> shard_services;
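+ // osd_singleton_state is started only on PRIMARY_CORE (start_single);
+ // shard_services gets an instance on every core.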
+ seastar::sharded<OSDSingletonState> osd_singleton_state;
+ seastar::sharded<ShardServices> shard_services;
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
crimson::os::FuturizedStore &store);
seastar::future<> stop();
- auto &get_osd_singleton_state() { return *osd_singleton_state; }
- auto &get_osd_singleton_state() const { return *osd_singleton_state; }
- auto &get_shard_services() { return *shard_services; }
- auto &get_shard_services() const { return *shard_services; }
- auto &get_local_state() { return shard_services->local_state; }
- auto &get_local_state() const { return shard_services->local_state; }
+ auto &get_osd_singleton_state() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return osd_singleton_state.local();
+ }
+ auto &get_osd_singleton_state() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return osd_singleton_state.local();
+ }
+ auto &get_shard_services() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_services.local();
+ }
+ auto &get_shard_services() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return shard_services.local();
+ }
+ auto &get_local_state() { return get_shard_services().local_state; }
+ auto &get_local_state() const { return get_shard_services().local_state; }
seastar::future<> update_map(local_cached_map_t &&map) {
- auto fmap = make_local_shared_foreign(std::move(map));
- get_osd_singleton_state().update_map(fmap);
- get_local_state().update_map(std::move(fmap));
- return seastar::now();
+ get_osd_singleton_state().update_map(
+ make_local_shared_foreign(local_cached_map_t(map))
+ );
+ /* We need each core to get its own foreign_ptr<local_cached_map_t>.
+ * foreign_ptr can't be cheaply copied, so we make one for each core
+ * up front. */
+ return seastar::do_with(
+ std::vector<seastar::foreign_ptr<local_cached_map_t>>(),
+ [this, map](auto &fmaps) {
+ fmaps.resize(seastar::smp::count);
+ for (auto &i: fmaps) {
+ i = seastar::foreign_ptr(map);
+ }
+ return shard_services.invoke_on_all(
+ [&fmaps](auto &local) mutable {
+ local.local_state.update_map(
+ make_local_shared_foreign(
+ std::move(fmaps[seastar::this_shard_id()])
+ ));
+ });
+ });
}
- auto stop_registries() {
- return get_local_state().stop_registry();
+ seastar::future<> stop_registries() {
+ return shard_services.invoke_on_all([](auto &local) {
+ return local.local_state.stop_registry();
+ });
}
FORWARD_TO_OSD_SINGLETON(send_pg_created)
template <typename F>
auto with_remote_shard_state(core_id_t core, F &&f) {
- ceph_assert(core == 0);
- auto &local_state_ref = get_local_state();
- auto &shard_services_ref = get_shard_services();
- return seastar::smp::submit_to(
- core,
- [f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
+ return shard_services.invoke_on(
+ core, [f=std::move(f)](auto &target_shard_services) mutable {
return std::invoke(
- std::move(f), local_state_ref, shard_services_ref);
+ std::move(f), target_shard_services.local_state,
+ target_shard_services);
});
}
*/
template <typename F>
seastar::future<> for_each_pg(F &&f) const {
- for (auto &&pg: get_local_state().pg_map.get_pgs()) {
- std::apply(f, pg);
- }
- return seastar::now();
+ return sharded_map_seq(
+ shard_services,
+ [f=std::move(f)](const auto &local_service) mutable {
+ for (auto &pg: local_service.local_state.pg_map.get_pgs()) {
+ std::apply(f, pg);
+ }
+ return seastar::now();
+ });
}
auto get_num_pgs() const {
PerShardState::PerShardState(
int whoami,
+ ceph::mono_time startup_time,
crimson::os::FuturizedStore &store)
: whoami(whoami),
store(store),
obc_registry(crimson::common::local_conf()),
next_tid(
static_cast<ceph_tid_t>(seastar::this_shard_id()) <<
- (std::numeric_limits<ceph_tid_t>::digits - 8))
+ (std::numeric_limits<ceph_tid_t>::digits - 8)),
+ startup_time(startup_time)
{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
namespace crimson::osd {
+// seastar::sharded<T>::start_single places its single instance on core 0
+constexpr core_id_t PRIMARY_CORE = 0;
+
class PGShardManager;
/**
HeartbeatStampsRef get_hb_stamps(int peer);
std::map<int, HeartbeatStampsRef> heartbeat_stamps;
+ // Time state
+ const ceph::mono_time startup_time;
+ ceph::signedspan get_mnow() const {
+ return ceph::mono_clock::now() - startup_time;
+ }
+
public:
PerShardState(
int whoami,
+ ceph::mono_time startup_time,
crimson::os::FuturizedStore &store);
};
crimson::mon::Client &monc,
crimson::mgr::Client &mgrc);
+private:
const int whoami;
crimson::common::CephContext cct;
seastar::future<> send_pg_temp();
// TODO: add config to control mapping
- PGShardMapping pg_to_shard_mapping{0, 1};
+ PGShardMapping pg_to_shard_mapping{0, seastar::smp::count};
std::set<pg_t> pg_created;
seastar::future<> send_pg_created(pg_t pgid);
seastar::future<> send_pg_created();
void prune_pg_created();
- // Time state
- ceph::mono_time startup_time = ceph::mono_clock::now();
- ceph::signedspan get_mnow() const {
- return ceph::mono_clock::now() - startup_time;
- }
-
struct DirectFinisher {
void queue(Context *c) {
c->complete(0);
using local_cached_map_t = OSDMapService::local_cached_map_t;
PerShardState local_state;
- OSDSingletonState &osd_singleton_state;
+ seastar::sharded<OSDSingletonState> &osd_singleton_state;
+
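+ // The OSDSingletonState instance lives only on PRIMARY_CORE; with_singleton
+ // hops to that core, invokes f on it, and returns the resulting future.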
+ template <typename F, typename... Args>
+ auto with_singleton(F &&f, Args&&... args) {
+ return osd_singleton_state.invoke_on(
+ PRIMARY_CORE,
+ std::forward<F>(f),
+ std::forward<Args>(args)...
+ );
+ }
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
}
#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
-#define FORWARD_TO_OSD_SINGLETON(METHOD) \
- FORWARD(METHOD, METHOD, osd_singleton_state)
- template <typename F, typename... Args>
- auto with_singleton(F &&f, Args&&... args) {
- return std::invoke(f, osd_singleton_state, std::forward<Args>(args)...);
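+// Forwards METHOD to TARGET, evaluated on the PRIMARY_CORE singleton via
+// with_singleton.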
+#define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET) \
+ template <typename... Args> \
+ auto METHOD(Args&&... args) { \
+ return with_singleton( \
+ [](auto &local_state, auto&&... args) { \
+ return local_state.TARGET( \
+ std::forward<decltype(args)>(args)...); \
+ }, std::forward<Args>(args)...); \
}
+#define FORWARD_TO_OSD_SINGLETON(METHOD) \
+ FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
+
public:
template <typename... PSSArgs>
ShardServices(
- OSDSingletonState &osd_singleton_state,
+ seastar::sharded<OSDSingletonState> &osd_singleton_state,
PSSArgs&&... args)
: local_state(std::forward<PSSArgs>(args)...),
osd_singleton_state(osd_singleton_state) {}
FORWARD_TO_OSD_SINGLETON(send_pg_created)
FORWARD_TO_OSD_SINGLETON(send_alive)
FORWARD_TO_OSD_SINGLETON(send_pg_temp)
- FORWARD_CONST(get_mnow, get_mnow, osd_singleton_state)
+ FORWARD_CONST(get_mnow, get_mnow, local_state)
FORWARD_TO_LOCAL(get_hb_stamps)
FORWARD(pg_created, pg_created, local_state.pg_map)
FORWARD(
get_cached_obc, get_cached_obc, local_state.obc_registry)
- FORWARD(
- local_request_reservation, request_reservation,
- osd_singleton_state.local_reserver)
- FORWARD(
- local_update_priority, update_priority,
- osd_singleton_state.local_reserver)
- FORWARD(
- local_cancel_reservation, cancel_reservation,
- osd_singleton_state.local_reserver)
- FORWARD(
- remote_request_reservation, request_reservation,
- osd_singleton_state.remote_reserver)
- FORWARD(
- remote_cancel_reservation, cancel_reservation,
- osd_singleton_state.remote_reserver)
+ FORWARD_TO_OSD_SINGLETON_TARGET(
+ local_update_priority,
+ local_reserver.update_priority)
+ FORWARD_TO_OSD_SINGLETON_TARGET(
+ local_cancel_reservation,
+ local_reserver.cancel_reservation)
+ FORWARD_TO_OSD_SINGLETON_TARGET(
+ remote_cancel_reservation,
+ remote_reserver.cancel_reservation)
+
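+ // The reservers live in the PRIMARY_CORE singleton, so Contexts passed to
+ // them would otherwise complete there; wrap each one so that its completion
+ // is resubmitted to the core that created it.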
+ Context *invoke_context_on_core(core_id_t core, Context *c) {
+ if (!c) return nullptr;
+ return new LambdaContext([core, c](int code) {
+ std::ignore = seastar::smp::submit_to(
+ core,
+ [c, code] {
+ c->complete(code);
+ });
+ });
+ }
+ seastar::future<> local_request_reservation(
+ spg_t item,
+ Context *on_reserved,
+ unsigned prio,
+ Context *on_preempt) {
+ return with_singleton(
+ [item, prio](OSDSingletonState &singleton,
+ Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+ return singleton.local_reserver.request_reservation(
+ item,
+ wrapped_on_reserved,
+ prio,
+ wrapped_on_preempt);
+ },
+ invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+ invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+ }
+ seastar::future<> remote_request_reservation(
+ spg_t item,
+ Context *on_reserved,
+ unsigned prio,
+ Context *on_preempt) {
+ return with_singleton(
+ [item, prio](OSDSingletonState &singleton,
+ Context *wrapped_on_reserved, Context *wrapped_on_preempt) {
+ return singleton.remote_reserver.request_reservation(
+ item,
+ wrapped_on_reserved,
+ prio,
+ wrapped_on_preempt);
+ },
+ invoke_context_on_core(seastar::this_shard_id(), on_reserved),
+ invoke_context_on_core(seastar::this_shard_id(), on_preempt));
+ }
#undef FORWARD_CONST
#undef FORWARD