namespace crimson::os {
class FuturizedCollection;
-constexpr core_id_t PRIMARY_CORE = 0;
-
class FuturizedStore {
public:
class Shard {
return osd_singleton_state.start_single(
whoami, std::ref(*cluster_msgr), std::ref(*public_msgr),
std::ref(*monc), std::ref(*mgrc));
+ }).then([this] {
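+ // start one OSDState replica on each shard (osd_states is a seastar::sharded service)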
+ return osd_states.start();
}).then([this] {
ceph::mono_time startup_time = ceph::mono_clock::now();
return shard_services.start(
startup_time,
osd_singleton_state.local().perf,
osd_singleton_state.local().recoverystate_perf,
- std::ref(store));
+ std::ref(store),
+ std::ref(osd_states));
}).then([this] {
return shard_dispatchers.start(
std::ref(*this),
osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
return get_pg_shard_manager().update_map(std::move(map));
}).then([this] {
- get_pg_shard_manager().got_map(osdmap->get_epoch());
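+ // the osdmap_gate is now per-shard; publish the new epoch to each shard's gate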
+ const auto epoch = osdmap->get_epoch();
+ return shard_services.invoke_on_all([epoch](auto &local_service) {
+ local_service.local_state.osdmap_gate.got_map(epoch);
+ });
+ }).then([this] {
bind_epoch = osdmap->get_epoch();
return get_pg_shard_manager().load_pgs(store);
}).then([this] {
-
uint64_t osd_required =
CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |
tick_timer.cancel();
// see also OSD::shutdown()
return prepare_to_stop().then([this] {
- get_pg_shard_manager().set_stopping();
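+ // set_stopping() now fans out to every shard and returns a future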
+ return get_pg_shard_manager().set_stopping();
+ }).then([this] {
logger().debug("prepared to stop");
public_msgr->stop();
cluster_msgr->stop();
return shard_dispatchers.stop();
}).then([this] {
return shard_services.stop();
+ }).then([this] {
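+ // stop the per-shard OSDState instances before the singleton state goes away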
+ return osd_states.stop();
}).then([this] {
return osd_singleton_state.stop();
}).then([this] {
}
});
}).then([m, this] {
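+ // set_active() now returns a future; start with a ready one for paths that skip activation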
+ auto fut = seastar::now();
if (osd.osdmap->is_up(whoami)) {
const auto up_from = osd.osdmap->get_up_from(whoami);
logger().info("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
osd.osdmap->get_addrs(whoami) == osd.public_msgr->get_myaddrs() &&
pg_shard_manager.is_booting()) {
logger().info("osd.{}: activating...", whoami);
- pg_shard_manager.set_active();
- osd.beacon_timer.arm_periodic(
- std::chrono::seconds(local_conf()->osd_beacon_report_interval));
- // timer continuation rearms when complete
- osd.tick_timer.arm(
- std::chrono::seconds(TICK_INTERVAL));
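+ // arm the timers only after every shard has observed the ACTIVE state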
+ fut = pg_shard_manager.set_active().then([this] {
+ osd.beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+ // timer continuation rearms when complete
+ osd.tick_timer.arm(
+ std::chrono::seconds(TICK_INTERVAL));
+ });
}
} else {
if (pg_shard_manager.is_prestop()) {
return seastar::now();
}
}
- return check_osdmap_features().then([this] {
- // yay!
- logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
- " to {} epoch to pgs", whoami, osd.osdmap->get_epoch());
- return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
+ return fut.then([this] {
+ return check_osdmap_features().then([this] {
+ // yay!
+ logger().info("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+ " to {} epoch to pgs", whoami, osd.osdmap->get_epoch());
+ return pg_shard_manager.broadcast_map_to_pgs(osd.osdmap->get_epoch());
+ });
});
}).then([m, this] {
if (pg_shard_manager.is_active()) {
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/pg_map.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/state.h"
#include "messages/MOSDOp.h"
#include "osd/PeeringState.h"
seastar::sharded<PGShardMapping> pg_to_shard_mappings;
seastar::sharded<OSDSingletonState> osd_singleton_state;
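+ // per-shard OSD lifecycle state (see crimson/osd/state.h)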
+ seastar::sharded<OSDState> osd_states;
seastar::sharded<ShardServices> shard_services;
seastar::sharded<ShardDispatcher> shard_dispatchers;
logger().debug("PGShardManager::broadcast_map_to_pgs "
"broadcasted up to {}",
epoch);
- get_osd_singleton_state().osdmap_gate.got_map(epoch);
- return seastar::now();
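+ // unblock wait_for_map() callers on every shard's local gate, not just the primary's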
+ return shard_services.invoke_on_all([epoch](auto &local_service) {
+ local_service.local_state.osdmap_gate.got_map(epoch);
+ return seastar::now();
+ });
});
}
FORWARD_TO_OSD_SINGLETON(send_pg_created)
// osd state forwards
- FORWARD(is_active, is_active, get_osd_singleton_state().osd_state)
- FORWARD(is_preboot, is_preboot, get_osd_singleton_state().osd_state)
- FORWARD(is_booting, is_booting, get_osd_singleton_state().osd_state)
- FORWARD(is_stopping, is_stopping, get_osd_singleton_state().osd_state)
- FORWARD(is_prestop, is_prestop, get_osd_singleton_state().osd_state)
- FORWARD(is_initializing, is_initializing, get_osd_singleton_state().osd_state)
- FORWARD(set_prestop, set_prestop, get_osd_singleton_state().osd_state)
- FORWARD(set_preboot, set_preboot, get_osd_singleton_state().osd_state)
- FORWARD(set_booting, set_booting, get_osd_singleton_state().osd_state)
- FORWARD(set_stopping, set_stopping, get_osd_singleton_state().osd_state)
- FORWARD(set_active, set_active, get_osd_singleton_state().osd_state)
- FORWARD(when_active, when_active, get_osd_singleton_state().osd_state)
- FORWARD_CONST(get_osd_state_string, to_string, get_osd_singleton_state().osd_state)
-
- FORWARD(got_map, got_map, get_osd_singleton_state().osdmap_gate)
- FORWARD(wait_for_map, wait_for_map, get_osd_singleton_state().osdmap_gate)
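+ // these resolve to the calling shard's local state; most OSDState methods still assert PRIMARY_CORE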
+ FORWARD(is_active, is_active, get_shard_services().local_state.osd_state)
+ FORWARD(is_preboot, is_preboot, get_shard_services().local_state.osd_state)
+ FORWARD(is_booting, is_booting, get_shard_services().local_state.osd_state)
+ FORWARD(is_stopping, is_stopping, get_shard_services().local_state.osd_state)
+ FORWARD(is_prestop, is_prestop, get_shard_services().local_state.osd_state)
+ FORWARD(is_initializing, is_initializing, get_shard_services().local_state.osd_state)
+ FORWARD(set_prestop, set_prestop, get_shard_services().local_state.osd_state)
+ FORWARD(set_preboot, set_preboot, get_shard_services().local_state.osd_state)
+ FORWARD(set_booting, set_booting, get_shard_services().local_state.osd_state)
+ FORWARD(set_stopping, set_stopping, get_shard_services().local_state.osd_state)
+ FORWARD(set_active, set_active, get_shard_services().local_state.osd_state)
+ FORWARD(when_active, when_active, get_shard_services().local_state.osd_state)
+ FORWARD_CONST(get_osd_state_string, to_string, get_shard_services().local_state.osd_state)
+
+ FORWARD(got_map, got_map, get_shard_services().local_state.osdmap_gate)
+ FORWARD(wait_for_map, wait_for_map, get_shard_services().local_state.osdmap_gate)
// Metacoll
FORWARD_TO_OSD_SINGLETON(init_meta_coll)
opref.get_connection_pipeline().await_active
).then([this, &opref, &logger] {
logger.debug("{}: start_pg_operation in await_active stage", opref);
- return osd_singleton_state.invoke_on(PRIMARY_CORE, []
- (auto& primary_osd_singleton_state) {
- return primary_osd_singleton_state.osd_state.when_active();
- });
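+ // no cross-core invoke_on needed: wait on this shard's own OSDState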
+ return get_shard_services().local_state.osd_state.when_active();
}).then([&logger, &opref] {
logger.debug("{}: start_pg_operation active, entering await_map", opref);
return opref.template enter_stage<>(
return opref.template with_blocking_event<OSDMapBlockingEvent>(
[this, &opref](auto &&trigger) {
std::ignore = this;
- return osd_singleton_state.invoke_on(PRIMARY_CORE,
- [trigger = std::move(trigger), epoch = opref.get_epoch(),
- &shard_services = shard_services]
- (auto& primary_osd_singleton_state) mutable {
- return primary_osd_singleton_state.osdmap_gate.wait_for_map(
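+ // likewise, consult the shard-local osdmap_gate directly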
+ return get_shard_services().local_state.osdmap_gate.wait_for_map(
std::move(trigger),
- epoch,
- &shard_services.local());
- });
+ opref.get_epoch(),
+ &get_shard_services());
});
}).then([&logger, &opref](auto epoch) {
logger.debug("{}: got map {}, entering get_pg", opref, epoch);
ceph::mono_time startup_time,
PerfCounters *perf,
PerfCounters *recoverystate_perf,
- crimson::os::FuturizedStore &store)
+ crimson::os::FuturizedStore &store,
+ OSDState &osd_state)
: whoami(whoami),
store(store.get_sharded_store()),
+ osd_state(osd_state),
+ osdmap_gate("PerShardState::osdmap_gate"),
perf(perf), recoverystate_perf(recoverystate_perf),
throttler(crimson::common::local_conf()),
next_tid(
crimson::mon::Client &monc,
crimson::mgr::Client &mgrc)
: whoami(whoami),
- osdmap_gate("OSDSingletonState::osdmap_gate"),
cluster_msgr(cluster_msgr),
public_msgr(public_msgr),
monc(monc),
namespace crimson::osd {
-// seastar::sharded puts start_single on core 0
-constexpr core_id_t PRIMARY_CORE = 0;
-
class PGShardManager;
/**
class PerShardState {
friend class ShardServices;
friend class PGShardManager;
+ friend class OSD;
using cached_map_t = OSDMapService::cached_map_t;
using local_cached_map_t = OSDMapService::local_cached_map_t;
crimson::os::FuturizedStore::Shard &store;
crimson::common::CephContext cct;
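+ // this shard's OSDState replica plus a shard-local map gate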
+ OSDState &osd_state;
+ OSD_OSDMapGate osdmap_gate;
+
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
ceph::mono_time startup_time,
PerfCounters *perf,
PerfCounters *recoverystate_perf,
- crimson::os::FuturizedStore &store);
+ crimson::os::FuturizedStore &store,
+ OSDState& osd_state);
};
/**
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
- OSDState osd_state;
-
SharedLRU<epoch_t, OSDMap> osdmaps;
SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
void update_map(cached_map_t new_osdmap) {
osdmap = std::move(new_osdmap);
}
- OSD_OSDMapGate osdmap_gate;
crimson::net::Messenger &cluster_msgr;
crimson::net::Messenger &public_msgr;
*/
class ShardServices : public OSDMapService {
friend class PGShardManager;
+ friend class OSD;
using cached_map_t = OSDMapService::cached_map_t;
using local_cached_map_t = OSDMapService::local_cached_map_t;
class OSDMap;
-class OSDState {
+namespace crimson::osd {
+
+// seastar::sharded puts start_single on core 0
+constexpr core_id_t PRIMARY_CORE = 0;
+
+/**
+ * OSDState
+ *
+ * Maintains state representing the OSD's progress from booting through
+ * shutdown.
+ *
+ * Shards other than PRIMARY_CORE may use their local instance to check
+ * for the ACTIVE and STOPPING states. All other methods may only be
+ * invoked on PRIMARY_CORE (each begins with an assert to that effect).
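+ *
+ * A sketch of the intended call pattern (assuming the containing
+ * seastar::sharded<OSDState> osd_states declared in osd.h):
+ *
+ *   // PRIMARY_CORE only: broadcasts the transition to every shard
+ *   return osd_states.local().set_active();
+ *
+ *   // any shard: cheap, synchronous local read
+ *   if (osd_states.local().is_stopping()) { ... }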
+ */
+class OSDState : public seastar::peering_sharded_service<OSDState> {
enum class State {
INITIALIZING,
State state = State::INITIALIZING;
mutable seastar::shared_promise<> wait_for_active;
+ /// Sets local instance state to active, called from set_active
+ void _set_active() {
+ state = State::ACTIVE;
+ wait_for_active.set_value();
+ wait_for_active = {};
+ }
+ /// Sets local instance state to stopping, called from set_stopping
+ void _set_stopping() {
+ state = State::STOPPING;
+ wait_for_active.set_exception(crimson::common::system_shutdown_exception{});
+ wait_for_active = {};
+ }
public:
bool is_initializing() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return state == State::INITIALIZING;
}
bool is_preboot() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return state == State::PREBOOT;
}
bool is_booting() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return state == State::BOOTING;
}
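+ /// May be called from any shard (see class comment).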
bool is_active() const {
return state == State::ACTIVE;
}
seastar::future<> when_active() const {
return is_active() ? seastar::now()
: wait_for_active.get_shared_future();
};
bool is_prestop() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return state == State::PRESTOP;
}
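+ /// May be called from any shard (see class comment).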
bool is_stopping() const {
return state == State::STOPPING;
}
bool is_waiting_for_healthy() const {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
return state == State::WAITING_FOR_HEALTHY;
}
void set_preboot() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
state = State::PREBOOT;
}
void set_booting() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
state = State::BOOTING;
}
- void set_active() {
- state = State::ACTIVE;
- wait_for_active.set_value();
- wait_for_active = {};
+ /// Sets all shards to active
+ seastar::future<> set_active() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return container().invoke_on_all([](auto& osd_state) {
+ osd_state._set_active();
+ });
}
void set_prestop() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
state = State::PRESTOP;
}
- void set_stopping() {
- state = State::STOPPING;
- wait_for_active.set_exception(crimson::common::system_shutdown_exception{});
- wait_for_active = {};
+ /// Sets all shards to stopping
+ seastar::future<> set_stopping() {
+ ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+ return container().invoke_on_all([](auto& osd_state) {
+ osd_state._set_stopping();
+ });
}
std::string_view to_string() const {
switch (state) {
operator<<(std::ostream& os, const OSDState& s) {
return os << s.to_string();
}
+} // namespace crimson::osd