From: Kefu Chai Date: Fri, 13 Sep 2019 07:18:58 +0000 (+0800) Subject: crimson/osd: use single-pg peering ops X-Git-Tag: v15.1.0~1561^2~1 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=260ab1a5f27c1b7c01009be0811e9a6cd5a4fac6;p=ceph-ci.git crimson/osd: use single-pg peering ops classic OSD started to use single-pg peering ops since ce05c172, and it switched over to these ops since octopus. and it's assumed that crimson-osd won't be GA until octopus+2 release, so it's not needed to support pre-octopus releases. Signed-off-by: Kefu Chai --- diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index b0586d2f02d..e846c978276 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -488,16 +488,19 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m) case CEPH_MSG_OSD_OP: return handle_osd_op(conn, boost::static_pointer_cast(m)); case MSG_OSD_PG_CREATE2: - case MSG_OSD_PG_NOTIFY: - case MSG_OSD_PG_INFO: - case MSG_OSD_PG_QUERY: shard_services.start_operation( *this, conn->get_shared(), m); return seastar::now(); + case MSG_OSD_PG_NOTIFY2: + [[fallthrough]]; + case MSG_OSD_PG_INFO2: + [[fallthrough]]; + case MSG_OSD_PG_QUERY2: + [[fallthrough]]; case MSG_OSD_PG_LOG: - return handle_pg_log(conn, boost::static_pointer_cast(m)); + return handle_peering_op(conn, boost::static_pointer_cast(m)); case MSG_OSD_REPOP: return handle_rep_op(conn, boost::static_pointer_cast(m)); case MSG_OSD_REPOPREPLY: @@ -974,18 +977,18 @@ void OSD::update_heartbeat_peers() heartbeat->update_peers(whoami); } -seastar::future<> OSD::handle_pg_log( +seastar::future<> OSD::handle_peering_op( ceph::net::Connection* conn, - Ref m) + Ref m) { const int from = m->get_source().num(); - logger().debug("handle_pg_log on {} from {}", m->get_spg(), from); + logger().debug("handle_peering_op on {} from {}", m->get_spg(), from); shard_services.start_operation( *this, conn->get_shared(), shard_services, - pg_shard_t(from, m->from), - spg_t(m->info.pgid.pgid, m->to), + pg_shard_t{from, m->get_spg().shard}, + m->get_spg(), std::move(*m->get_event())); return seastar::now(); } diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 0377edfd9dc..5c7b5c5242f 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -169,8 +169,8 @@ private: Ref m); seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_pg_log(ceph::net::Connection* conn, - Ref m); + seastar::future<> handle_peering_op(ceph::net::Connection* conn, + Ref m); seastar::future<> committed_osd_maps(version_t first, version_t last, diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc index 43d715e8c04..7c38be2545e 100644 --- a/src/crimson/osd/osd_operations/compound_peering_request.cc +++ b/src/crimson/osd/osd_operations/compound_peering_request.cc @@ -5,8 +5,6 @@ #include "osd/PeeringState.h" -#include "messages/MOSDPGInfo.h" -#include "messages/MOSDPGNotify.h" #include "messages/MOSDPGQuery.h" #include "messages/MOSDPGCreate2.h" @@ -28,6 +26,11 @@ using namespace ceph::osd; struct compound_state { seastar::promise promise; BufferedRecoveryMessages ctx; + compound_state() + // assuming crimson-osd won't need to be compatible with pre-octopus + // releases + : ctx{ceph_release_t::octopus} + {} ~compound_state() { promise.set_value(std::move(ctx)); } @@ -107,75 +110,6 @@ std::vector handle_pg_create( return ret; } -std::vector handle_pg_notify( - OSD &osd, - ceph::net::ConnectionRef conn, - compound_state_ref state, - Ref m) -{ - std::vector ret; - ret.reserve(m->get_pg_list().size()); - const int from = m->get_source().num(); - for (auto& pg_notify : m->get_pg_list()) { - spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; - MNotifyRec notify{pgid, - pg_shard_t{from, pg_notify.from}, - pg_notify, - 0}; // the features is not used - logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from); - auto create_info = new PGCreateInfo{ - pgid, - pg_notify.query_epoch, - pg_notify.info.history, - pg_notify.past_intervals, - false}; - auto op = osd.get_shard_services().start_operation( - state, - osd, - conn, - osd.get_shard_services(), - pg_shard_t(from, pg_notify.from), - pgid, - pg_notify.epoch_sent, - pg_notify.query_epoch, - notify, - true, // requires_pg - create_info).first; - ret.push_back(op); - } - return ret; -} - -std::vector handle_pg_info( - OSD &osd, - ceph::net::ConnectionRef conn, - compound_state_ref state, - Ref m) -{ - std::vector ret; - ret.reserve(m->pg_list.size()); - const int from = m->get_source().num(); - for (auto& pg_notify : m->pg_list) { - spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; - logger().debug("handle_pg_info on {} from {}", pgid.pgid, from); - MInfoRec info{pg_shard_t{from, pg_notify.from}, - pg_notify.info, - pg_notify.epoch_sent}; - auto op = osd.get_shard_services().start_operation( - state, - osd, - conn, - osd.get_shard_services(), - pg_shard_t(from, pg_notify.from), - pgid, - pg_notify.epoch_sent, - pg_notify.query_epoch, - std::move(info)).first; - ret.push_back(op); - } - return ret; -} - class QuerySubEvent : public PeeringSubEvent { public: template @@ -185,7 +119,7 @@ public: void on_pg_absent() final { logger().debug("handle_pg_query on absent pg {} from {}", pgid, from); pg_info_t empty(pgid); - ctx.notify_list[from.osd].emplace_back( + ctx.send_notify(from.osd, pg_notify_t( from.shard, pgid.shard, evt.get_epoch_sent(), @@ -275,18 +209,6 @@ seastar::future<> CompoundPeeringRequest::start() conn, state, boost::static_pointer_cast(m)); - case MSG_OSD_PG_NOTIFY: - return handle_pg_notify( - osd, - conn, - state, - boost::static_pointer_cast(m)); - case MSG_OSD_PG_INFO: - return handle_pg_info( - osd, - conn, - state, - boost::static_pointer_cast(m)); case MSG_OSD_PG_QUERY: return handle_pg_query( osd, diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index 2d3061c3252..1408709b0ab 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -18,10 +18,6 @@ namespace { namespace ceph::osd { -PGAdvanceMap::PGAdvanceMap( - OSD &osd, Ref pg, epoch_t from, epoch_t to) - : osd(osd), pg(pg), from(from), to(to), do_init(false) {} - PGAdvanceMap::PGAdvanceMap( OSD &osd, Ref pg, epoch_t from, epoch_t to, PeeringCtx &&rctx, bool do_init) diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h index 7225ca9de81..bed61558f96 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.h +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -33,8 +33,6 @@ protected: const bool do_init; public: - PGAdvanceMap( - OSD &osd, Ref pg, epoch_t from, epoch_t to); PGAdvanceMap( OSD &osd, Ref pg, epoch_t from, epoch_t to, PeeringCtx &&rctx, bool do_init); diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index db3add0a05f..84d8277d41e 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -77,34 +77,15 @@ seastar::future<> ShardServices::dispatch_context_transaction( seastar::future<> ShardServices::dispatch_context_messages( BufferedRecoveryMessages &&ctx) { - auto ret = seastar::when_all_succeed( - seastar::parallel_for_each(std::move(ctx.notify_list), - [this](auto& osd_notifies) { - auto& [peer, notifies] = osd_notifies; - auto m = make_message(osdmap->get_epoch(), - std::move(notifies)); - logger().debug("dispatch_context_messages sending notify to {}", peer); - return send_to_osd(peer, m, osdmap->get_epoch()); - }), - seastar::parallel_for_each(std::move(ctx.query_map), - [this](auto& osd_queries) { - auto& [peer, queries] = osd_queries; - auto m = make_message(osdmap->get_epoch(), - std::move(queries)); - logger().debug("dispatch_context_messages sending query to {}", peer); - return send_to_osd(peer, m, osdmap->get_epoch()); - }), - seastar::parallel_for_each(std::move(ctx.info_map), - [this](auto& osd_infos) { - auto& [peer, infos] = osd_infos; - auto m = make_message(osdmap->get_epoch(), - std::move(infos)); - logger().debug("dispatch_context_messages sending info to {}", peer); - return send_to_osd(peer, m, osdmap->get_epoch()); - })); - ctx.notify_list.clear(); - ctx.query_map.clear(); - ctx.info_map.clear(); + auto ret = seastar::parallel_for_each(std::move(ctx.message_map), + [this](auto& osd_messages) { + auto& [peer, messages] = osd_messages; + logger().debug("dispatch_context_messages sending messages to {}", peer); + return seastar::parallel_for_each(std::move(messages), [=](auto& m) { + return send_to_osd(peer, m, osdmap->get_epoch()); + }); + }); + ctx.message_map.clear(); return ret; }