return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
case MSG_OSD_PG_CREATE2:
shard_services.start_operation<CompoundPeeringRequest>(
- *this,
+ pg_shard_manager,
conn,
m);
return seastar::now();
seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
Ref<MOSDOp> m)
{
- (void) start_pg_operation<ClientRequest>(
+ (void) pg_shard_manager.start_pg_operation<ClientRequest>(
*this,
conn,
std::move(m));
Ref<MOSDPGUpdateLogMissing> m)
{
m->decode_payload();
- (void) start_pg_operation<LogMissingRequest>(
+ (void) pg_shard_manager.start_pg_operation<LogMissingRequest>(
std::move(conn),
std::move(m));
return seastar::now();
Ref<MOSDPGUpdateLogMissingReply> m)
{
m->decode_payload();
- (void) start_pg_operation<LogMissingRequestReply>(
+ (void) pg_shard_manager.start_pg_operation<LogMissingRequestReply>(
std::move(conn),
std::move(m));
return seastar::now();
Ref<MOSDRepOp> m)
{
m->finish_decode();
- std::ignore = start_pg_operation<RepRequest>(
+ std::ignore = pg_shard_manager.start_pg_operation<RepRequest>(
std::move(conn),
std::move(m));
return seastar::now();
pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
pgid.shard};
PeeringState::RequestScrub scrub_request{m->deep, m->repair};
- return start_pg_operation<RemotePeeringEvent>(
+ return pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
conn,
from_shard,
pgid,
seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
Ref<MOSDFastDispatchOp> m)
{
- std::ignore = start_pg_operation<RecoverySubRequest>(conn, std::move(m));
+ std::ignore = pg_shard_manager.start_pg_operation<RecoverySubRequest>(
+ conn, std::move(m));
return seastar::now();
}
const int from = m->get_source().num();
logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
std::unique_ptr<PGPeeringEvent> evt(m->get_event());
- (void) start_pg_operation<RemotePeeringEvent>(
+ (void) pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
conn,
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
public:
seastar::future<> send_beacon();
- template <typename T, typename... Args>
- auto start_pg_operation(Args&&... args) {
- auto op = shard_services.get_registry().create_operation<T>(
- std::forward<Args>(args)...);
- auto &logger = crimson::get_logger(ceph_subsys_osd);
- logger.debug("{}: starting {}", *op, __func__);
- auto &opref = *op;
-
- auto fut = opref.template enter_stage<>(
- opref.get_connection_pipeline().await_active
- ).then([this, &opref, &logger] {
- logger.debug("{}: start_pg_operation in await_active stage", opref);
- return pg_shard_manager.when_active();
- }).then([&logger, &opref] {
- logger.debug("{}: start_pg_operation active, entering await_map", opref);
- return opref.template enter_stage<>(
- opref.get_connection_pipeline().await_map);
- }).then([this, &logger, &opref] {
- logger.debug("{}: start_pg_operation await_map stage", opref);
- using OSDMapBlockingEvent =
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
- return opref.template with_blocking_event<OSDMapBlockingEvent>(
- [this, &opref](auto &&trigger) {
- return pg_shard_manager.wait_for_map(
- std::move(trigger),
- opref.get_epoch(),
- &shard_services
- );
- });
- }).then([&logger, &opref](auto epoch) {
- logger.debug("{}: got map {}, entering get_pg", opref, epoch);
- return opref.template enter_stage<>(
- opref.get_connection_pipeline().get_pg);
- }).then([this, &logger, &opref] {
- logger.debug("{}: in get_pg", opref);
- if constexpr (T::can_create()) {
- logger.debug("{}: can_create", opref);
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([this, &opref](auto &&trigger) {
- std::ignore = this; // avoid clang warning
- return pg_shard_manager.get_or_create_pg(
- pg_shard_manager,
- pg_shard_manager.get_shard_services(),
- std::move(trigger),
- opref.get_pgid(), opref.get_epoch(),
- std::move(opref.get_create_info()));
- });
- } else {
- logger.debug("{}: !can_create", opref);
- return opref.template with_blocking_event<
- PGMap::PGCreationBlockingEvent
- >([this, &opref](auto &&trigger) {
- std::ignore = this; // avoid clang warning
- return pg_shard_manager.wait_for_pg(
- std::move(trigger), opref.get_pgid());
- });
- }
- }).then([this, &logger, &opref](Ref<PG> pgref) {
- logger.debug("{}: have_pg", opref);
- return opref.with_pg(shard_services, pgref);
- }).then([op] { /* Retain refcount on op until completion */ });
-
- return std::make_pair(std::move(op), std::move(fut));
- }
-
-
private:
LogClient log_client;
LogChannelRef clog;
template <class OpT>
friend class crimson::os::seastore::OperationProxyT;
- // OSD::start_pg_operation needs access to enter_stage, we can make this
+ // PGShardManager::start_pg_operation needs access to enter_stage, we can make this
// more sophisticated later on
- friend class OSD;
+ friend class PGShardManager;
};
/**
#include "crimson/common/exception.h"
#include "crimson/osd/pg.h"
-#include "crimson/osd/osd.h"
+#include "crimson/osd/pg_shard_manager.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
};
std::vector<crimson::OperationRef> handle_pg_create(
- OSD &osd,
+ PGShardManager &pg_shard_manager,
crimson::net::ConnectionRef conn,
compound_state_ref state,
Ref<MOSDPGCreate2> m)
pgid, m->epoch,
pi, history);
} else {
- auto op = osd.start_pg_operation<PeeringSubEvent>(
+ auto op = pg_shard_manager.start_pg_operation<PeeringSubEvent>(
state,
conn,
pg_shard_t(),
namespace crimson::osd {
CompoundPeeringRequest::CompoundPeeringRequest(
- OSD &osd,
+ PGShardManager &pg_shard_manager,
crimson::net::ConnectionRef conn, Ref<Message> m)
- : osd(osd),
+ : pg_shard_manager(pg_shard_manager),
conn(conn),
m(m)
{}
[&] {
assert((m->get_type() == MSG_OSD_PG_CREATE2));
return handle_pg_create(
- osd,
+ pg_shard_manager,
conn,
state,
boost::static_pointer_cast<MOSDPGCreate2>(m));
return trigger.maybe_record_blocking(state->promise.get_future(), *blocker);
}).then([this, blocker=std::move(blocker)](auto &&ctx) {
logger().info("{}: sub events complete", *this);
- return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+ return pg_shard_manager.get_shard_services(
+ ).dispatch_context_messages(std::move(ctx));
}).then([this, ref=std::move(ref)] {
track_event<CompletionEvent>();
logger().info("{}: complete", *this);
namespace crimson::osd {
-class OSD;
+class PGShardManager;
class PG;
using osd_id_t = int;
};
private:
- OSD &osd;
+ PGShardManager &pg_shard_manager;
crimson::net::ConnectionRef conn;
Ref<Message> m;
public:
CompoundPeeringRequest(
- OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m);
+ PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, Ref<Message> m);
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
FORWARD_TO_CORE(stop_pgs)
FORWARD_CONST(get_pg_stats, get_pg_stats, core_state)
- FORWARD_TO_CORE(get_or_create_pg)
- FORWARD_TO_CORE(wait_for_pg)
FORWARD_CONST(for_each_pg, for_each_pg, core_state)
auto get_num_pgs() const { return core_state.pg_map.get_pgs().size(); }
auto with_pg(spg_t pgid, F &&f) {
return std::invoke(std::forward<F>(f), core_state.get_pg(pgid));
}
+
+ template <typename T, typename... Args>
+ auto start_pg_operation(Args&&... args) {
+ auto op = local_state.registry.create_operation<T>(
+ std::forward<Args>(args)...);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ logger.debug("{}: starting {}", *op, __func__);
+ auto &opref = *op;
+
+ auto fut = opref.template enter_stage<>(
+ opref.get_connection_pipeline().await_active
+ ).then([this, &opref, &logger] {
+ logger.debug("{}: start_pg_operation in await_active stage", opref);
+ return core_state.osd_state.when_active();
+ }).then([&logger, &opref] {
+ logger.debug("{}: start_pg_operation active, entering await_map", opref);
+ return opref.template enter_stage<>(
+ opref.get_connection_pipeline().await_map);
+ }).then([this, &logger, &opref] {
+ logger.debug("{}: start_pg_operation await_map stage", opref);
+ using OSDMapBlockingEvent =
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
+ return opref.template with_blocking_event<OSDMapBlockingEvent>(
+ [this, &opref](auto &&trigger) {
+ std::ignore = this;
+ return core_state.osdmap_gate.wait_for_map(
+ std::move(trigger),
+ opref.get_epoch(),
+ &shard_services);
+ });
+ }).then([&logger, &opref](auto epoch) {
+ logger.debug("{}: got map {}, entering get_pg", opref, epoch);
+ return opref.template enter_stage<>(
+ opref.get_connection_pipeline().get_pg);
+ }).then([this, &logger, &opref] {
+ logger.debug("{}: in get_pg", opref);
+ if constexpr (T::can_create()) {
+ logger.debug("{}: can_create", opref);
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([this, &opref](auto &&trigger) {
+ std::ignore = this; // avoid clang warning
+ return core_state.get_or_create_pg(
+ *this,
+ shard_services,
+ std::move(trigger),
+ opref.get_pgid(), opref.get_epoch(),
+ std::move(opref.get_create_info()));
+ });
+ } else {
+ logger.debug("{}: !can_create", opref);
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([this, &opref](auto &&trigger) {
+ std::ignore = this; // avoid clang warning
+ return core_state.wait_for_pg(std::move(trigger), opref.get_pgid());
+ });
+ }
+ }).then([this, &logger, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(get_shard_services(), pgref);
+ }).then([op] { /* Retain refcount on op until completion */ });
+
+ return std::make_pair(std::move(op), std::move(fut));
+ }
};
}