osd_operation.cc
osd_operations/client_request.cc
osd_operations/client_request_common.cc
- osd_operations/compound_peering_request.cc
osd_operations/internal_client_request.cc
osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDPeeringOp.h"
+#include "messages/MOSDPGCreate2.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDRepOpReply.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/osd_operations/client_request.h"
-#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
case CEPH_MSG_OSD_OP:
return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
case MSG_OSD_PG_CREATE2:
- get_shard_services().start_operation<CompoundPeeringRequest>(
- pg_shard_manager,
- conn,
- m);
+ return handle_pg_create(
+ conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
return seastar::now();
case MSG_COMMAND:
return handle_command(conn, boost::static_pointer_cast<MCommand>(m));
return seastar::now();
}
+seastar::future<> OSD::handle_pg_create(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m)
+{
+ for (auto& [pgid, when] : m->pgs) {
+ const auto &[created, created_stamp] = when;
+ auto q = m->pg_extra.find(pgid);
+ ceph_assert(q != m->pg_extra.end());
+ auto& [history, pi] = q->second;
+ logger().debug(
+ "{}: {} e{} @{} "
+ "history {} pi {}",
+ __func__, pgid, created, created_stamp,
+ history, pi);
+ if (!pi.empty() &&
+ m->epoch < pi.get_bounds().second) {
+ logger().error(
+ "got pg_create on {} epoch {} "
+ "unmatched past_intervals {} (history {})",
+ pgid, m->epoch,
+ pi, history);
+ } else {
+ std::ignore = pg_shard_manager.start_pg_operation<RemotePeeringEvent>(
+ conn,
+ pg_shard_t(),
+ pgid,
+ m->epoch,
+ m->epoch,
+ NullEvt(),
+ true,
+ new PGCreateInfo(pgid, m->epoch, history, pi, true));
+ }
+ }
+ return seastar::now();
+}
+
seastar::future<> OSD::handle_update_log_missing(
crimson::net::ConnectionRef conn,
Ref<MOSDPGUpdateLogMissing> m)
seastar::future<> handle_osd_map(crimson::net::ConnectionRef conn,
Ref<MOSDMap> m);
+ seastar::future<> handle_pg_create(crimson::net::ConnectionRef conn,
+ Ref<MOSDPGCreate2> m);
seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
Ref<MOSDOp> m);
seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/osd_operations/client_request.h"
-#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/recovery_subrequest.h"
const Operation&) override {}
};
-struct LttngBackendCompoundPeering
- : CompoundPeeringRequest::StartEvent::Backend,
- CompoundPeeringRequest::SubOpBlocker::BlockingEvent::Backend,
- CompoundPeeringRequest::CompletionEvent::Backend
-{
- void handle(CompoundPeeringRequest::StartEvent&,
- const Operation&) override {}
-
- void handle(CompoundPeeringRequest::SubOpBlocker::BlockingEvent& ev,
- const Operation& op,
- const CompoundPeeringRequest::SubOpBlocker& blocker) override {
- }
-
- void handle(CompoundPeeringRequest::CompletionEvent&,
- const Operation&) override {}
-};
-
struct HistoricBackend
: ClientRequest::StartEvent::Backend,
ConnectionPipeline::AwaitActive::BlockingEvent::Backend,
}
};
-template <>
-struct EventBackendRegistry<osd::CompoundPeeringRequest> {
- static std::tuple<osd::LttngBackendCompoundPeering> get_backends() {
- return { {} };
- }
-};
-
template <>
struct EventBackendRegistry<osd::BackfillRecovery> {
static std::tuple<> get_backends() {
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include <seastar/core/future.hh>
-
-#include "osd/PeeringState.h"
-
-#include "messages/MOSDPGCreate2.h"
-
-#include "common/Formatter.h"
-
-#include "crimson/common/exception.h"
-#include "crimson/osd/pg.h"
-#include "crimson/osd/pg_shard_manager.h"
-#include "crimson/osd/osd_operation_external_tracking.h"
-#include "crimson/osd/osd_operations/compound_peering_request.h"
-
-namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_osd);
- }
-}
-
-namespace {
-using namespace crimson::osd;
-
-struct compound_state {
- seastar::promise<BufferedRecoveryMessages> promise;
- // assuming crimson-osd won't need to be compatible with pre-octopus
- // releases
- BufferedRecoveryMessages ctx;
- compound_state() = default;
- ~compound_state() {
- promise.set_value(std::move(ctx));
- }
-};
-using compound_state_ref = seastar::lw_shared_ptr<compound_state>;
-
-class PeeringSubEvent : public RemotePeeringEvent {
- compound_state_ref state;
-public:
- template <typename... Args>
- PeeringSubEvent(compound_state_ref state, Args &&... args) :
- RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
-
- PeeringEvent::interruptible_future<>
- complete_rctx(
- ShardServices &shard_services,
- Ref<crimson::osd::PG> pg) final {
- logger().debug("{}: submitting ctx transaction", *this);
- state->ctx.accept_buffered_messages(ctx);
- state = {};
- if (!pg) {
- ceph_assert(ctx.transaction.empty());
- return seastar::now();
- } else {
- return shard_services.dispatch_context_transaction(
- pg->get_collection_ref(), ctx);
- }
- }
-};
-
-std::vector<crimson::Operation::id_t> handle_pg_create(
- PGShardManager &pg_shard_manager,
- crimson::net::ConnectionRef conn,
- compound_state_ref state,
- Ref<MOSDPGCreate2> m)
-{
- std::vector<crimson::Operation::id_t> ret;
- for (auto& [pgid, when] : m->pgs) {
- const auto &[created, created_stamp] = when;
- auto q = m->pg_extra.find(pgid);
- ceph_assert(q != m->pg_extra.end());
- auto& [history, pi] = q->second;
- logger().debug(
- "{}: {} e{} @{} "
- "history {} pi {}",
- __func__, pgid, created, created_stamp,
- history, pi);
- if (!pi.empty() &&
- m->epoch < pi.get_bounds().second) {
- logger().error(
- "got pg_create on {} epoch {} "
- "unmatched past_intervals {} (history {})",
- pgid, m->epoch,
- pi, history);
- } else {
- auto [op_id, fut] = pg_shard_manager.start_pg_operation<PeeringSubEvent>(
- state,
- conn,
- pg_shard_t(),
- pgid,
- m->epoch,
- m->epoch,
- NullEvt(),
- true,
- new PGCreateInfo(pgid, m->epoch, history, pi, true));
- ret.push_back(op_id);
- }
- }
- return ret;
-}
-
-} // namespace
-
-namespace crimson::osd {
-
-CompoundPeeringRequest::CompoundPeeringRequest(
- PGShardManager &pg_shard_manager,
- crimson::net::ConnectionRef conn, Ref<Message> m)
- : pg_shard_manager(pg_shard_manager),
- conn(conn),
- m(m)
-{}
-
-void CompoundPeeringRequest::print(std::ostream &lhs) const
-{
- lhs << *m;
-}
-
-void CompoundPeeringRequest::dump_detail(Formatter *f) const
-{
- f->dump_stream("message") << *m;
-}
-
-seastar::future<> CompoundPeeringRequest::start()
-{
- logger().info("{}: starting", *this);
- track_event<StartEvent>();
- auto state = seastar::make_lw_shared<compound_state>();
- auto blocker = std::make_unique<SubOpBlocker>(
- [&] {
- assert((m->get_type() == MSG_OSD_PG_CREATE2));
- return handle_pg_create(
- pg_shard_manager,
- conn,
- state,
- boost::static_pointer_cast<MOSDPGCreate2>(m));
- }());
-
- IRef ref = this;
- logger().info("{}: about to fork future", *this);
- return crimson::common::handle_system_shutdown(
- [this, ref, blocker=std::move(blocker), state]() mutable {
- return with_blocking_event<SubOpBlocker::BlockingEvent>([&] (auto&& trigger) {
- return trigger.maybe_record_blocking(state->promise.get_future(), *blocker);
- }).then([this, blocker=std::move(blocker)](auto &&ctx) {
- logger().info("{}: sub events complete", *this);
- return pg_shard_manager.get_shard_services(
- ).dispatch_context_messages(std::move(ctx));
- }).then([this, ref=std::move(ref)] {
- track_event<CompletionEvent>();
- logger().info("{}: complete", *this);
- });
- });
-}
-
-} // namespace crimson::osd
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <iostream>
-#include <seastar/core/future.hh>
-
-#include "msg/MessageRef.h"
-
-#include "crimson/net/Connection.h"
-#include "crimson/osd/osd_operation.h"
-
-namespace crimson::osd {
-
-class PGShardManager;
-class PG;
-
-using osd_id_t = int;
-
-class CompoundPeeringRequest : public TrackableOperationT<CompoundPeeringRequest> {
- friend class LttngBackendCompoundPeering;
-public:
- static constexpr OperationTypeCode type =
- OperationTypeCode::compound_peering_request;
-
- struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
- static constexpr const char * type_name = "CompoundOpBlocker";
-
- std::vector<crimson::Operation::id_t> subops;
- SubOpBlocker(std::vector<crimson::Operation::id_t> &&subops)
- : subops(subops) {}
-
- virtual void dump_detail(Formatter *f) const {
- f->open_array_section("dependent_operations");
- {
- for (auto &i : subops) {
- f->dump_unsigned("op_id", i);
- }
- }
- f->close_section();
- }
- };
-
-private:
- PGShardManager &pg_shard_manager;
- crimson::net::ConnectionRef conn;
- Ref<Message> m;
-
-public:
- CompoundPeeringRequest(
- PGShardManager &pg_shard_manager, crimson::net::ConnectionRef conn, Ref<Message> m);
-
- void print(std::ostream &) const final;
- void dump_detail(Formatter *f) const final;
- seastar::future<> start();
-
- std::tuple<
- StartEvent,
- SubOpBlocker::BlockingEvent,
- CompletionEvent
- > tracking_events;
-};
-
-}