From e6c6de335b6a662b83749897bdbee697a2d288cc Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 10 Jun 2019 17:31:20 -0700 Subject: [PATCH] crimson/osd/: restructure client and peering operation handling This patch: * Breaks pg map handling out into a separate module (pg_map.*) * Breaks osdmap waiting into a separate module - Ops actually need to wait twice (potentially): once at the osd and a second time at the pg. The same structure is used for both. * Op ordering is enforced via OrderedExclusivePipelineStages defined in osd_operations.h. - Peering and client ops each define a pipeline section centered on the connection as well as one centered on the pg. Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 5 + src/crimson/osd/osd.cc | 352 +++--------------- src/crimson/osd/osd.h | 98 +---- src/crimson/osd/osd_connection_priv.h | 25 ++ .../osd/osd_operations/client_request.cc | 76 ++++ .../osd/osd_operations/client_request.h | 55 +++ .../compound_peering_request.cc | 321 ++++++++++++++++ .../osd_operations/compound_peering_request.h | 40 ++ .../osd/osd_operations/peering_event.cc | 120 ++++++ .../osd/osd_operations/peering_event.h | 123 ++++++ src/crimson/osd/osdmap_gate.cc | 54 +++ src/crimson/osd/osdmap_gate.h | 67 ++++ src/crimson/osd/pg.cc | 36 +- src/crimson/osd/pg.h | 33 +- src/crimson/osd/pg_map.cc | 70 ++++ src/crimson/osd/pg_map.h | 69 ++++ src/crimson/osd/shard_services.cc | 36 +- src/crimson/osd/shard_services.h | 24 +- 18 files changed, 1178 insertions(+), 426 deletions(-) create mode 100644 src/crimson/osd/osd_connection_priv.h create mode 100644 src/crimson/osd/osd_operations/client_request.cc create mode 100644 src/crimson/osd/osd_operations/client_request.h create mode 100644 src/crimson/osd/osd_operations/compound_peering_request.cc create mode 100644 src/crimson/osd/osd_operations/compound_peering_request.h create mode 100644 src/crimson/osd/osd_operations/peering_event.cc create mode 100644 src/crimson/osd/osd_operations/peering_event.h create mode 100644 src/crimson/osd/osdmap_gate.cc create mode 100644 src/crimson/osd/osdmap_gate.h create mode 100644 src/crimson/osd/pg_map.cc create mode 100644 src/crimson/osd/pg_map.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 77ac3858b69..0d3da12a113 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -11,6 +11,11 @@ add_executable(crimson-osd replicated_backend.cc shard_services.cc osd_operation.cc + osd_operations/client_request.cc + osd_operations/peering_event.cc + osd_operations/compound_peering_request.cc + osdmap_gate.cc + pg_map.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 707abacfc25..ab3a4fc7b56 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -15,12 +15,8 @@ #include "messages/MOSDBoot.h" #include "messages/MOSDMap.h" #include "messages/MOSDOp.h" -#include "messages/MOSDPGInfo.h" #include "messages/MOSDPGLog.h" -#include "messages/MOSDPGNotify.h" -#include "messages/MOSDPGQuery.h" #include "messages/MPGStats.h" -#include "messages/MOSDPGCreate2.h" #include "crimson/mon/MonClient.h" #include "crimson/net/Connection.h" @@ -36,6 +32,9 @@ #include "crimson/osd/pg_meta.h" #include "osd/PGPeeringEvent.h" #include "osd/PeeringState.h" +#include "crimson/osd/osd_operations/compound_peering_request.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/client_request.h" namespace { seastar::logger& logger() { @@ -66,7 +65,8 @@ OSD::OSD(int id, uint32_t nonce, store{ceph::os::FuturizedStore::create( local_conf().get_val("osd_objectstore"), local_conf().get_val("osd_data"))}, - shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store} + shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store}, + osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services))) { osdmaps[0] = boost::make_local_shared(); for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr), @@ -193,6 +193,7 @@ seastar::future<> OSD::start() return get_map(superblock.current_epoch); }).then([this](cached_map_t&& map) { shard_services.update_map(osdmap); + osdmap_gate.got_map(map->get_epoch()); osdmap = std::move(map); return load_pgs(); }).then([this] { @@ -264,9 +265,9 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest) } // get all the latest maps if (osdmap->get_epoch() + 1 >= oldest) { - return osdmap_subscribe(osdmap->get_epoch() + 1, false); + return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false); } else { - return osdmap_subscribe(oldest - 1, true); + return shard_services.osdmap_subscribe(oldest - 1, true); } } @@ -328,7 +329,7 @@ seastar::future<> OSD::load_pgs() if (coll.is_pg(&pgid)) { return load_pg(pgid).then([pgid, this](auto&& pg) { logger().info("load_pgs: loaded {}", pgid); - pgs.emplace(pgid, std::move(pg)); + pg_map.pg_loaded(pgid, std::move(pg)); return seastar::now(); }); } else if (coll.is_temp(&pgid)) { @@ -401,16 +402,17 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m) return handle_osd_map(conn, boost::static_pointer_cast(m)); case CEPH_MSG_OSD_OP: return handle_osd_op(conn, boost::static_pointer_cast(m)); + case MSG_OSD_PG_CREATE2: case MSG_OSD_PG_NOTIFY: - return handle_pg_notify(conn, boost::static_pointer_cast(m)); case MSG_OSD_PG_INFO: - return handle_pg_info(conn, boost::static_pointer_cast(m)); case MSG_OSD_PG_QUERY: - return handle_pg_query(conn, boost::static_pointer_cast(m)); + shard_services.start_operation( + *this, + conn->get_shared(), + m); + return seastar::now(); case MSG_OSD_PG_LOG: return handle_pg_log(conn, boost::static_pointer_cast(m)); - case MSG_OSD_PG_CREATE2: - return handle_pg_create(conn, boost::static_pointer_cast(m)); default: logger().info("{} unhandled message {}", __func__, *m); return seastar::now(); @@ -452,7 +454,7 @@ MessageRef OSD::get_stats() // MPGStats::had_map_for is not used since PGMonitor was removed auto m = make_message(monc->get_fsid(), osdmap->get_epoch()); - for (auto [pgid, pg] : pgs) { + for (auto [pgid, pg] : pg_map.get_pgs()) { if (pg->is_primary()) { auto stats = pg->get_stats(); // todo: update reported_epoch,reported_seq,last_fresh @@ -543,17 +545,6 @@ seastar::future<> OSD::store_maps(ceph::os::Transaction& t, }); } -seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) -{ - logger().info("{}({})", __func__, epoch); - if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || - force_request) { - return monc->renew_subs(); - } else { - return seastar::now(); - } -} - bool OSD::require_mon_peer(ceph::net::Connection *conn, Ref m) { if (!conn->peer_is_mon()) { @@ -651,7 +642,8 @@ seastar::future> OSD::handle_pg_create_info( pg->handle_activate_map(rctx); logger().info("{} new pg {}", __func__, *pg); - pgs.emplace(info->pgid, pg); + pg_map.pg_created(info->pgid, pg); + return seastar::when_all_succeed( advance_pg_to(pg, osdmap->get_epoch()), pg->get_need_up_thru() ? _send_alive() : seastar::now(), @@ -663,59 +655,6 @@ seastar::future> OSD::handle_pg_create_info( }); } -seastar::future<> OSD::handle_pg_create( - ceph::net::Connection* conn, - Ref m) -{ - logger().info("{}: {} from {}", __func__, *m, m->get_source()); - if (!require_mon_peer(conn, m)) { - return seastar::now(); - } - return handle_batch_pg_message( - m->pgs, - [this, conn, m](auto p) - -> std::optional>> { - const spg_t &pgid = p.first; - const auto &[created, created_stamp] = p.second; - - auto q = m->pg_extra.find(pgid); - ceph_assert(q != m->pg_extra.end()); - logger().debug( - "{} {} e{} @{} history {} pi {}", - __func__, - pgid, - created, - created_stamp, - q->second.first, - q->second.second); - if (!q->second.second.empty() && - m->epoch < q->second.second.get_bounds().second) { - logger().error( - "got pg_create on {} epoch {} unmatched past_intervals (history {})", - pgid, - m->epoch, - q->second.second, - q->second.first); - return std::nullopt; - } else { - return std::make_optional( - std::make_tuple( - pgid, - std::make_unique( - m->epoch, - m->epoch, - NullEvt(), - true, - new PGCreateInfo( - pgid, - m->epoch, - q->second.first, - q->second.second, - true)))); - } - }); -} - seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn, Ref m) { @@ -745,14 +684,14 @@ seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn, logger().info("handle_osd_map message skips epochs {}..{}", start, first - 1); if (m->oldest_map <= start) { - return osdmap_subscribe(start, false); + return shard_services.osdmap_subscribe(start, false); } // always try to get the full range of maps--as many as we can. this // 1- is good to have // 2- is at present the only way to ensure that we get a *full* map as // the first map! if (m->oldest_map < first) { - return osdmap_subscribe(m->oldest_map - 1, true); + return shard_services.osdmap_subscribe(m->oldest_map - 1, true); } skip_maps = true; start = first; @@ -851,19 +790,11 @@ seastar::future<> OSD::committed_osd_maps(version_t first, seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn, Ref m) { - return wait_for_map(m->get_map_epoch()).then([=](epoch_t epoch) { - if (auto found = pgs.find(m->get_spg()); found != pgs.end()) { - return found->second->handle_op(conn, std::move(m)); - } else if (osdmap->is_up_acting_osd_shard(m->get_spg(), whoami)) { - logger().info("no pg, should exist e{}, will wait", epoch); - // todo, wait for peering, etc - return seastar::now(); - } else { - logger().info("no pg, shouldn't exist e{}, dropping", epoch); - // todo: share map with client - return seastar::now(); - } - }); + shard_services.start_operation( + *this, + conn->get_shared(), + std::move(m)); + return seastar::now(); } bool OSD::should_restart() const @@ -923,7 +854,7 @@ void OSD::update_heartbeat_peers() if (!state.is_active()) { return; } - for (auto& pg : pgs) { + for (auto& pg : pg_map.get_pgs()) { vector up, acting; osdmap->pg_to_up_acting_osds(pg.first.pgid, &up, nullptr, @@ -937,115 +868,20 @@ void OSD::update_heartbeat_peers() heartbeat->update_peers(whoami); } -seastar::future<> OSD::handle_pg_notify( - ceph::net::Connection* conn, - Ref m) -{ - // assuming all pgs reside in a single shard - // see OSD::dequeue_peering_evt() - const int from = m->get_source().num(); - return handle_batch_pg_message( - m->get_pg_list(), - [from, this](pair p) { - auto& [pg_notify, past_intervals] = p; - spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; - MNotifyRec notify{pgid, - pg_shard_t{from, pg_notify.from}, - pg_notify, - 0, // the features is not used - past_intervals}; - logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from); - auto create_info = new PGCreateInfo{ - pgid, - pg_notify.query_epoch, - pg_notify.info.history, - past_intervals, - false}; - return std::make_optional( - std::make_tuple( - pgid, - std::make_unique( - pg_notify.epoch_sent, - pg_notify.query_epoch, - notify, - true, // requires_pg - create_info))); - }); -} - -seastar::future<> OSD::handle_pg_info( - ceph::net::Connection* conn, - Ref m) -{ - // assuming all pgs reside in a single shard - // see OSD::dequeue_peering_evt() - const int from = m->get_source().num(); - return handle_batch_pg_message( - m->pg_list, - [from, this](pair p) { - auto& pg_notify = p.first; - spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; - logger().debug("handle_pg_info on {} from {}", pgid.pgid, from); - MInfoRec info{pg_shard_t{from, pg_notify.from}, - pg_notify.info, - pg_notify.epoch_sent}; - return std::make_optional( - std::tuple( - pgid, - std::make_unique( - pg_notify.epoch_sent, - pg_notify.query_epoch, - std::move(info)))); - }); -} - -seastar::future<> OSD::handle_pg_query(ceph::net::Connection* conn, - Ref m) -{ - // assuming all pgs reside in a single shard - // see OSD::dequeue_peering_evt() - const int from = m->get_source().num(); - // TODO: handle missing pg -- handle_batch_pg_message ignores pgs - // that don't exist - return handle_batch_pg_message_with_missing_handler( - m->pg_list, - [from, this](pair p) { - auto& [pgid, pg_query] = p; - MQuery query{pgid, pg_shard_t{from, pg_query.from}, - pg_query, pg_query.epoch_sent}; - logger().debug("handle_pg_query on {} from {}", pgid, from); - return std::make_optional( - std::make_tuple( - pgid, - std::make_unique( - pg_query.epoch_sent, - pg_query.epoch_sent, - std::move(query)))); - }, - [this, from](pair p, PeeringCtx &ctx) { - auto &[pgid, query] = p; - logger().debug("handle_pg_query on absent pg {} from {}", pgid, from); - pg_info_t empty(spg_t(pgid.pgid, query.to)); - ceph_assert(query.type == pg_query_t::INFO); - ctx.notify_list[from].emplace_back( - pg_notify_t( - query.from, query.to, - query.epoch_sent, - osdmap->get_epoch(), - empty), - PastIntervals()); - }); -} - seastar::future<> OSD::handle_pg_log( ceph::net::Connection* conn, Ref m) { const int from = m->get_source().num(); logger().debug("handle_pg_log on {} from {}", m->get_spg(), from); - return do_peering_event_and_dispatch( - m->get_spg(), - PGPeeringEventURef(m->get_event())); + shard_services.start_operation( + *this, + conn->get_shared(), + shard_services, + pg_shard_t(from, m->from), + spg_t(m->info.pgid.pgid, m->to), + std::move(*m->get_event())); + return seastar::now(); } void OSD::check_osdmap_features() @@ -1060,124 +896,34 @@ void OSD::check_osdmap_features() seastar::future<> OSD::consume_map(epoch_t epoch) { // todo: m-to-n: broadcast this news to all shards + auto &pgs = pg_map.get_pgs(); return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) { return advance_pg_to(pg.second, epoch); }).then([epoch, this] { - auto first = waiting_peering.begin(); - auto last = waiting_peering.upper_bound(epoch); - std::for_each(first, last, [epoch, this](auto& blocked_requests) { - blocked_requests.second.set_value(epoch); - }); - waiting_peering.erase(first, last); - return seastar::now(); + osdmap_gate.got_map(epoch); + return seastar::make_ready_future(); }); } -seastar::future> -OSD::get_pg( +blocking_future> +OSD::get_or_create_pg( spg_t pgid, epoch_t epoch, std::unique_ptr info) { - return wait_for_map(epoch).then([this, pgid, epoch, info=std::move(info)](epoch_t) mutable { - if (auto pg = pgs.find(pgid); pg != pgs.end()) { - return advance_pg_to(pg->second, epoch).then([pg=pg->second]() { - return seastar::make_ready_future>(pg); - }); - } else if (!info) { - return seastar::make_ready_future>(); - } else { - auto creating = pgs_creating.find(pgid); - if (creating == pgs_creating.end()) { - creating = pgs_creating.emplace( - pgid, - seastar::shared_future>(handle_pg_create_info(std::move(info)).then([this, pgid](auto pg) { - pgs_creating.erase(pgid); - return seastar::make_ready_future>(pg); - }))).first; - } - return creating->second.get_future().then([this, epoch](auto pg) { - return advance_pg_to(pg, epoch).then([pg]() { - return seastar::make_ready_future>(pg); - }); - }); - } - }); -} - -seastar::future> -OSD::do_peering_event( - spg_t pgid, - PGPeeringEventURef evt, - PeeringCtx &rctx) -{ - return get_pg(pgid, evt->get_epoch_sent(), std::move(evt->create_info)) - .then([this, evt=std::move(evt), &rctx](Ref pg) mutable { - if (pg) { - pg->do_peering_event(std::move(evt), rctx); - } - return seastar::make_ready_future>(pg); - }); -} - -seastar::future -OSD::do_peering_event_and_dispatch_transaction( - spg_t pgid, - std::unique_ptr evt, - PeeringCtx &rctx) -{ - return do_peering_event(pgid, std::move(evt), rctx).then( - [this, pgid, &rctx](Ref pg) mutable { - if (pg) { - return seastar::when_all_succeed( - pg->get_need_up_thru() ? _send_alive() : seastar::now(), - shard_services.dispatch_context_transaction( - pg->get_collection_ref(), rctx)).then([] { return true; }); - } else { - return seastar::make_ready_future(false); - } - }); -} - -seastar::future<> -OSD::do_peering_event_and_dispatch( - spg_t pgid, - std::unique_ptr evt) -{ - return seastar::do_with( - PeeringCtx{}, - [this, pgid, evt=std::move(evt)](auto &rctx) mutable { - return do_peering_event(pgid, std::move(evt), rctx).then( - [this, pgid, &rctx](Ref pg) mutable { - if (pg) { - return seastar::when_all_succeed( - pg->get_need_up_thru() ? _send_alive() : seastar::now(), - shard_services.dispatch_context( - pg->get_collection_ref(), std::move(rctx))); - } else { - return seastar::now(); - } - }); - }).handle_exception([](auto ep) { - logger().error("do_peering_event_and_dispatch saw {}", ep); - return seastar::make_exception_future<>(ep); - }); + auto [fut, creating] = pg_map.get_pg(pgid, bool(info)); + if (!creating && info) { + pg_map.set_creating(pgid); + handle_pg_create_info(std::move(info)); + } + return std::move(fut); } -seastar::future OSD::wait_for_map(epoch_t epoch) +blocking_future> OSD::wait_for_pg( + spg_t pgid) { - const auto mine = osdmap->get_epoch(); - if (mine >= epoch) { - return seastar::make_ready_future(mine); - } else { - logger().info("evt epoch is {}, i have {}, will wait", epoch, mine); - auto fut = waiting_peering[epoch].get_shared_future(); - return osdmap_subscribe(osdmap->get_epoch(), true).then( - [fut=std::move(fut)]() mutable { - return std::move(fut); - }); - } + return pg_map.get_pg(pgid).first; } seastar::future<> OSD::advance_pg_to(Ref pg, epoch_t to) @@ -1193,7 +939,7 @@ seastar::future<> OSD::advance_pg_to(Ref pg, epoch_t to) [this, pg, &rctx](epoch_t next_epoch) { return get_map(next_epoch).then( [pg, this, &rctx] (cached_map_t&& next_map) { - return pg->handle_advance_map(next_map, rctx); + pg->handle_advance_map(next_map, rctx); }); }).then([this, &rctx, pg] { pg->handle_activate_map(rctx); diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index a87ba49db27..5cf093972af 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -23,6 +23,9 @@ #include "crimson/osd/osdmap_service.h" #include "crimson/osd/state.h" #include "crimson/osd/shard_services.h" +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/pg_map.h" +#include "crimson/osd/osd_operations/peering_event.h" #include "osd/PeeringState.h" #include "osd/osd_types.h" @@ -152,100 +155,38 @@ private: seastar::future> handle_pg_create_info( std::unique_ptr info); - template - seastar::future<> handle_batch_pg_message_with_missing_handler( - const C &c, - F &&f, - G &&on_missing_pg) { - using mapped_type = const typename C::value_type &; - using event_type = std::optional>>; - return seastar::do_with( - PeeringCtx{}, - std::move(f), - std::move(on_missing_pg), - [this, &c] (auto &rctx, auto &f, auto &on_missing_pg) { - return seastar::parallel_for_each( - c, - [this, &rctx, &f, &on_missing_pg](mapped_type m) { - event_type result = f(m); - if (result) { - auto [pgid, event] = std::move(*result); - return do_peering_event_and_dispatch_transaction( - pgid, - std::move(event), - rctx).then([m, &on_missing_pg, &rctx] (bool found) { - if (!found) { - on_missing_pg(m, rctx); - } - return seastar::now(); - }); - } else { - return seastar::now(); - } - }).then([this, &rctx] { - return shard_services.dispatch_context(std::move(rctx)); - }); - }); - } - - template - seastar::future<> handle_batch_pg_message( - const C &c, - F &&f) { - return handle_batch_pg_message_with_missing_handler( - c, - std::move(f), - [](const typename C::value_type &, PeeringCtx &){}); - } - - seastar::future<> handle_pg_create(ceph::net::Connection *conn, - Ref m); seastar::future<> handle_osd_map(ceph::net::Connection* conn, Ref m); seastar::future<> handle_osd_op(ceph::net::Connection* conn, Ref m); seastar::future<> handle_pg_log(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_pg_notify(ceph::net::Connection* conn, - Ref m); - seastar::future<> handle_pg_info(ceph::net::Connection* conn, - Ref m); - seastar::future<> handle_pg_query(ceph::net::Connection* conn, - Ref m); seastar::future<> committed_osd_maps(version_t first, version_t last, Ref m); + void check_osdmap_features(); - // order the promises in descending order of the waited osdmap epoch, - // so we can access all the waiters expecting a map whose epoch is less - // than a given epoch - using waiting_peering_t = std::map, - std::greater>; - waiting_peering_t waiting_peering; - // wait for an osdmap whose epoch is greater or equal to given epoch - seastar::future wait_for_map(epoch_t epoch); + +public: + OSDMapGate osdmap_gate; + + ShardServices &get_shard_services() { + return shard_services; + } + seastar::future<> consume_map(epoch_t epoch); - std::map>> pgs_creating; - seastar::future> get_pg( +private: + PGMap pg_map; + +public: + blocking_future> get_or_create_pg( spg_t pgid, epoch_t epoch, std::unique_ptr info); - - seastar::future> do_peering_event( - spg_t pgid, - std::unique_ptr evt, - PeeringCtx &rctx); - seastar::future<> do_peering_event_and_dispatch( - spg_t pgid, - std::unique_ptr evt); - seastar::future do_peering_event_and_dispatch_transaction( - spg_t pgid, - std::unique_ptr evt, - PeeringCtx &rctx); + blocking_future> wait_for_pg( + spg_t pgid); seastar::future<> advance_pg_to(Ref pg, epoch_t to); bool should_restart() const; @@ -254,6 +195,7 @@ private: seastar::future<> send_beacon(); void update_heartbeat_peers(); + }; } diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h new file mode 100644 index 00000000000..25c72a88f1c --- /dev/null +++ b/src/crimson/osd/osd_connection_priv.h @@ -0,0 +1,25 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/peering_event.h" + +namespace ceph::osd { + +struct OSDConnectionPriv : public ceph::net::Connection::user_private_t { + ClientRequest::ConnectionPipeline client_request_conn_pipeline; + RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline; +}; + +static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) { + if (!conn->has_user_private()) { + conn->set_user_private(std::make_unique()); + } + return static_cast(conn->get_user_private()); +} + +} diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc new file mode 100644 index 00000000000..d65887d303d --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -0,0 +1,76 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "messages/MOSDOp.h" + +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +ClientRequest::ClientRequest( + OSD &osd, ceph::net::ConnectionRef conn, Ref &&m) + : osd(osd), conn(conn), m(m) +{} + +void ClientRequest::print(std::ostream &lhs) const +{ + lhs << *m; +} + +void ClientRequest::dump_detail(Formatter *f) const +{ +} + +ClientRequest::ConnectionPipeline &ClientRequest::cp() +{ + return get_osd_priv(conn.get()).client_request_conn_pipeline; +} + +ClientRequest::PGPipeline &ClientRequest::pp(PG &pg) +{ + return pg.client_request_pg_pipeline; +} + +seastar::future<> ClientRequest::start() +{ + logger().debug("{}: start", *this); + + IRef ref = this; + with_blocking_future(handle.enter(cp().await_map)) + .then([this]() { + return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch())); + }).then([this](epoch_t epoch) { + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future(osd.wait_for_pg(m->get_spg())); + }).then([this, ref=std::move(ref)](Ref pg) { + return seastar::do_with( + std::move(pg), std::move(ref), [this](auto pg, auto op) { + return with_blocking_future( + handle.enter(pp(*pg).await_map) + ).then([this, pg] { + return with_blocking_future( + pg->osdmap_gate.wait_for_map(m->get_map_epoch())); + }).then([this, pg] (auto) { + return with_blocking_future(handle.enter(pp(*pg).process)); + }).then([this, pg] { + return pg->handle_op(conn.get(), std::move(m)); + }); + }); + }); + return seastar::make_ready_future(); +} + +} diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h new file mode 100644 index 00000000000..91bcfd3303a --- /dev/null +++ b/src/crimson/osd/osd_operations/client_request.h @@ -0,0 +1,55 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/common/type_helpers.h" + +class MOSDOp; + +namespace ceph::osd { +class PG; +class OSD; + +class ClientRequest final : public OperationT { + OSD &osd; + ceph::net::ConnectionRef conn; + Ref m; + OrderedPipelinePhase::Handle handle; + +public: + class ConnectionPipeline { + OrderedPipelinePhase await_map = { + "ClientRequest::ConnectionPipeline::await_map" + }; + OrderedPipelinePhase get_pg = { + "ClientRequest::ConnectionPipeline::get_pg" + }; + friend class ClientRequest; + }; + class PGPipeline { + OrderedPipelinePhase await_map = { + "ClientRequest::PGPipeline::await_map" + }; + OrderedPipelinePhase process = { + "ClientRequest::PGPipeline::process" + }; + friend class ClientRequest; + }; + + static constexpr OperationTypeCode type = OperationTypeCode::client_request; + + ClientRequest(OSD &osd, ceph::net::ConnectionRef, Ref &&m); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); + +private: + ConnectionPipeline &cp(); + PGPipeline &pp(PG &pg); +}; + +} diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc new file mode 100644 index 00000000000..f1abf29c59d --- /dev/null +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -0,0 +1,321 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "osd/PeeringState.h" + +#include "messages/MOSDPGInfo.h" +#include "messages/MOSDPGNotify.h" +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGCreate2.h" + +#include "common/Formatter.h" + +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_operations/compound_peering_request.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace { +using namespace ceph::osd; + +struct compound_state { + seastar::promise promise; + BufferedRecoveryMessages ctx; + ~compound_state() { + promise.set_value(std::move(ctx)); + } +}; +using compound_state_ref = seastar::lw_shared_ptr; + +class PeeringSubEvent : public RemotePeeringEvent { + compound_state_ref state; +public: + template + PeeringSubEvent(compound_state_ref state, Args &&... args) : + RemotePeeringEvent(std::forward(args)...), state(state) {} + + seastar::future<> complete_rctx(Ref pg) final { + logger().debug("{}: submitting ctx transaction", *this); + state->ctx.accept_buffered_messages(ctx); + state = {}; + if (!pg) { + ceph_assert(ctx.transaction.empty()); + return seastar::now(); + } else { + return osd.get_shard_services().dispatch_context_transaction( + pg->get_collection_ref(), ctx); + } + } +}; + +std::vector handle_pg_create( + OSD &osd, + ceph::net::ConnectionRef conn, + compound_state_ref state, + Ref m) +{ + std::vector ret; + for (auto &p : m->pgs) { + const spg_t &pgid = p.first; + const auto &[created, created_stamp] = p.second; + auto q = m->pg_extra.find(pgid); + ceph_assert(q != m->pg_extra.end()); + logger().debug( + "{}, {} {} e{} @{} history {} pi {}", + __func__, + pgid, + created, + created_stamp, + q->second.first, + q->second.second); + if (!q->second.second.empty() && + m->epoch < q->second.second.get_bounds().second) { + logger().error( + "got pg_create on {} epoch {} unmatched past_intervals (history {})", + pgid, + m->epoch, + q->second.second, + q->second.first); + } else { + auto op = osd.get_shard_services().start_operation( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(), + pgid, + m->epoch, + m->epoch, + NullEvt(), + true, + new PGCreateInfo( + pgid, + m->epoch, + q->second.first, + q->second.second, + true)); + } + } + return ret; +} + +std::vector handle_pg_notify( + OSD &osd, + ceph::net::ConnectionRef conn, + compound_state_ref state, + Ref m) +{ + std::vector ret; + ret.reserve(m->get_pg_list().size()); + const int from = m->get_source().num(); + for (auto &p : m->get_pg_list()) { + auto& [pg_notify, past_intervals] = p; + spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; + MNotifyRec notify{pgid, + pg_shard_t{from, pg_notify.from}, + pg_notify, + 0, // the features is not used + past_intervals}; + logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from); + auto create_info = new PGCreateInfo{ + pgid, + pg_notify.query_epoch, + pg_notify.info.history, + past_intervals, + false}; + auto op = osd.get_shard_services().start_operation( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(from, pg_notify.from), + pgid, + pg_notify.epoch_sent, + pg_notify.query_epoch, + notify, + true, // requires_pg + create_info); + op->start(); + ret.push_back(op); + } + return ret; +} + +std::vector handle_pg_info( + OSD &osd, + ceph::net::ConnectionRef conn, + compound_state_ref state, + Ref m) +{ + std::vector ret; + ret.reserve(m->pg_list.size()); + const int from = m->get_source().num(); + for (auto &p : m->pg_list) { + auto& pg_notify = p.first; + spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; + logger().debug("handle_pg_info on {} from {}", pgid.pgid, from); + MInfoRec info{pg_shard_t{from, pg_notify.from}, + pg_notify.info, + pg_notify.epoch_sent}; + auto op = osd.get_shard_services().start_operation( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(from, pg_notify.from), + pgid, + pg_notify.epoch_sent, + pg_notify.query_epoch, + std::move(info)); + ret.push_back(op); + } + return ret; +} + +class QuerySubEvent : public PeeringSubEvent { +public: + template + QuerySubEvent(Args &&... args) : + PeeringSubEvent(std::forward(args)...) {} + + void on_pg_absent() final { + logger().debug("handle_pg_query on absent pg {} from {}", pgid, from); + pg_info_t empty(pgid); + ctx.notify_list[from.osd].emplace_back( + pg_notify_t( + from.shard, pgid.shard, + evt.get_epoch_sent(), + osd.get_shard_services().get_osdmap()->get_epoch(), + empty), + PastIntervals()); + } +}; + +std::vector handle_pg_query( + OSD &osd, + ceph::net::ConnectionRef conn, + compound_state_ref state, + Ref m) +{ + std::vector ret; + ret.reserve(m->pg_list.size()); + const int from = m->get_source().num(); + for (auto &p : m->pg_list) { + auto& [pgid, pg_query] = p; + MQuery query{pgid, pg_shard_t{from, pg_query.from}, + pg_query, pg_query.epoch_sent}; + logger().debug("handle_pg_query on {} from {}", pgid, from); + auto op = osd.get_shard_services().start_operation( + state, + osd, + conn, + osd.get_shard_services(), + pg_shard_t(from, pg_query.from), + pgid, + pg_query.epoch_sent, + pg_query.epoch_sent, + std::move(query)); + ret.push_back(op); + } + return ret; +} + +struct SubOpBlocker : BlockerT { + static constexpr const char * type_name = "CompoundOpBlocker"; + + std::vector subops; + SubOpBlocker(std::vector &&subops) : subops(subops) {} + + virtual void dump_detail(Formatter *f) const { + f->open_array_section("dependent_operations"); + { + for (auto &i : subops) { + i->dump_brief(f); + } + } + f->close_section(); + } +}; + +} // namespace + +namespace ceph::osd { + +CompoundPeeringRequest::CompoundPeeringRequest( + OSD &osd, ceph::net::ConnectionRef conn, Ref m) + : osd(osd), + conn(conn), + m(m) +{} + +void CompoundPeeringRequest::print(std::ostream &lhs) const +{ + lhs << *m; +} + +void CompoundPeeringRequest::dump_detail(Formatter *f) const +{ + f->dump_stream("message") << *m; +} + +seastar::future<> CompoundPeeringRequest::start() +{ + logger().info("{}: starting", *this); + auto state = seastar::make_lw_shared(); + auto blocker = std::make_unique( + [&] { + switch (m->get_type()) { + case MSG_OSD_PG_CREATE2: + return handle_pg_create( + osd, + conn, + state, + boost::static_pointer_cast(m)); + case MSG_OSD_PG_NOTIFY: + return handle_pg_notify( + osd, + conn, + state, + boost::static_pointer_cast(m)); + case MSG_OSD_PG_INFO: + return handle_pg_info( + osd, + conn, + state, + boost::static_pointer_cast(m)); + case MSG_OSD_PG_QUERY: + return handle_pg_query( + osd, + conn, + state, + boost::static_pointer_cast(m)); + default: + ceph_assert("Invalid message type" == 0); + return std::vector(); + } + }()); + + add_blocker(blocker.get()); + IRef ref = this; + logger().info("{}: about to fork future", *this); + state->promise.get_future().then( + [this, blocker=std::move(blocker)](auto &&ctx) { + clear_blocker(blocker.get()); + logger().info("{}: sub events complete", *this); + return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); + }).then([this, ref=std::move(ref)] { + logger().info("{}: complete", *this); + }); + + logger().info("{}: forked, returning", *this); + return seastar::now(); +} + +} // namespace ceph::osd diff --git a/src/crimson/osd/osd_operations/compound_peering_request.h b/src/crimson/osd/osd_operations/compound_peering_request.h new file mode 100644 index 00000000000..ac901f83530 --- /dev/null +++ b/src/crimson/osd/osd_operations/compound_peering_request.h @@ -0,0 +1,40 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "msg/MessageRef.h" + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" + +namespace ceph::osd { + +class OSD; +class PG; + +using osd_id_t = int; + +class CompoundPeeringRequest : public OperationT { +public: + static constexpr OperationTypeCode type = + OperationTypeCode::compound_peering_request; + +private: + OSD &osd; + ceph::net::ConnectionRef conn; + Ref m; + +public: + CompoundPeeringRequest( + OSD &osd, ceph::net::ConnectionRef conn, Ref m); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); +}; + +} diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc new file mode 100644 index 00000000000..6112d1fb4c3 --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "crimson/osd/pg.h" +#include "crimson/osd/osd.h" +#include "common/Formatter.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_connection_priv.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +void PeeringEvent::print(std::ostream &lhs) const +{ + lhs << "PeeringEvent(" + << "from=" << from + << " pgid=" << pgid + << " sent=" << evt.get_epoch_sent() + << " requested=" << evt.get_epoch_requested() + << " evt=" << evt.get_desc() + << ")"; +} + +void PeeringEvent::dump_detail(Formatter *f) const +{ + f->open_object_section("PeeringEvent"); + f->dump_stream("from") << from; + f->dump_stream("pgid") << pgid; + f->dump_int("sent", evt.get_epoch_sent()); + f->dump_int("requested", evt.get_epoch_requested()); + f->dump_string("evt", evt.get_desc()); + f->close_section(); +} + + +PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg) +{ + return pg.peering_request_pg_pipeline; +} + +seastar::future<> PeeringEvent::start() +{ + + logger().debug("{}: start", *this); + + IRef ref = this; + get_pg().then([this](Ref pg) { + if (!pg) { + logger().debug("{}: pg absent, did not create", *this); + on_pg_absent(); + handle.exit(); + return complete_rctx(pg); + } else { + logger().debug("{}: pg present", *this); + return with_blocking_future(handle.enter(pp(*pg).await_map) + ).then([this, pg] { + return with_blocking_future( + pg->osdmap_gate.wait_for_map(evt.get_epoch_sent())); + }).then([this, pg](auto) { + return with_blocking_future(handle.enter(pp(*pg).process)); + }).then([this, pg] { + pg->do_peering_event(evt, ctx); + handle.exit(); + return complete_rctx(pg); + }); + } + }).then([this, ref=std::move(ref)] { + logger().debug("{}: complete", *this); + }); + return seastar::make_ready_future(); +} + +void PeeringEvent::on_pg_absent() +{ + logger().debug("{}: pg absent, dropping", *this); +} + +seastar::future<> PeeringEvent::complete_rctx(Ref pg) +{ + logger().debug("{}: submitting ctx", *this); + return shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(ctx)); +} + +RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp() +{ + return get_osd_priv(conn.get()).peering_request_conn_pipeline; +} + +seastar::future> RemotePeeringEvent::get_pg() { + return with_blocking_future( + handle.enter(cp().await_map) + ).then([this] { + return with_blocking_future( + osd.osdmap_gate.wait_for_map(evt.get_epoch_sent())); + }).then([this](auto epoch) { + logger().debug("{}: got map {}", *this, epoch); + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future( + osd.get_or_create_pg( + pgid, evt.get_epoch_sent(), std::move(evt.create_info))); + }); +} + +seastar::future> LocalPeeringEvent::get_pg() { + return seastar::make_ready_future>(pg); +} + +LocalPeeringEvent::~LocalPeeringEvent() {} + +} diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h new file mode 100644 index 00000000000..995df4b5b27 --- /dev/null +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -0,0 +1,123 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" +#include "osd/PGPeeringEvent.h" +#include "osd/PeeringState.h" + +namespace ceph::osd { + +class OSD; +class ShardServices; +class PG; + +class PeeringEvent : public OperationT { +public: + static constexpr OperationTypeCode type = OperationTypeCode::peering_event; + + class PGPipeline { + OrderedPipelinePhase await_map = { + "PeeringEvent::PGPipeline::await_map" + }; + OrderedPipelinePhase process = { + "PeeringEvent::PGPipeline::process" + }; + friend class PeeringEvent; + }; + +protected: + OrderedPipelinePhase::Handle handle; + PGPipeline &pp(PG &pg); + + ShardServices &shard_services; + PeeringCtx ctx; + pg_shard_t from; + spg_t pgid; + PGPeeringEvent evt; + + const pg_shard_t get_from() const { + return from; + } + + const spg_t get_pgid() const { + return pgid; + } + + const PGPeeringEvent &get_event() const { + return evt; + } + + virtual void on_pg_absent(); + virtual seastar::future<> complete_rctx(Ref); + virtual seastar::future> get_pg() = 0; + +public: + template + PeeringEvent( + ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid, + Args&&... args) : + shard_services(shard_services), + from(from), + pgid(pgid), + evt(std::forward(args)...) + {} + + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); +}; + +class RemotePeeringEvent : public PeeringEvent { +protected: + OSD &osd; + ceph::net::ConnectionRef conn; + + seastar::future> get_pg() final; + +public: + class ConnectionPipeline { + OrderedPipelinePhase await_map = { + "PeeringRequest::ConnectionPipeline::await_map" + }; + OrderedPipelinePhase get_pg = { + "PeeringRequest::ConnectionPipeline::get_pg" + }; + friend class RemotePeeringEvent; + }; + + template + RemotePeeringEvent(OSD &osd, ceph::net::ConnectionRef conn, Args&&... args) : + PeeringEvent(std::forward(args)...), + osd(osd), + conn(conn) + {} + +private: + ConnectionPipeline &cp(); +}; + +class LocalPeeringEvent final : public PeeringEvent { +protected: + seastar::future> get_pg() final; + + Ref pg; + +public: + template + LocalPeeringEvent(Ref pg, Args&&... args) : + PeeringEvent(std::forward(args)...), + pg(pg) + {} + + virtual ~LocalPeeringEvent(); +}; + + +} diff --git a/src/crimson/osd/osdmap_gate.cc b/src/crimson/osd/osdmap_gate.cc new file mode 100644 index 00000000000..f83743419a1 --- /dev/null +++ b/src/crimson/osd/osdmap_gate.cc @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/osdmap_gate.h" +#include "crimson/osd/shard_services.h" +#include "common/Formatter.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +void OSDMapGate::OSDMapBlocker::dump_detail(Formatter *f) const +{ + f->open_object_section("OSDMapGate"); + f->dump_int("epoch", epoch); + f->close_section(); +} + +blocking_future OSDMapGate::wait_for_map(epoch_t epoch) +{ + if (current >= epoch) { + return make_ready_blocking_future(current); + } else { + logger().info("evt epoch is {}, i have {}, will wait", epoch, current); + auto &blocker = waiting_peering.emplace( + epoch, make_pair(blocker_type, epoch)).first->second; + auto fut = blocker.promise.get_shared_future(); + if (shard_services) { + return blocker.make_blocking_future( + (*shard_services).get().osdmap_subscribe(current, true).then( + [fut=std::move(fut)]() mutable { + return std::move(fut); + })); + } else { + return blocker.make_blocking_future(std::move(fut)); + } + } +} + +void OSDMapGate::got_map(epoch_t epoch) { + current = epoch; + auto first = waiting_peering.begin(); + auto last = waiting_peering.upper_bound(epoch); + std::for_each(first, last, [epoch, this](auto& blocked_requests) { + blocked_requests.second.promise.set_value(epoch); + }); + waiting_peering.erase(first, last); +} + +} diff --git a/src/crimson/osd/osdmap_gate.h b/src/crimson/osd/osdmap_gate.h new file mode 100644 index 00000000000..073ce843f95 --- /dev/null +++ b/src/crimson/osd/osdmap_gate.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include +#include + +#include +#include + +#include "include/types.h" +#include "crimson/osd/osd_operation.h" + +namespace ceph { + class Formatter; + namespace osd { + class ShardServices; + } +} + +namespace ceph::osd { + +class OSDMapGate { + struct OSDMapBlocker : public Blocker { + const char * type_name; + epoch_t epoch; + + OSDMapBlocker(std::pair args) + : type_name(args.first), epoch(args.second) {} + + OSDMapBlocker(const OSDMapBlocker &) = delete; + OSDMapBlocker(OSDMapBlocker &&) = delete; + OSDMapBlocker &operator=(const OSDMapBlocker &) = delete; + OSDMapBlocker &operator=(OSDMapBlocker &&) = delete; + + seastar::shared_promise promise; + + void dump_detail(Formatter *f) const final; + const char *get_type_name() const final { + return type_name; + } + }; + + // order the promises in descending order of the waited osdmap epoch, + // so we can access all the waiters expecting a map whose epoch is less + // than a given epoch + using waiting_peering_t = std::map>; + const char *blocker_type; + waiting_peering_t waiting_peering; + epoch_t current = 0; + std::optional> shard_services; +public: + OSDMapGate( + const char *blocker_type, + std::optional> shard_services) + : blocker_type(blocker_type), shard_services(shard_services) {} + + // wait for an osdmap whose epoch is greater or equal to given epoch + blocking_future wait_for_map(epoch_t epoch); + void got_map(epoch_t epoch); +}; + +} diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 3c83724bd40..d78f94403d9 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -24,15 +24,18 @@ #include "osd/OSDMap.h" +#include "os/Transaction.h" + #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" -#include "crimson/os/futurized_store.h" #include "os/Transaction.h" +#include "crimson/os/cyan_store.h" + #include "crimson/osd/exceptions.h" #include "crimson/osd/pg_meta.h" - -#include "pg_backend.h" +#include "crimson/osd/pg_backend.h" +#include "crimson/osd/osd_operations/peering_event.h" namespace { seastar::logger& logger() { @@ -72,6 +75,7 @@ PG::PG( pg_whoami{pg_shard}, coll_ref(shard_services.get_store().open_collection(coll)), pgmeta_oid{pgid.make_pgmeta_oid()}, + osdmap_gate("PG::osdmap_gate", std::nullopt), shard_services{shard_services}, osdmap{osdmap}, backend( @@ -98,27 +102,26 @@ PG::PG( peering_state.set_backend_predicates( new ReadablePredicate(pg_whoami), new RecoverablePredicate()); + osdmap_gate.got_map(osdmap->get_epoch()); } +PG::~PG() {} + bool PG::try_flush_or_schedule_async() { -// FIXME once there's a good way to schedule an "async" peering event -#if 0 shard_services.get_store().do_transaction( coll_ref, ObjectStore::Transaction()).then( - [this, epoch=peering_state.get_osdmap()->get_epoch()](){ - if (!peering_state.pg_has_reset_since(epoch)) { - PeeringCtx rctx; - auto evt = PeeringState::IntervalFlush(); - do_peering_event(evt, rctx); - return shard_services.dispatch_context(std::move(rctx)); - } else { - return seastar::now(); - } + [this, epoch=peering_state.get_osdmap()->get_epoch()]() { + return shard_services.start_operation( + this, + shard_services, + pg_whoami, + pgid, + epoch, + epoch, + PeeringState::IntervalFlush()); }); return false; -#endif - return true; } void PG::log_state_enter(const char *state) { @@ -232,6 +235,7 @@ void PG::handle_advance_map( newacting, acting_primary, rctx); + osdmap_gate.got_map(next_map->get_epoch()); } void PG::handle_activate_map(PeeringCtx &rctx) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 21d1cb56ac0..7c8e4518d5d 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -14,12 +14,16 @@ #include "common/dout.h" #include "crimson/net/Fwd.h" #include "os/Transaction.h" -#include "crimson/osd/shard_services.h" #include "osd/osd_types.h" #include "osd/osd_internal_types.h" #include "osd/PeeringState.h" #include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/shard_services.h" +#include "crimson/osd/osdmap_gate.h" + class OSDMap; class MQuery; class PGBackend; @@ -36,6 +40,9 @@ namespace ceph::os { class FuturizedStore; } +namespace ceph::osd { +class ClientRequest; + class PG : public boost::intrusive_ref_counter< PG, boost::thread_unsafe_counter>, @@ -45,6 +52,9 @@ class PG : public boost::intrusive_ref_counter< using ec_profile_t = std::map; using cached_map_t = boost::local_shared_ptr; + ClientRequest::PGPipeline client_request_pg_pipeline; + PeeringEvent::PGPipeline peering_request_pg_pipeline; + spg_t pgid; pg_shard_t pg_whoami; coll_t coll; @@ -56,9 +66,11 @@ public: pg_pool_t&& pool, std::string&& name, cached_map_t osdmap, - ceph::osd::ShardServices &shard_services, + ShardServices &shard_services, ec_profile_t profile); + ~PG(); + // EpochSource epoch_t get_osdmap_epoch() const final { return peering_state.get_osdmap_epoch(); @@ -187,7 +199,7 @@ public: t.register_on_commit( new LambdaContext([this, on_commit](){ PeeringCtx rctx; - do_peering_event(on_commit, rctx); + do_peering_event(*on_commit, rctx); shard_services.dispatch_context(std::move(rctx)); })); } @@ -389,16 +401,6 @@ public: void do_peering_event( PGPeeringEvent& evt, PeeringCtx &rctx); - void do_peering_event( - std::unique_ptr evt, - PeeringCtx &rctx) { - return do_peering_event(*evt, rctx); - } - void do_peering_event( - PGPeeringEventRef evt, - PeeringCtx &rctx) { - return do_peering_event(*evt, rctx); - } void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx); void handle_activate_map(PeeringCtx &rctx); @@ -421,6 +423,7 @@ private: uint64_t limit); private: + OSDMapGate osdmap_gate; ShardServices &shard_services; cached_map_t osdmap; @@ -432,6 +435,10 @@ private: seastar::future<> wait_for_active(); friend std::ostream& operator<<(std::ostream&, const PG& pg); + friend class ClientRequest; + friend class PeeringEvent; }; std::ostream& operator<<(std::ostream&, const PG& pg); + +} diff --git a/src/crimson/osd/pg_map.cc b/src/crimson/osd/pg_map.cc new file mode 100644 index 00000000000..536ba9980e5 --- /dev/null +++ b/src/crimson/osd/pg_map.cc @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/pg_map.h" + +#include "crimson/osd/pg.h" +#include "common/Formatter.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +PGMap::PGCreationState::PGCreationState(spg_t pgid) : pgid(pgid) {} +PGMap::PGCreationState::~PGCreationState() {} + +void PGMap::PGCreationState::dump_detail(Formatter *f) const +{ + f->dump_stream("pgid") << pgid; + f->dump_bool("creating", creating); +} + +std::pair>, bool> PGMap::get_pg(spg_t pgid, bool wait) +{ + if (auto pg = pgs.find(pgid); pg != pgs.end()) { + return make_pair(make_ready_blocking_future>(pg->second), true); + } else if (!wait) { + return make_pair(make_ready_blocking_future>(nullptr), true); + } else { + auto &state = pgs_creating.emplace(pgid, pgid).first->second; + return make_pair( + state.make_blocking_future(state.promise.get_shared_future()), + state.creating); + } +} + +void PGMap::set_creating(spg_t pgid) +{ + logger().debug("Creating {}", pgid); + ceph_assert(pgs.count(pgid) == 0); + auto pg = pgs_creating.find(pgid); + ceph_assert(pg != pgs_creating.end()); + ceph_assert(pg->second.creating == false); + pg->second.creating = true; +} + +void PGMap::pg_created(spg_t pgid, Ref pg) +{ + logger().debug("Created {}", pgid); + ceph_assert(!pgs.count(pgid)); + pgs.emplace(pgid, pg); + + auto state = pgs_creating.find(pgid); + ceph_assert(state != pgs_creating.end()); + state->second.promise.set_value(pg); + pgs_creating.erase(pgid); +} + +void PGMap::pg_loaded(spg_t pgid, Ref pg) +{ + ceph_assert(!pgs.count(pgid)); + pgs.emplace(pgid, pg); +} + +PGMap::~PGMap() {} + +} diff --git a/src/crimson/osd/pg_map.h b/src/crimson/osd/pg_map.h new file mode 100644 index 00000000000..8b4086efd88 --- /dev/null +++ b/src/crimson/osd/pg_map.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include + +#include +#include + +#include "include/types.h" +#include "crimson/common/type_helpers.h" +#include "crimson/osd/osd_operation.h" +#include "osd/osd_types.h" + +namespace ceph::osd { +class PG; + +class PGMap { + struct PGCreationState : BlockerT { + static constexpr const char * type_name = "PGCreation"; + + void dump_detail(Formatter *f) const final; + + spg_t pgid; + seastar::shared_promise> promise; + bool creating = false; + PGCreationState(spg_t pgid); + + PGCreationState(const PGCreationState &) = delete; + PGCreationState(PGCreationState &&) = delete; + PGCreationState &operator=(const PGCreationState &) = delete; + PGCreationState &operator=(PGCreationState &&) = delete; + + ~PGCreationState(); + }; + + std::map pgs_creating; + std::map> pgs; + +public: + /** + * Get future for pg with a bool indicating whether it's already being + * created. + */ + std::pair>, bool> get_pg(spg_t pgid, bool wait=true); + + /** + * Set creating + */ + void set_creating(spg_t pgid); + + /** + * Set newly created pg + */ + void pg_created(spg_t pgid, Ref pg); + + /** + * Add newly loaded pg + */ + void pg_loaded(spg_t pgid, Ref pg); + + decltype(pgs) &get_pgs() { return pgs; } + + PGMap() = default; + ~PGMap(); +}; + +} diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 9d4c9984205..f5502e92721 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -36,7 +36,8 @@ ShardServices::ShardServices( public_msgr(public_msgr), monc(monc), mgrc(mgrc), - store(store) { + store(store) +{ perf = build_osd_logger(&cct); cct.get_perfcounters_collection()->add(perf); @@ -67,7 +68,7 @@ seastar::future<> ShardServices::dispatch_context_transaction( } seastar::future<> ShardServices::dispatch_context_messages( - PeeringCtx &ctx) + BufferedRecoveryMessages &&ctx) { auto ret = seastar::when_all_succeed( seastar::parallel_for_each(std::move(ctx.notify_list), @@ -105,13 +106,9 @@ seastar::future<> ShardServices::dispatch_context( PeeringCtx &&ctx) { ceph_assert(col || ctx.transaction.empty()); - return seastar::do_with( - PeeringCtx{ctx}, - [this, col](auto& todo) { - return seastar::when_all_succeed( - dispatch_context_messages(todo), - col ? dispatch_context_transaction(col, todo) : seastar::now()); - }); + return seastar::when_all_succeed( + dispatch_context_messages(BufferedRecoveryMessages(ctx)), + col ? dispatch_context_transaction(col, ctx) : seastar::now()); } void ShardServices::queue_want_pg_temp(pg_t pgid, @@ -190,6 +187,16 @@ void ShardServices::send_pg_temp() _sent_pg_temp(); } +void ShardServices::update_map(cached_map_t new_osdmap) +{ + osdmap = std::move(new_osdmap); +} + +ShardServices::cached_map_t &ShardServices::get_osdmap() +{ + return osdmap; +} + seastar::future<> ShardServices::send_pg_created(pg_t pgid) { logger().debug(__func__); @@ -227,4 +234,15 @@ void ShardServices::prune_pg_created() } } +seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request) +{ + logger().info("{}({})", __func__, epoch); + if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc.renew_subs(); + } else { + return seastar::now(); + } +} + }; diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index fa3c72b1b0b..9d04ba2e9dc 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -6,6 +6,7 @@ #include #include +#include "osd_operation.h" #include "msg/MessageRef.h" #include "crimson/os/cyan_collection.h" @@ -28,6 +29,7 @@ namespace ceph::os { class PerfCounters; class OSDMap; class PeeringCtx; +class BufferedRecoveryMessages; namespace ceph::osd { @@ -68,6 +70,16 @@ public: return &cct; } + // Op Tracking + OperationRegistry registry; + + template + typename T::IRef start_operation(Args&&... args) { + auto op = registry.create_operation(std::forward(args)...); + op->start(); + return op; + } + // Loggers PerfCounters &get_recoverystate_perf_logger() { return *recoverystate_perf; @@ -82,7 +94,7 @@ public: /// Dispatch and reset ctx messages seastar::future<> dispatch_context_messages( - PeeringCtx &ctx); + BufferedRecoveryMessages &&ctx); /// Dispatch ctx and dispose of context seastar::future<> dispatch_context( @@ -118,12 +130,8 @@ public: private: cached_map_t osdmap; public: - void update_map(cached_map_t new_osdmap) { - osdmap = std::move(new_osdmap); - } - cached_map_t &get_osdmap() { - return osdmap; - } + void update_map(cached_map_t new_osdmap); + cached_map_t &get_osdmap(); // PG Created State private: @@ -132,6 +140,8 @@ public: seastar::future<> send_pg_created(pg_t pgid); seastar::future<> send_pg_created(); void prune_pg_created(); + + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); }; -- 2.39.5