beacon_timer{[this] { (void)send_beacon(); }},
cluster_msgr{cluster_msgr},
public_msgr{public_msgr},
+ hb_front_msgr{hb_front_msgr},
+ hb_back_msgr{hb_back_msgr},
monc{new crimson::mon::Client{*public_msgr, *this}},
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{store},
- pg_shard_manager{
- whoami, *cluster_msgr,
- *public_msgr, *monc, *mgrc, store},
- shard_services{pg_shard_manager.get_shard_services()},
- heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background -- continuation rearms timer when complete
tick_timer{[this] {
std::ignore = update_heartbeat_peers(
startup_time = ceph::mono_clock::now();
- return store.start().then([this] {
+ return pg_shard_manager.start(
+ whoami, *cluster_msgr,
+ *public_msgr, *monc, *mgrc, store
+ ).then([this] {
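+ // start the shard infrastructure first: the Heartbeat constructed below
+ // captures the ShardServices instance that pg_shard_manager.start() just created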
+ heartbeat.reset(new Heartbeat{
+ whoami, get_shard_services(),
+ *monc, hb_front_msgr, hb_back_msgr});
+ return store.start();
+ }).then([this] {
return store.mount().handle_error(
crimson::stateful_ec::handle([] (const auto& ec) {
logger().error("error mounting object store in {}: ({}) {}",
}
// get all the latest maps
if (osdmap->get_epoch() + 1 >= oldest) {
- return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
} else {
- return shard_services.osdmap_subscribe(oldest - 1, true);
+ return get_shard_services().osdmap_subscribe(oldest - 1, true);
}
}
return monc->stop();
}).then([this] {
return mgrc->stop();
+ }).then([this] {
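+ // shard infrastructure comes down only after the mon/mgr clients have stopped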
+ return pg_shard_manager.stop();
}).then([fut=std::move(gate_close_fut)]() mutable {
return std::move(fut);
}).then([this] {
case CEPH_MSG_OSD_OP:
return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
case MSG_OSD_PG_CREATE2:
- shard_services.start_operation<CompoundPeeringRequest>(
+ get_shard_services().start_operation<CompoundPeeringRequest>(
pg_shard_manager,
conn,
m);
logger().info("handle_osd_map message skips epochs {}..{}",
start, first - 1);
if (m->oldest_map <= start) {
- return shard_services.osdmap_subscribe(start, false);
+ return get_shard_services().osdmap_subscribe(start, false);
}
// always try to get the full range of maps--as many as we can. this
// 1- is good to have
// 2- is at present the only way to ensure that we get a *full* map as
// the first map!
if (m->oldest_map < first) {
- return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
+ return get_shard_services().osdmap_subscribe(m->oldest_map - 1, true);
}
skip_maps = true;
start = first;
namespace crimson::osd {
-PGShardManager::PGShardManager(
+seastar::future<> PGShardManager::start(
const int whoami,
crimson::net::Messenger &cluster_msgr,
crimson::net::Messenger &public_msgr,
crimson::mon::Client &monc,
crimson::mgr::Client &mgrc,
crimson::os::FuturizedStore &store)
- : osd_singleton_state(whoami, cluster_msgr, public_msgr,
- monc, mgrc),
- local_state(whoami, store),
- shard_services(osd_singleton_state, local_state)
-{}
+{
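+ // construct the singleton state first; the ShardServices built next
+ // holds a reference to it for the lifetime of the manager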
+ osd_singleton_state.reset(
+ new OSDSingletonState(whoami, cluster_msgr, public_msgr,
+ monc, mgrc));
+ shard_services.reset(
+ new ShardServices(*osd_singleton_state, whoami, store));
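+ // construction is currently synchronous; returning a future keeps the
+ // signature in line with the other seastar-style start/stop methods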
+ return seastar::now();
+}
+
+seastar::future<> PGShardManager::stop()
+{
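+ // release in reverse order of construction: shard_services holds a
+ // reference into osd_singleton_state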
+ shard_services.reset();
+ osd_singleton_state.reset();
+ return seastar::now();
+}
+
seastar::future<> PGShardManager::load_pgs()
{
- return local_state.store.list_collections(
+ return get_local_state().store.list_collections(
).then([this](auto colls) {
return seastar::parallel_for_each(
colls,
[this](auto coll) {
spg_t pgid;
if (coll.is_pg(&pgid)) {
- auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ auto core = get_osd_singleton_state(
+ ).pg_to_shard_mapping.maybe_create_pg(
pgid);
return with_remote_shard_state(
core,
seastar::future<> PGShardManager::stop_pgs()
{
- return local_state.stop_pgs();
+ return get_local_state().stop_pgs();
}
seastar::future<std::map<pg_t, pg_stat_t>>
PGShardManager::get_pg_stats() const
{
return seastar::make_ready_future<std::map<pg_t, pg_stat_t>>(
- local_state.get_pg_stats());
+ get_local_state().get_pg_stats());
}
seastar::future<> PGShardManager::broadcast_map_to_pgs(epoch_t epoch)
{
- return local_state.broadcast_map_to_pgs(
- shard_services, epoch
+ return get_local_state().broadcast_map_to_pgs(
+ get_shard_services(), epoch
).then([this, epoch] {
- osd_singleton_state.osdmap_gate.got_map(epoch);
+ get_osd_singleton_state().osdmap_gate.got_map(epoch);
return seastar::now();
});
}
* etc)
*/
class PGShardManager {
- OSDSingletonState osd_singleton_state;
- PerShardState local_state;
- ShardServices shard_services;
+ std::unique_ptr<OSDSingletonState> osd_singleton_state;
+ std::unique_ptr<ShardServices> shard_services;
+
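+ // thin forwarding wrappers: FROM_METHOD(args...) invokes
+ // TARGET.TO_METHOD(args...), perfectly forwarding the arguments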
+#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
+ template <typename... Args> \
+ auto FROM_METHOD(Args&&... args) const { \
+ return TARGET.TO_METHOD(std::forward<Args>(args)...); \
+ }
+
+#define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \
+ template <typename... Args> \
+ auto FROM_METHOD(Args&&... args) { \
+ return TARGET.TO_METHOD(std::forward<Args>(args)...); \
+ }
+
+#define FORWARD_TO_OSD_SINGLETON(METHOD) \
+ FORWARD(METHOD, METHOD, get_osd_singleton_state())
public:
using cached_map_t = OSDMapService::cached_map_t;
using local_cached_map_t = OSDMapService::local_cached_map_t;
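+ // default-construct, then drive the lifecycle explicitly with
+ // start()/stop()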
- PGShardManager(
+ PGShardManager() = default;
+
+ seastar::future<> start(
const int whoami,
crimson::net::Messenger &cluster_msgr,
crimson::net::Messenger &public_msgr,
crimson::mon::Client &monc,
crimson::mgr::Client &mgrc,
crimson::os::FuturizedStore &store);
+ seastar::future<> stop();
- auto &get_shard_services() { return shard_services; }
+ auto &get_osd_singleton_state() { return *osd_singleton_state; }
+ auto &get_osd_singleton_state() const { return *osd_singleton_state; }
+ auto &get_shard_services() { return *shard_services; }
+ auto &get_shard_services() const { return *shard_services; }
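+ // local_state is private to ShardServices; PGShardManager reaches it
+ // through the friend declaration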
+ auto &get_local_state() { return shard_services->local_state; }
+ auto &get_local_state() const { return shard_services->local_state; }
seastar::future<> update_map(local_cached_map_t &&map) {
auto fmap = make_local_shared_foreign(std::move(map));
- osd_singleton_state.update_map(fmap);
- local_state.update_map(std::move(fmap));
+ get_osd_singleton_state().update_map(fmap);
+ get_local_state().update_map(std::move(fmap));
return seastar::now();
}
auto stop_registries() {
- return local_state.stop_registry();
+ return get_local_state().stop_registry();
}
FORWARD_TO_OSD_SINGLETON(send_pg_created)
// osd state forwards
- FORWARD(is_active, is_active, osd_singleton_state.osd_state)
- FORWARD(is_preboot, is_preboot, osd_singleton_state.osd_state)
- FORWARD(is_booting, is_booting, osd_singleton_state.osd_state)
- FORWARD(is_stopping, is_stopping, osd_singleton_state.osd_state)
- FORWARD(is_prestop, is_prestop, osd_singleton_state.osd_state)
- FORWARD(is_initializing, is_initializing, osd_singleton_state.osd_state)
- FORWARD(set_prestop, set_prestop, osd_singleton_state.osd_state)
- FORWARD(set_preboot, set_preboot, osd_singleton_state.osd_state)
- FORWARD(set_booting, set_booting, osd_singleton_state.osd_state)
- FORWARD(set_stopping, set_stopping, osd_singleton_state.osd_state)
- FORWARD(set_active, set_active, osd_singleton_state.osd_state)
- FORWARD(when_active, when_active, osd_singleton_state.osd_state)
- FORWARD_CONST(get_osd_state_string, to_string, osd_singleton_state.osd_state)
-
- FORWARD(got_map, got_map, osd_singleton_state.osdmap_gate)
- FORWARD(wait_for_map, wait_for_map, osd_singleton_state.osdmap_gate)
+ FORWARD(is_active, is_active, get_osd_singleton_state().osd_state)
+ FORWARD(is_preboot, is_preboot, get_osd_singleton_state().osd_state)
+ FORWARD(is_booting, is_booting, get_osd_singleton_state().osd_state)
+ FORWARD(is_stopping, is_stopping, get_osd_singleton_state().osd_state)
+ FORWARD(is_prestop, is_prestop, get_osd_singleton_state().osd_state)
+ FORWARD(is_initializing, is_initializing, get_osd_singleton_state().osd_state)
+ FORWARD(set_prestop, set_prestop, get_osd_singleton_state().osd_state)
+ FORWARD(set_preboot, set_preboot, get_osd_singleton_state().osd_state)
+ FORWARD(set_booting, set_booting, get_osd_singleton_state().osd_state)
+ FORWARD(set_stopping, set_stopping, get_osd_singleton_state().osd_state)
+ FORWARD(set_active, set_active, get_osd_singleton_state().osd_state)
+ FORWARD(when_active, when_active, get_osd_singleton_state().osd_state)
+ FORWARD_CONST(get_osd_state_string, to_string, get_osd_singleton_state().osd_state)
+
+ FORWARD(got_map, got_map, get_osd_singleton_state().osdmap_gate)
+ FORWARD(wait_for_map, wait_for_map, get_osd_singleton_state().osdmap_gate)
// Metacoll
FORWARD_TO_OSD_SINGLETON(init_meta_coll)
template <typename F>
auto with_remote_shard_state(core_id_t core, F &&f) {
ceph_assert(core == 0);
- auto &local_state_ref = local_state;
- auto &shard_services_ref = shard_services;
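+ // interim single-core mapping: every pg maps to core 0 (see the assert
+ // above), so this is a same-shard submit_to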
+ auto &local_state_ref = get_local_state();
+ auto &shard_services_ref = get_shard_services();
return seastar::smp::submit_to(
core,
[f=std::forward<F>(f), &local_state_ref, &shard_services_ref]() mutable {
static_assert(T::can_create());
logger.debug("{}: can_create", *op);
- auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
op->get_pgid());
- local_state.registry.remove_from_registry(*op);
+ get_local_state().registry.remove_from_registry(*op);
return with_remote_shard_state(
core,
[op=std::move(op)](
static_assert(!T::can_create());
logger.debug("{}: !can_create", *op);
- auto core = osd_singleton_state.pg_to_shard_mapping.maybe_create_pg(
+ auto core = get_osd_singleton_state().pg_to_shard_mapping.maybe_create_pg(
op->get_pgid());
- local_state.registry.remove_from_registry(*op);
+ get_local_state().registry.remove_from_registry(*op);
return with_remote_shard_state(
core,
[op=std::move(op)](
*/
template <typename F>
seastar::future<> for_each_pg(F &&f) const {
- for (auto &&pg: local_state.pg_map.get_pgs()) {
+ for (auto &&pg: get_local_state().pg_map.get_pgs()) {
std::apply(f, pg);
}
return seastar::now();
}
auto get_num_pgs() const {
- return osd_singleton_state.pg_to_shard_mapping.get_num_pgs();
+ return get_osd_singleton_state().pg_to_shard_mapping.get_num_pgs();
}
seastar::future<> broadcast_map_to_pgs(epoch_t epoch);
template <typename F>
auto with_pg(spg_t pgid, F &&f) {
- core_id_t core = osd_singleton_state.pg_to_shard_mapping.get_pg_mapping(
- pgid);
+ core_id_t core = get_osd_singleton_state(
+ ).pg_to_shard_mapping.get_pg_mapping(pgid);
return with_remote_shard_state(
core,
[pgid, f=std::move(f)](auto &local_state, auto &local_service) mutable {
template <typename T, typename... Args>
auto start_pg_operation(Args&&... args) {
- auto op = local_state.registry.create_operation<T>(
+ auto op = get_local_state().registry.create_operation<T>(
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
logger.debug("{}: starting {}", *op, __func__);
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
logger.debug("{}: start_pg_operation in await_active stage", opref);
- return osd_singleton_state.osd_state.when_active();
+ return get_osd_singleton_state().osd_state.when_active();
}).then([&logger, &opref] {
logger.debug("{}: start_pg_operation active, entering await_map", opref);
return opref.template enter_stage<>(
return opref.template with_blocking_event<OSDMapBlockingEvent>(
[this, &opref](auto &&trigger) {
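+ // std::ignore marks the captured this as used, silencing compiler warnings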
std::ignore = this;
- return osd_singleton_state.osdmap_gate.wait_for_map(
+ return get_osd_singleton_state().osdmap_gate.wait_for_map(
std::move(trigger),
opref.get_epoch(),
- &shard_services);
+ &get_shard_services());
});
}).then([&logger, &opref](auto epoch) {
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
});
return std::make_pair(id, std::move(fut));
}
+
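+// scrub the helper macros so they don't leak past this class definition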
+#undef FORWARD
+#undef FORWARD_CONST
+#undef FORWARD_TO_OSD_SINGLETON
};
}
epoch_t start, Ref<MOSDMap> m);
};
+/**
+ * Represents services available to each PG
+ */
+class ShardServices : public OSDMapService {
+ friend class PGShardManager;
+ using cached_map_t = OSDMapService::cached_map_t;
+ using local_cached_map_t = OSDMapService::local_cached_map_t;
+
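+ // owns the per-shard state; the singleton state is shared by reference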
+ PerShardState local_state;
+ OSDSingletonState &osd_singleton_state;
+
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
template <typename... Args> \
auto FROM_METHOD(Args&&... args) const { \
#define FORWARD_TO_OSD_SINGLETON(METHOD) \
FORWARD(METHOD, METHOD, osd_singleton_state)
-/**
- * Represents services available to each PG
- */
-class ShardServices : public OSDMapService {
- using cached_map_t = OSDMapService::cached_map_t;
- using local_cached_map_t = OSDMapService::local_cached_map_t;
-
- OSDSingletonState &osd_singleton_state;
- PerShardState &local_state;
-
template <typename F, typename... Args>
auto with_singleton(F &&f, Args&&... args) {
return std::invoke(f, osd_singleton_state, std::forward<Args>(args)...);
}
public:
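+ // trailing constructor arguments are forwarded to the PerShardState member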
+ template <typename... PSSArgs>
ShardServices(
OSDSingletonState &osd_singleton_state,
- PerShardState &local_state)
- : osd_singleton_state(osd_singleton_state), local_state(local_state) {}
+ PSSArgs&&... args)
+ : local_state(std::forward<PSSArgs>(args)...),
+ osd_singleton_state(osd_singleton_state) {}
FORWARD_TO_OSD_SINGLETON(send_to_osd)
FORWARD(
remote_cancel_reservation, cancel_reservation,
osd_singleton_state.remote_reserver)
+
+#undef FORWARD_CONST
+#undef FORWARD
+#undef FORWARD_TO_OSD_SINGLETON
+#undef FORWARD_TO_LOCAL
};
}