From b59694686fd807352ef0a38bc725adc98c32971a Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 21 Jun 2019 17:46:01 -0700 Subject: [PATCH] crimson: fix state machine operations in advance_pg_to and pg creation Both pg creation and advance_pg_to process statemachine events and therefore need to be under the pg process pipeline stage. Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/osd.cc | 48 ++-------- src/crimson/osd/osd.h | 2 +- src/crimson/osd/osd_operation.h | 14 ++- .../osd/osd_operations/client_request.cc | 3 +- .../compound_peering_request.cc | 57 ++++++----- .../osd/osd_operations/peering_event.cc | 3 +- .../osd/osd_operations/peering_event.h | 1 + .../osd/osd_operations/pg_advance_map.cc | 95 +++++++++++++++++++ .../osd/osd_operations/pg_advance_map.h | 48 ++++++++++ src/crimson/osd/pg.h | 9 ++ src/crimson/osd/pg_map.h | 1 + src/crimson/osd/shard_services.h | 5 +- 13 files changed, 204 insertions(+), 83 deletions(-) create mode 100644 src/crimson/osd/osd_operations/pg_advance_map.cc create mode 100644 src/crimson/osd/osd_operations/pg_advance_map.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 0d3da12a113..7c3f0f34d46 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -14,6 +14,7 @@ add_executable(crimson-osd osd_operations/client_request.cc osd_operations/peering_event.cc osd_operations/compound_peering_request.cc + osd_operations/pg_advance_map.cc osdmap_gate.cc pg_map.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index ab3a4fc7b56..32694a87a8b 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -34,6 +34,7 @@ #include "osd/PeeringState.h" #include "crimson/osd/osd_operations/compound_peering_request.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/pg_advance_map.h" #include "crimson/osd/osd_operations/client_request.h" namespace { @@ -638,19 +639,11 @@ seastar::future> OSD::handle_pg_create_info( false, rctx.transaction); - pg->handle_initialize(rctx); - pg->handle_activate_map(rctx); - - logger().info("{} new pg {}", __func__, *pg); - pg_map.pg_created(info->pgid, pg); - - return seastar::when_all_succeed( - advance_pg_to(pg, osdmap->get_epoch()), - pg->get_need_up_thru() ? _send_alive() : seastar::now(), - shard_services.dispatch_context( - pg->get_collection_ref(), - std::move(rctx)).then( - [pg]() { return seastar::make_ready_future>(pg); })); + return shard_services.start_operation( + *this, pg, pg->get_osdmap_epoch(), + osdmap->get_epoch(), std::move(rctx), true).second.then([pg] { + return seastar::make_ready_future>(pg); + }); }); }); } @@ -898,7 +891,8 @@ seastar::future<> OSD::consume_map(epoch_t epoch) // todo: m-to-n: broadcast this news to all shards auto &pgs = pg_map.get_pgs(); return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) { - return advance_pg_to(pg.second, epoch); + return shard_services.start_operation( + *this, pg.second, pg.second->get_osdmap_epoch(), epoch).second; }).then([epoch, this] { osdmap_gate.got_map(epoch); return seastar::make_ready_future(); @@ -926,30 +920,4 @@ blocking_future> OSD::wait_for_pg( return pg_map.get_pg(pgid).first; } -seastar::future<> OSD::advance_pg_to(Ref pg, epoch_t to) -{ - auto from = pg->get_osdmap_epoch(); - // todo: merge/split support - return seastar::do_with( - PeeringCtx{}, - [this, pg, from, to](auto &rctx) { - return seastar::do_for_each( - boost::make_counting_iterator(from + 1), - boost::make_counting_iterator(to + 1), - [this, pg, &rctx](epoch_t next_epoch) { - return get_map(next_epoch).then( - [pg, this, &rctx] (cached_map_t&& next_map) { - pg->handle_advance_map(next_map, rctx); - }); - }).then([this, &rctx, pg] { - pg->handle_activate_map(rctx); - return seastar::when_all_succeed( - pg->get_need_up_thru() ? _send_alive() : seastar::now(), - shard_services.dispatch_context( - pg->get_collection_ref(), - std::move(rctx))); - }); - }); -} - } diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5cf093972af..802aa8c101f 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -188,7 +188,6 @@ public: blocking_future> wait_for_pg( spg_t pgid); - seastar::future<> advance_pg_to(Ref pg, epoch_t to); bool should_restart() const; seastar::future<> restart(); seastar::future<> shutdown(); @@ -196,6 +195,7 @@ public: seastar::future<> send_beacon(); void update_heartbeat_peers(); + friend class PGAdvanceMap; }; } diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 73ce75476d5..0152b3f836f 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -20,15 +20,19 @@ namespace ceph::osd { enum class OperationTypeCode { client_request = 0, - peering_event, - compound_peering_request, - last_op + peering_event = 1, + compound_peering_request = 2, + pg_advance_map = 3, + pg_creation = 4, + last_op = 5 }; static constexpr const char* const OP_NAMES[] = { - "client_write", + "client_request", "peering_event", - "compound_peering_request" + "compound_peering_request", + "pg_advance_map", + "pg_creation", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index d65887d303d..8bd81e9816a 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -48,7 +48,7 @@ seastar::future<> ClientRequest::start() logger().debug("{}: start", *this); IRef ref = this; - with_blocking_future(handle.enter(cp().await_map)) + return with_blocking_future(handle.enter(cp().await_map)) .then([this]() { return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch())); }).then([this](epoch_t epoch) { @@ -70,7 +70,6 @@ seastar::future<> ClientRequest::start() }); }); }); - return seastar::make_ready_future(); } } diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index f1abf29c59d..fee6a8794fa 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -85,22 +85,23 @@ std::vector handle_pg_create( q->second.first); } else { auto op = osd.get_shard_services().start_operation( - state, - osd, - conn, - osd.get_shard_services(), - pg_shard_t(), - pgid, - m->epoch, - m->epoch, - NullEvt(), - true, - new PGCreateInfo( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(), pgid, m->epoch, - q->second.first, - q->second.second, - true)); + m->epoch, + NullEvt(), + true, + new PGCreateInfo( + pgid, + m->epoch, + q->second.first, + q->second.second, + true)).first; + ret.push_back(op); } } return ret; @@ -141,8 +142,7 @@ std::vector handle_pg_notify( pg_notify.query_epoch, notify, true, // requires_pg - create_info); - op->start(); + create_info).first; ret.push_back(op); } return ret; @@ -173,7 +173,7 @@ std::vector handle_pg_info( pgid, pg_notify.epoch_sent, pg_notify.query_epoch, - std::move(info)); + std::move(info)).first; ret.push_back(op); } return ret; @@ -213,15 +213,15 @@ std::vector handle_pg_query( pg_query, pg_query.epoch_sent}; logger().debug("handle_pg_query on {} from {}", pgid, from); auto op = osd.get_shard_services().start_operation( - state, - osd, - conn, - osd.get_shard_services(), - pg_shard_t(from, pg_query.from), - pgid, - pg_query.epoch_sent, - pg_query.epoch_sent, - std::move(query)); + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(from, pg_query.from), + pgid, + pg_query.epoch_sent, + pg_query.epoch_sent, + std::move(query)).first; ret.push_back(op); } return ret; @@ -305,7 +305,7 @@ seastar::future<> CompoundPeeringRequest::start() add_blocker(blocker.get()); IRef ref = this; logger().info("{}: about to fork future", *this); - state->promise.get_future().then( + return state->promise.get_future().then( [this, blocker=std::move(blocker)](auto &&ctx) { clear_blocker(blocker.get()); logger().info("{}: sub events complete", *this); @@ -313,9 +313,6 @@ seastar::future<> CompoundPeeringRequest::start() }).then([this, ref=std::move(ref)] { logger().info("{}: complete", *this); }); - - logger().info("{}: forked, returning", *this); - return seastar::now(); } } // namespace ceph::osd diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index 6112d1fb4c3..79584fe4e23 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -51,7 +51,7 @@ seastar::future<> PeeringEvent::start() logger().debug("{}: start", *this); IRef ref = this; - get_pg().then([this](Ref pg) { + return get_pg().then([this](Ref pg) { if (!pg) { logger().debug("{}: pg absent, did not create", *this); on_pg_absent(); @@ -74,7 +74,6 @@ seastar::future<> PeeringEvent::start() }).then([this, ref=std::move(ref)] { logger().debug("{}: complete", *this); }); - return seastar::make_ready_future(); } void PeeringEvent::on_pg_absent() diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 995df4b5b27..62ecccfbe72 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -29,6 +29,7 @@ public: "PeeringEvent::PGPipeline::process" }; friend class PeeringEvent; + friend class PGAdvanceMap; }; protected: diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc new file mode 100644 index 00000000000..2d3061c3252 --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -0,0 +1,95 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include +#include "include/types.h" +#include "crimson/osd/osd_operations/pg_advance_map.h" +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "common/Formatter.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +PGAdvanceMap::PGAdvanceMap( + OSD &osd, Ref pg, epoch_t from, epoch_t to) + : osd(osd), pg(pg), from(from), to(to), do_init(false) {} + +PGAdvanceMap::PGAdvanceMap( + OSD &osd, Ref pg, epoch_t from, epoch_t to, + PeeringCtx &&rctx, bool do_init) + : osd(osd), pg(pg), from(from), to(to), + rctx(std::move(rctx)), do_init(do_init) {} + +PGAdvanceMap::~PGAdvanceMap() {} + +void PGAdvanceMap::print(std::ostream &lhs) const +{ + lhs << "PGAdvanceMap(" + << "pg=" << pg->get_pgid() + << " from=" << from + << " to=" << to; + if (do_init) { + lhs << " do_init"; + } + lhs << ")"; +} + +void PGAdvanceMap::dump_detail(Formatter *f) const +{ + f->open_object_section("PGAdvanceMap"); + f->dump_stream("pgid") << pg->get_pgid(); + f->dump_int("from", from); + f->dump_int("to", to); + f->dump_bool("do_init", do_init); + f->close_section(); +} + +seastar::future<> PGAdvanceMap::start() +{ + using cached_map_t = boost::local_shared_ptr; + + logger().debug("{}: start", *this); + + IRef ref = this; + return with_blocking_future( + handle.enter(pg->peering_request_pg_pipeline.process)) + .then([this] { + if (do_init) { + pg->handle_initialize(rctx); + pg->handle_activate_map(rctx); + } + return seastar::do_for_each( + boost::make_counting_iterator(from + 1), + boost::make_counting_iterator(to + 1), + [this](epoch_t next_epoch) { + return osd.get_map(next_epoch).then( + [this] (cached_map_t&& next_map) { + pg->handle_advance_map(next_map, rctx); + }); + }).then([this] { + pg->handle_activate_map(rctx); + handle.exit(); + if (do_init) { + osd.pg_map.pg_created(pg->get_pgid(), pg); + logger().info("{} new pg {}", __func__, *pg); + } + return seastar::when_all_succeed( + pg->get_need_up_thru() ? osd._send_alive() : seastar::now(), + osd.shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(rctx))); + }); + }).then([this, ref=std::move(ref)] { + logger().debug("{}: complete", *this); + }); +} + +} diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h new file mode 100644 index 00000000000..7225ca9de81 --- /dev/null +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" +#include "crimson/common/type_helpers.h" +#include "osd/PeeringState.h" + +namespace ceph::osd { + +class OSD; +class PG; + +class PGAdvanceMap : public OperationT { +public: + static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map; + +protected: + OrderedPipelinePhase::Handle handle; + + OSD &osd; + Ref pg; + + epoch_t from; + epoch_t to; + + PeeringCtx rctx; + const bool do_init; + +public: + PGAdvanceMap( + OSD &osd, Ref pg, epoch_t from, epoch_t to); + PGAdvanceMap( + OSD &osd, Ref pg, epoch_t from, epoch_t to, + PeeringCtx &&rctx, bool do_init); + ~PGAdvanceMap(); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); +}; + +} diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 7c8e4518d5d..4af2424963e 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -71,6 +71,14 @@ public: ~PG(); + const pg_shard_t &get_pg_whoami() const { + return pg_whoami; + } + + const spg_t&get_pgid() const { + return pgid; + } + // EpochSource epoch_t get_osdmap_epoch() const final { return peering_state.get_osdmap_epoch(); @@ -437,6 +445,7 @@ private: friend std::ostream& operator<<(std::ostream&, const PG& pg); friend class ClientRequest; friend class PeeringEvent; + friend class PGAdvanceMap; }; std::ostream& operator<<(std::ostream&, const PG& pg); diff --git a/src/crimson/osd/pg_map.h b/src/crimson/osd/pg_map.h index 8b4086efd88..a17388c1b3b 100644 --- a/src/crimson/osd/pg_map.h +++ b/src/crimson/osd/pg_map.h @@ -11,6 +11,7 @@ #include "include/types.h" #include "crimson/common/type_helpers.h" #include "crimson/osd/osd_operation.h" +#include "crimson/osd/pg.h" #include "osd/osd_types.h" namespace ceph::osd { diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 9d04ba2e9dc..62b3932c411 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -74,10 +74,9 @@ public: OperationRegistry registry; template - typename T::IRef start_operation(Args&&... args) { + auto start_operation(Args&&... args) { auto op = registry.create_operation(std::forward(args)...); - op->start(); - return op; + return std::make_pair(op, op->start()); } // Loggers -- 2.47.3