pg_meta.cc
replicated_backend.cc
shard_services.cc
+ pg_shard_manager.cc
object_context.cc
ops_executer.cc
osd_operation.cc
monc{new crimson::mon::Client{*public_msgr, *this}},
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{store},
- shard_services{*this, whoami, *cluster_msgr, *public_msgr, *monc, *mgrc, store},
+ pg_shard_manager{
+ static_cast<OSDMapService&>(*this), whoami, *cluster_msgr,
+ *public_msgr, *monc, *mgrc, store},
+ shard_services{pg_shard_manager.get_shard_services()},
heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
// do this in background
tick_timer{[this] {
superblock = std::move(sb);
return get_map(superblock.current_epoch);
}).then([this](cached_map_t&& map) {
- shard_services.update_map(map);
osdmap_gate.got_map(map->get_epoch());
+ pg_shard_manager.update_map(map);
osdmap = std::move(map);
bind_epoch = osdmap->get_epoch();
return load_pgs();
asok->register_command(make_asok_hook<pg::QueryCommand>(*this));
asok->register_command(make_asok_hook<pg::MarkUnfoundLostCommand>(*this));
// ops commands
- asok->register_command(make_asok_hook<DumpInFlightOpsHook>(
- std::as_const(get_shard_services().registry)));
- asok->register_command(make_asok_hook<DumpHistoricOpsHook>(
- std::as_const(get_shard_services().registry)));
- asok->register_command(make_asok_hook<DumpSlowestHistoricOpsHook>(
- std::as_const(get_shard_services().registry)));
+ asok->register_command(
+ make_asok_hook<DumpInFlightOpsHook>(
+ std::as_const(get_shard_services().get_registry())));
+ asok->register_command(
+ make_asok_hook<DumpHistoricOpsHook>(
+ std::as_const(get_shard_services().get_registry())));
+ asok->register_command(
+ make_asok_hook<DumpSlowestHistoricOpsHook>(
+ std::as_const(get_shard_services().get_registry())));
});
}
return asok->stop().then([this] {
return heartbeat->stop();
}).then([this] {
- return shard_services.stop();
+ return pg_shard_manager.stop_registries();
}).then([this] {
return store.umount();
}).then([this] {
[this](epoch_t cur) {
return get_map(cur).then([this](cached_map_t&& o) {
osdmap = std::move(o);
- shard_services.update_map(osdmap);
+ pg_shard_manager.update_map(osdmap);
if (up_epoch == 0 &&
osdmap->is_up(whoami) &&
osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
#include "crimson/mgr/client.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/osd/osdmap_service.h"
-#include "crimson/osd/state.h"
-#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_shard_manager.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/pg_map.h"
#include "crimson/osd/osd_operations/peering_event.h"
void handle_authentication(const EntityName& name,
const AuthCapsInfo& caps) final;
- crimson::osd::ShardServices shard_services;
+ crimson::osd::PGShardManager pg_shard_manager;
+ crimson::osd::ShardServices &shard_services;
std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> tick_timer;
OSD_OSDMapGate osdmap_gate;
ShardServices &get_shard_services() {
- return shard_services;
+ return pg_shard_manager.get_shard_services();
}
seastar::future<> consume_map(epoch_t epoch);
template <typename T, typename... Args>
auto start_pg_operation(Args&&... args) {
- auto op = shard_services.registry.create_operation<T>(
+ auto op = shard_services.get_registry().create_operation<T>(
std::forward<Args>(args)...);
auto &logger = crimson::get_logger(ceph_subsys_osd);
logger.debug("{}: starting {}", *op, __func__);
void handle(ClientRequest::CompletionEvent&, const Operation& op) override {
if (crimson::common::local_conf()->osd_op_history_size) {
- const auto& client_op = to_client_request(op);
- client_op.osd.get_shard_services().registry.put_historic(client_op);
+ to_client_request(op).put_historic();
}
}
};
return maybe_delay.then([ref, this] {
return this->template with_blocking_event<OperationThrottler::BlockingEvent>(
[ref, this] (auto&& trigger) {
- return ss.throttler.with_throttle_while(
+ return ss.with_throttle_while(
std::move(trigger),
this, get_scheduler_params(), [this] {
return T::interruptor::with_interruption([this] {
return true;
}
+void ClientRequest::put_historic() const
+{
+ osd.get_shard_services().get_registry().put_historic(*this);
+}
+
}
auto get_completed() const {
return get_event<CompletionEvent>().get_timestamp();
};
+
+ void put_historic() const;
};
}
PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
{
auto [obc, existed] =
- shard_services.obc_registry.get_cached_obc(std::move(oid));
+ shard_services.get_cached_obc(std::move(oid));
return with_head_obc<State>(std::move(obc), existed, std::move(func));
}
logger().error("with_clone_obc: {} clone not found", coid);
return load_obc_ertr::make_ready_future<>();
}
- auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+ auto [clone, existed] = shard_services.get_cached_obc(*coid);
return clone->template with_lock<State>(
[coid=*coid, existed=existed,
head=std::move(head), clone=std::move(clone),
unsigned priority,
PGPeeringEventURef on_grant,
PGPeeringEventURef on_preempt) final {
- shard_services.local_reserver.request_reservation(
+ shard_services.local_request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
void update_local_background_io_priority(
unsigned priority) final {
- shard_services.local_reserver.update_priority(
+ shard_services.local_update_priority(
pgid,
priority);
}
void cancel_local_background_io_reservation() final {
- shard_services.local_reserver.cancel_reservation(
+ shard_services.local_cancel_reservation(
pgid);
}
unsigned priority,
PGPeeringEventURef on_grant,
PGPeeringEventURef on_preempt) final {
- shard_services.remote_reserver.request_reservation(
+ shard_services.remote_request_reservation(
pgid,
on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
start_peering_event_operation(std::move(*on_grant));
}
void cancel_remote_recovery_reservation() final {
- shard_services.remote_reserver.cancel_reservation(
+ shard_services.remote_cancel_reservation(
pgid);
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/pg_shard_manager.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+PGShardManager::PGShardManager(
+ OSDMapService &osdmap_service,
+ const int whoami,
+ crimson::net::Messenger &cluster_msgr,
+ crimson::net::Messenger &public_msgr,
+ crimson::mon::Client &monc,
+ crimson::mgr::Client &mgrc,
+ crimson::os::FuturizedStore &store)
+ : core_state(whoami, osdmap_service, cluster_msgr, public_msgr,
+ monc, mgrc, store),
+ local_state(whoami),
+ shard_services(core_state, local_state)
+{}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
+
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_map.h"
+
+namespace crimson::osd {
+
+/**
+ * PGShardManager
+ *
+ * Manages all state required to partition PGs over seastar reactors
+ * as well as state required to route messages to PGs. Mediates access to
+ * shared resources required by PGs (objectstore, messenger, monclient,
+ * etc.).
+ */
+class PGShardManager {
+ CoreState core_state;
+ PerShardState local_state;
+ ShardServices shard_services;
+
+public:
+ PGShardManager(
+ OSDMapService &osdmap_service,
+ const int whoami,
+ crimson::net::Messenger &cluster_msgr,
+ crimson::net::Messenger &public_msgr,
+ crimson::mon::Client &monc,
+ crimson::mgr::Client &mgrc,
+ crimson::os::FuturizedStore &store);
+
+ auto &get_shard_services() { return shard_services; }
+
+ void update_map(OSDMapService::cached_map_t map) {
+ core_state.update_map(map);
+ local_state.update_map(map);
+ }
+
+ auto stop_registries() {
+ return local_state.stop_registry();
+ }
+
+ FORWARD_TO_CORE(send_pg_created)
+};
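+
+// Example wiring (a sketch of the OSD usage shown earlier in this change):
+//
+//   PGShardManager pg_shard_manager{osdmap_service, whoami, cluster_msgr,
+//                                   public_msgr, monc, mgrc, store};
+//   ShardServices &shard_services = pg_shard_manager.get_shard_services();
+//   ...
+//   pg_shard_manager.update_map(map);      // on each new osdmap
+//   pg_shard_manager.stop_registries();    // during shutdown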
+
+}
-> interruptible_future<> {
crimson::osd::ObjectContextRef obc;
if (pg.is_primary()) {
- obc = shard_services.obc_registry.maybe_get_cached_obc(object);
+ obc = shard_services.maybe_get_cached_obc(object);
}
if (obc) {
if (obc->obs.exists) {
namespace crimson::osd {
-ShardServices::ShardServices(
- OSDMapService &osdmap_service,
- const int whoami,
- crimson::net::Messenger &cluster_msgr,
- crimson::net::Messenger &public_msgr,
- crimson::mon::Client &monc,
- crimson::mgr::Client &mgrc,
- crimson::os::FuturizedStore &store)
- : osdmap_service(osdmap_service),
- whoami(whoami),
- cluster_msgr(cluster_msgr),
- public_msgr(public_msgr),
- monc(monc),
- mgrc(mgrc),
- store(store),
- throttler(crimson::common::local_conf()),
- obc_registry(crimson::common::local_conf()),
- local_reserver(
- &cct,
- &finisher,
- crimson::common::local_conf()->osd_max_backfills,
- crimson::common::local_conf()->osd_min_recovery_priority),
- remote_reserver(
- &cct,
- &finisher,
- crimson::common::local_conf()->osd_max_backfills,
- crimson::common::local_conf()->osd_min_recovery_priority)
+PerShardState::PerShardState(
+ int whoami)
+ : whoami(whoami),
+ throttler(crimson::common::local_conf()),
+ obc_registry(crimson::common::local_conf())
{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
recoverystate_perf = build_recoverystate_perf(&cct);
cct.get_perfcounters_collection()->add(recoverystate_perf);
-
- crimson::common::local_conf().add_observer(this);
-}
-
-const char** ShardServices::get_tracked_conf_keys() const
-{
- static const char* KEYS[] = {
- "osd_max_backfills",
- "osd_min_recovery_priority",
- nullptr
- };
- return KEYS;
}
-void ShardServices::handle_conf_change(const ConfigProxy& conf,
- const std::set <std::string> &changed)
+CoreState::CoreState(
+ int whoami,
+ OSDMapService &osdmap_service,
+ crimson::net::Messenger &cluster_msgr,
+ crimson::net::Messenger &public_msgr,
+ crimson::mon::Client &monc,
+ crimson::mgr::Client &mgrc,
+ crimson::os::FuturizedStore &store)
+ : whoami(whoami),
+ osdmap_service(osdmap_service),
+ cluster_msgr(cluster_msgr),
+ public_msgr(public_msgr),
+ monc(monc),
+ mgrc(mgrc),
+ store(store),
+ local_reserver(
+ &cct,
+ &finisher,
+ crimson::common::local_conf()->osd_max_backfills,
+ crimson::common::local_conf()->osd_min_recovery_priority),
+ remote_reserver(
+ &cct,
+ &finisher,
+ crimson::common::local_conf()->osd_max_backfills,
+ crimson::common::local_conf()->osd_min_recovery_priority)
{
- if (changed.count("osd_max_backfills")) {
- local_reserver.set_max(conf->osd_max_backfills);
- remote_reserver.set_max(conf->osd_max_backfills);
- }
- if (changed.count("osd_min_recovery_priority")) {
- local_reserver.set_min_priority(conf->osd_min_recovery_priority);
- remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
- }
+ crimson::common::local_conf().add_observer(this);
}
-seastar::future<> ShardServices::send_to_osd(
+seastar::future<> CoreState::send_to_osd(
int peer, MessageURef m, epoch_t from_epoch)
{
if (osdmap->is_down(peer)) {
}
}
-seastar::future<> ShardServices::dispatch_context_transaction(
- crimson::os::CollectionRef col, PeeringCtx &ctx) {
- if (ctx.transaction.empty()) {
- logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
- return seastar::now();
- }
-
- logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
- auto ret = store.do_transaction(
- col,
- std::move(ctx.transaction));
- ctx.reset_transaction();
- return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context_messages(
- BufferedRecoveryMessages &&ctx)
-{
- auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
- [this](auto& osd_messages) {
- auto& [peer, messages] = osd_messages;
- logger().debug("dispatch_context_messages sending messages to {}", peer);
- return seastar::parallel_for_each(
- std::move(messages), [=, peer=peer](auto& m) {
- return send_to_osd(peer, std::move(m), osdmap->get_epoch());
- });
- });
- ctx.message_map.clear();
- return ret;
-}
-
-seastar::future<> ShardServices::dispatch_context(
- crimson::os::CollectionRef col,
- PeeringCtx &&ctx)
+seastar::future<> CoreState::osdmap_subscribe(version_t epoch, bool force_request)
{
- ceph_assert(col || ctx.transaction.empty());
- return seastar::when_all_succeed(
- dispatch_context_messages(
- BufferedRecoveryMessages{ctx}),
- col ? dispatch_context_transaction(col, ctx) : seastar::now()
- ).then_unpack([] {
+ logger().info("{}({})", __func__, epoch);
+ if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+ force_request) {
+ return monc.renew_subs();
+ } else {
return seastar::now();
- });
+ }
}
-void ShardServices::queue_want_pg_temp(pg_t pgid,
- const vector<int>& want,
- bool forced)
+void CoreState::queue_want_pg_temp(pg_t pgid,
+ const vector<int>& want,
+ bool forced)
{
auto p = pg_temp_pending.find(pgid);
if (p == pg_temp_pending.end() ||
}
}
-void ShardServices::remove_want_pg_temp(pg_t pgid)
+void CoreState::remove_want_pg_temp(pg_t pgid)
{
pg_temp_wanted.erase(pgid);
pg_temp_pending.erase(pgid);
}
-void ShardServices::requeue_pg_temp()
+void CoreState::requeue_pg_temp()
{
unsigned old_wanted = pg_temp_wanted.size();
unsigned old_pending = pg_temp_pending.size();
pg_temp_wanted.size());
}
-std::ostream& operator<<(
- std::ostream& out,
- const ShardServices::pg_temp_t& pg_temp)
-{
- out << pg_temp.acting;
- if (pg_temp.forced) {
- out << " (forced)";
- }
- return out;
-}
-
-seastar::future<> ShardServices::send_pg_temp()
+seastar::future<> CoreState::send_pg_temp()
{
if (pg_temp_wanted.empty())
return seastar::now();
});
}
-void ShardServices::update_map(cached_map_t new_osdmap)
-{
- osdmap = std::move(new_osdmap);
-}
-
-ShardServices::cached_map_t &ShardServices::get_osdmap()
+std::ostream& operator<<(
+ std::ostream& out,
+ const CoreState::pg_temp_t& pg_temp)
{
- return osdmap;
+ out << pg_temp.acting;
+ if (pg_temp.forced) {
+ out << " (forced)";
+ }
+ return out;
}
-seastar::future<> ShardServices::send_pg_created(pg_t pgid)
+seastar::future<> CoreState::send_pg_created(pg_t pgid)
{
logger().debug(__func__);
auto o = get_osdmap();
return monc.send_message(crimson::make_message<MOSDPGCreated>(pgid));
}
-seastar::future<> ShardServices::send_pg_created()
+seastar::future<> CoreState::send_pg_created()
{
logger().debug(__func__);
auto o = get_osdmap();
});
}
-void ShardServices::prune_pg_created()
+void CoreState::prune_pg_created()
{
logger().debug(__func__);
auto o = get_osdmap();
}
}
-seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request)
-{
- logger().info("{}({})", __func__, epoch);
- if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
- force_request) {
- return monc.renew_subs();
- } else {
- return seastar::now();
- }
-}
-
-HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
+HeartbeatStampsRef CoreState::get_hb_stamps(int peer)
{
auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
if (added) {
return stamps->second;
}
-seastar::future<> ShardServices::send_alive(const epoch_t want)
+seastar::future<> CoreState::send_alive(const epoch_t want)
{
logger().info(
"{} want={} up_thru_wanted={}",
}
}
+const char** CoreState::get_tracked_conf_keys() const
+{
+ static const char* KEYS[] = {
+ "osd_max_backfills",
+ "osd_min_recovery_priority",
+ nullptr
+ };
+ return KEYS;
+}
+
+void CoreState::handle_conf_change(const ConfigProxy& conf,
+ const std::set <std::string> &changed)
+{
+ if (changed.count("osd_max_backfills")) {
+ local_reserver.set_max(conf->osd_max_backfills);
+ remote_reserver.set_max(conf->osd_max_backfills);
+ }
+ if (changed.count("osd_min_recovery_priority")) {
+ local_reserver.set_min_priority(conf->osd_min_recovery_priority);
+ remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
+ }
+}
+
+seastar::future<> ShardServices::dispatch_context_transaction(
+ crimson::os::CollectionRef col, PeeringCtx &ctx) {
+ if (ctx.transaction.empty()) {
+ logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
+ return seastar::now();
+ }
+
+ logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
+ auto ret = get_store().do_transaction(
+ col,
+ std::move(ctx.transaction));
+ ctx.reset_transaction();
+ return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context_messages(
+ BufferedRecoveryMessages &&ctx)
+{
+ auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+ [this](auto& osd_messages) {
+ auto& [peer, messages] = osd_messages;
+ logger().debug("dispatch_context_messages sending messages to {}", peer);
+ return seastar::parallel_for_each(
+ std::move(messages), [=, peer=peer](auto& m) {
+ return send_to_osd(peer, std::move(m), local_state.osdmap->get_epoch());
+ });
+ });
+ ctx.message_map.clear();
+ return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context(
+ crimson::os::CollectionRef col,
+ PeeringCtx &&ctx)
+{
+ ceph_assert(col || ctx.transaction.empty());
+ return seastar::when_all_succeed(
+ dispatch_context_messages(
+ BufferedRecoveryMessages{ctx}),
+ col ? dispatch_context_transaction(col, ctx) : seastar::now()
+ ).then_unpack([] {
+ return seastar::now();
+ });
+}
+
+
};
#include "crimson/os/futurized_collection.h"
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/object_context.h"
+#include "crimson/osd/state.h"
#include "common/AsyncReserver.h"
namespace crimson::net {
namespace crimson::osd {
/**
- * Represents services available to each PG
+ * PerShardState
+ *
+ * State local to a single shard (seastar reactor).
*/
-class ShardServices : public md_config_obs_t {
- using cached_map_t = boost::local_shared_ptr<const OSDMap>;
- OSDMapService &osdmap_service;
- const int whoami;
- crimson::net::Messenger &cluster_msgr;
- crimson::net::Messenger &public_msgr;
- crimson::mon::Client &monc;
- crimson::mgr::Client &mgrc;
- crimson::os::FuturizedStore &store;
+class PerShardState {
+ friend class ShardServices;
+ friend class PGShardManager;
+ const int whoami;
crimson::common::CephContext cct;
PerfCounters *perf = nullptr;
PerfCounters *recoverystate_perf = nullptr;
- const char** get_tracked_conf_keys() const final;
- void handle_conf_change(const ConfigProxy& conf,
- const std::set <std::string> &changed) final;
-
-public:
- ShardServices(
- OSDMapService &osdmap_service,
- const int whoami,
- crimson::net::Messenger &cluster_msgr,
- crimson::net::Messenger &public_msgr,
- crimson::mon::Client &monc,
- crimson::mgr::Client &mgrc,
- crimson::os::FuturizedStore &store);
-
- seastar::future<> send_to_osd(
- int peer,
- MessageURef m,
- epoch_t from_epoch);
+ // Op Management
+ OSDOperationRegistry registry;
+ OperationThrottler throttler;
- crimson::os::FuturizedStore &get_store() {
- return store;
+ OSDMapService::cached_map_t osdmap;
+ OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
+ void update_map(OSDMapService::cached_map_t new_osdmap) {
+ osdmap = std::move(new_osdmap);
}
- crimson::common::CephContext *get_cct() {
- return &cct;
- }
+ crimson::osd::ObjectContextRegistry obc_registry;
- // OSDMapService
- const OSDMapService &get_osdmap_service() const {
- return osdmap_service;
+ // Prevent creating new osd operations while the system is shutting down.
+ // Otherwise a new operation could be created after all ongoing operations
+ // have been interrupted, and would then wait on a future that may never
+ // resolve, in which case the shutdown would never complete.
+ bool stopping = false;
+ seastar::future<> stop_registry() {
+ crimson::get_logger(ceph_subsys_osd).info("PerShardState::{}", __func__);
+ stopping = true;
+ return registry.stop();
}
- // Op Management
- OSDOperationRegistry registry;
- OperationThrottler throttler;
-
template <typename T, typename... Args>
auto start_operation(Args&&... args) {
if (__builtin_expect(stopping, false)) {
return std::make_pair(std::move(op), std::move(fut));
}
- seastar::future<> stop() {
- crimson::get_logger(ceph_subsys_osd).info("ShardServices::{}", __func__);
- stopping = true;
- return registry.stop();
- }
+ PerShardState(int whoami);
+};
- // Loggers
- PerfCounters &get_recoverystate_perf_logger() {
- return *recoverystate_perf;
- }
- PerfCounters &get_perf_logger() {
- return *perf;
- }
+/**
+ * CoreState
+ *
+ * OSD-wide singleton holding instances that need to be accessible
+ * from all PGs.
+ */
+class CoreState : public md_config_obs_t {
+ friend class ShardServices;
+ friend class PGShardManager;
+ CoreState(
+ int whoami,
+ OSDMapService &osdmap_service,
+ crimson::net::Messenger &cluster_msgr,
+ crimson::net::Messenger &public_msgr,
+ crimson::mon::Client &monc,
+ crimson::mgr::Client &mgrc,
+ crimson::os::FuturizedStore &store);
- /// Dispatch and reset ctx transaction
- seastar::future<> dispatch_context_transaction(
- crimson::os::CollectionRef col, PeeringCtx &ctx);
+ const int whoami;
- /// Dispatch and reset ctx messages
- seastar::future<> dispatch_context_messages(
- BufferedRecoveryMessages &&ctx);
+ crimson::common::CephContext cct;
- /// Dispatch ctx and dispose of context
- seastar::future<> dispatch_context(
- crimson::os::CollectionRef col,
- PeeringCtx &&ctx);
- /// Dispatch ctx and dispose of ctx, transaction must be empty
- seastar::future<> dispatch_context(
- PeeringCtx &&ctx) {
- return dispatch_context({}, std::move(ctx));
+ OSDMapService &osdmap_service;
+ OSDMapService::cached_map_t osdmap;
+ OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
+ void update_map(OSDMapService::cached_map_t new_osdmap) {
+ osdmap = std::move(new_osdmap);
}
- // -- tids --
- // for ops i issue
+ crimson::net::Messenger &cluster_msgr;
+ crimson::net::Messenger &public_msgr;
+
+ seastar::future<> send_to_osd(int peer, MessageURef m, epoch_t from_epoch);
+
+ crimson::mon::Client &monc;
+ seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+
+ crimson::mgr::Client &mgrc;
+
+ crimson::os::FuturizedStore &store;
+
+ // tids for ops i issue
unsigned int next_tid{0};
ceph_tid_t get_tid() {
return (ceph_tid_t)next_tid++;
}
- // PG Temp State
-private:
- // TODO: hook into map processing and some kind of heartbeat/peering
- // message processing
+ // global pg temp state
struct pg_temp_t {
std::vector<int> acting;
bool forced = false;
std::map<pg_t, pg_temp_t> pg_temp_wanted;
std::map<pg_t, pg_temp_t> pg_temp_pending;
friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
-public:
+
void queue_want_pg_temp(pg_t pgid, const std::vector<int>& want,
bool forced = false);
void remove_want_pg_temp(pg_t pgid);
void requeue_pg_temp();
seastar::future<> send_pg_temp();
- // Shard-local OSDMap
-private:
- cached_map_t osdmap;
-public:
- void update_map(cached_map_t new_osdmap);
- cached_map_t &get_osdmap();
-
- // PG Created State
-private:
- std::set<pg_t> pg_created;
-public:
- seastar::future<> send_pg_created(pg_t pgid);
- seastar::future<> send_pg_created();
- void prune_pg_created();
-
+ unsigned num_pgs = 0;
unsigned get_pg_num() const {
return num_pgs;
}
--num_pgs;
}
- seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+ std::set<pg_t> pg_created;
+ seastar::future<> send_pg_created(pg_t pgid);
+ seastar::future<> send_pg_created();
+ void prune_pg_created();
// Time state
ceph::mono_time startup_time = ceph::mono_clock::now();
ceph::signedspan get_mnow() const {
return ceph::mono_clock::now() - startup_time;
}
+
HeartbeatStampsRef get_hb_stamps(int peer);
std::map<int, HeartbeatStampsRef> heartbeat_stamps;
- crimson::osd::ObjectContextRegistry obc_registry;
-
- // Async Reservers
-private:
- unsigned num_pgs = 0;
-
struct DirectFinisher {
void queue(Context *c) {
c->complete(0);
}
} finisher;
- // prevent creating new osd operations when system is shutting down,
- // this is necessary because there are chances that a new operation
- // is created, after the interruption of all ongoing operations, and
- // creats and waits on a new and may-never-resolve future, in which
- // case the shutdown may never succeed.
- bool stopping = false;
-public:
AsyncReserver<spg_t, DirectFinisher> local_reserver;
AsyncReserver<spg_t, DirectFinisher> remote_reserver;
-private:
epoch_t up_thru_wanted = 0;
-public:
seastar::future<> send_alive(epoch_t want);
+
+ const char** get_tracked_conf_keys() const final;
+ void handle_conf_change(
+ const ConfigProxy& conf,
+ const std::set <std::string> &changed) final;
+};
+
+#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
+ template <typename... Args> \
+ auto FROM_METHOD(Args&&... args) const { \
+ return TARGET.TO_METHOD(std::forward<Args>(args)...); \
+ }
+
+#define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \
+ template <typename... Args> \
+ auto FROM_METHOD(Args&&... args) { \
+ return TARGET.TO_METHOD(std::forward<Args>(args)...); \
+ }
+
+#define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
+#define FORWARD_TO_CORE(METHOD) FORWARD(METHOD, METHOD, core_state)
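+
+// For illustration: FORWARD_TO_CORE(send_pg_created) expands to roughly
+//
+//   template <typename... Args>
+//   auto send_pg_created(Args&&... args) {
+//     return core_state.send_pg_created(std::forward<Args>(args)...);
+//   }
+//
+// i.e. a perfect-forwarding shim from ShardServices to the wrapped state.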
+
+/**
+ * Represents services available to each PG
+ */
+class ShardServices {
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+ CoreState &core_state;
+ PerShardState &local_state;
+public:
+ ShardServices(
+ CoreState &core_state,
+ PerShardState &local_state)
+ : core_state(core_state), local_state(local_state) {}
+
+ FORWARD_TO_CORE(send_to_osd)
+
+ crimson::os::FuturizedStore &get_store() {
+ return core_state.store;
+ }
+
+ crimson::common::CephContext *get_cct() {
+ return &(local_state.cct);
+ }
+
+ // OSDMapService
+ const OSDMapService &get_osdmap_service() const {
+ return core_state.osdmap_service;
+ }
+
+ template <typename T, typename... Args>
+ auto start_operation(Args&&... args) {
+ return local_state.start_operation<T>(std::forward<Args>(args)...);
+ }
+
+ auto &get_registry() { return local_state.registry; }
+
+ // Loggers
+ PerfCounters &get_recoverystate_perf_logger() {
+ return *local_state.recoverystate_perf;
+ }
+ PerfCounters &get_perf_logger() {
+ return *local_state.perf;
+ }
+
+ /// Dispatch and reset ctx transaction
+ seastar::future<> dispatch_context_transaction(
+ crimson::os::CollectionRef col, PeeringCtx &ctx);
+
+ /// Dispatch and reset ctx messages
+ seastar::future<> dispatch_context_messages(
+ BufferedRecoveryMessages &&ctx);
+
+ /// Dispatch ctx and dispose of context
+ seastar::future<> dispatch_context(
+ crimson::os::CollectionRef col,
+ PeeringCtx &&ctx);
+
+ /// Dispatch ctx and dispose of ctx, transaction must be empty
+ seastar::future<> dispatch_context(
+ PeeringCtx &&ctx) {
+ return dispatch_context({}, std::move(ctx));
+ }
+
+ FORWARD_TO_LOCAL(get_osdmap)
+ FORWARD_TO_CORE(get_pg_num)
+ FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+
+ FORWARD_TO_CORE(osdmap_subscribe)
+ FORWARD_TO_CORE(get_tid)
+ FORWARD_TO_CORE(queue_want_pg_temp)
+ FORWARD_TO_CORE(remove_want_pg_temp)
+ FORWARD_TO_CORE(requeue_pg_temp)
+ FORWARD_TO_CORE(send_pg_created)
+ FORWARD_TO_CORE(inc_pg_num)
+ FORWARD_TO_CORE(dec_pg_num)
+ FORWARD_TO_CORE(send_alive)
+ FORWARD_TO_CORE(send_pg_temp)
+ FORWARD_CONST(get_mnow, get_mnow, core_state)
+ FORWARD_TO_CORE(get_hb_stamps)
+
+ FORWARD(
+ maybe_get_cached_obc, maybe_get_cached_obc, local_state.obc_registry)
+ FORWARD(
+ get_cached_obc, get_cached_obc, local_state.obc_registry)
+
+ FORWARD(
+ local_request_reservation, request_reservation, core_state.local_reserver)
+ FORWARD(
+ local_update_priority, update_priority, core_state.local_reserver)
+ FORWARD(
+ local_cancel_reservation, cancel_reservation, core_state.local_reserver)
+ FORWARD(
+ remote_request_reservation, request_reservation, core_state.remote_reserver)
+ FORWARD(
+ remote_cancel_reservation, cancel_reservation, core_state.remote_reserver)
};
}