From a7a52efffda1892dbc757c31a97c7e3c158b9e1b Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 15 Jul 2019 21:17:56 +0800 Subject: [PATCH] crimson/osd: handle MOSDRepOp * add a `RepRequest` operation which is blocked by `ConnectionPipeline` and `PGPipeline`. these two pipelines are modeled after their counterparts of `ClientRequest`. * add these two blockers to `PG` and `OSDConnectionPriv` accordingly. Signed-off-by: Kefu Chai --- src/crimson/osd/CMakeLists.txt | 3 +- src/crimson/osd/osd.cc | 17 ++++- src/crimson/osd/osd.h | 3 + src/crimson/osd/osd_connection_priv.h | 2 + src/crimson/osd/osd_operation.h | 4 +- .../osd/osd_operations/replicated_request.cc | 74 +++++++++++++++++++ .../osd/osd_operations/replicated_request.h | 54 ++++++++++++++ src/crimson/osd/pg.cc | 19 +++++ src/crimson/osd/pg.h | 6 +- 9 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 src/crimson/osd/osd_operations/replicated_request.cc create mode 100644 src/crimson/osd/osd_operations/replicated_request.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 7c3f0f34d46..6e92f976617 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -12,9 +12,10 @@ add_executable(crimson-osd shard_services.cc osd_operation.cc osd_operations/client_request.cc - osd_operations/peering_event.cc osd_operations/compound_peering_request.cc + osd_operations/peering_event.cc osd_operations/pg_advance_map.cc + osd_operations/replicated_request.cc osdmap_gate.cc pg_map.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 05fc4d06d11..6cf7e9e04a6 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -34,10 +34,11 @@ #include "crimson/osd/pg.h" #include "crimson/osd/pg_backend.h" #include "crimson/osd/pg_meta.h" +#include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/compound_peering_request.h" #include "crimson/osd/osd_operations/peering_event.h" #include "crimson/osd/osd_operations/pg_advance_map.h" -#include "crimson/osd/osd_operations/client_request.h" +#include "crimson/osd/osd_operations/replicated_request.h" namespace { seastar::logger& logger() { @@ -479,6 +480,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m) return seastar::now(); case MSG_OSD_PG_LOG: return handle_pg_log(conn, boost::static_pointer_cast(m)); + case MSG_OSD_REPOP: + return handle_rep_op(conn, boost::static_pointer_cast(m)); case MSG_OSD_REPOPREPLY: return handle_rep_op_reply(conn, boost::static_pointer_cast(m)); default: @@ -856,11 +859,23 @@ seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn, return seastar::now(); } +seastar::future<> OSD::handle_rep_op(ceph::net::Connection* conn, + Ref m) +{ + m->finish_decode(); + shard_services.start_operation( + *this, + conn->get_shared(), + std::move(m)); + return seastar::now(); +} + seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn, Ref m) { const auto& pgs = pg_map.get_pgs(); if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) { + m->finish_decode(); pg->second->handle_rep_op_reply(conn, *m); } else { logger().warn("stale reply: {}", *m); diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 5578e268a50..4e8523c0f64 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -35,6 +35,7 @@ class MOSDMap; class MOSDOp; class MOSDRepOpReply; +class MOSDRepOp; class OSDMap; class OSDMeta; class Heartbeat; @@ -165,6 +166,8 @@ private: Ref m); seastar::future<> handle_osd_op(ceph::net::Connection* conn, Ref m); + seastar::future<> handle_rep_op(ceph::net::Connection* conn, + Ref m); seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn, Ref m); seastar::future<> handle_pg_log(ceph::net::Connection* conn, diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h index 25c72a88f1c..af377c9ba22 100644 --- a/src/crimson/osd/osd_connection_priv.h +++ b/src/crimson/osd/osd_connection_priv.h @@ -7,12 +7,14 @@ #include "crimson/osd/osd_operation.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/replicated_request.h" namespace ceph::osd { struct OSDConnectionPriv : public ceph::net::Connection::user_private_t { ClientRequest::ConnectionPipeline client_request_conn_pipeline; RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline; + RepRequest::ConnectionPipeline replicated_request_conn_pipeline; }; static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) { diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index 0152b3f836f..7ed60394ab6 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -24,7 +24,8 @@ enum class OperationTypeCode { compound_peering_request = 2, pg_advance_map = 3, pg_creation = 4, - last_op = 5 + replicated_request = 5, + last_op = 6 }; static constexpr const char* const OP_NAMES[] = { @@ -33,6 +34,7 @@ static constexpr const char* const OP_NAMES[] = { "compound_peering_request", "pg_advance_map", "pg_creation", + "replicated_request", }; // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry: diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc new file mode 100644 index 00000000000..46b964fc631 --- /dev/null +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "replicated_request.h" + +#include "common/Formatter.h" +#include "messages/MOSDRepOp.h" + +#include "crimson/osd/osd.h" +#include "crimson/osd/osd_connection_priv.h" +#include "crimson/osd/pg.h" + +namespace { + seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_osd); + } +} + +namespace ceph::osd { + +RepRequest::RepRequest(OSD &osd, + ceph::net::ConnectionRef&& conn, + Ref &&req) + : osd{osd}, + conn{std::move(conn)}, + req{req} +{} + +void RepRequest::print(std::ostream& os) const +{ + os << "RepRequest(" + << "from=" << req->from + << " req=" << *req + << ")"; +} + +void RepRequest::dump_detail(Formatter *f) const +{ + f->open_object_section("RepRequest"); + f->dump_stream("reqid") << req->reqid; + f->dump_stream("pgid") << req->get_spg(); + f->dump_unsigned("map_epoch", req->get_map_epoch()); + f->dump_unsigned("min_epoch", req->get_min_epoch()); + f->dump_stream("oid") << req->poid; + f->dump_stream("from") << req->from; + f->close_section(); +} + +RepRequest::ConnectionPipeline &RepRequest::cp() +{ + return get_osd_priv(conn.get()).replicated_request_conn_pipeline; +} + +RepRequest::PGPipeline &RepRequest::pp(PG &pg) +{ + return pg.replicated_request_pg_pipeline; +} + +seastar::future<> RepRequest::start() +{ + logger().info("{} start", *this); + IRef ref = this; + return with_blocking_future(handle.enter(cp().await_map)) + .then([this]() { + return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch())); + }).then([this](epoch_t epoch) { + return with_blocking_future(handle.enter(cp().get_pg)); + }).then([this] { + return with_blocking_future(osd.wait_for_pg(req->get_spg())); + }).then([this, ref=std::move(ref)](Ref pg) { + return pg->handle_rep_op(std::move(req)); + }); +} +} diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h new file mode 100644 index 00000000000..a835aed32ff --- /dev/null +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "crimson/net/Connection.h" +#include "crimson/osd/osd_operation.h" +#include "crimson/common/type_helpers.h" + +class MOSDRepOp; + +namespace ceph::osd { + +class OSD; +class PG; + +class RepRequest final : public OperationT { +public: + class ConnectionPipeline { + OrderedPipelinePhase await_map = { + "RepRequest::ConnectionPipeline::await_map" + }; + OrderedPipelinePhase get_pg = { + "RepRequest::ConnectionPipeline::get_pg" + }; + friend RepRequest; + }; + class PGPipeline { + OrderedPipelinePhase await_map = { + "RepRequest::PGPipeline::await_map" + }; + OrderedPipelinePhase process = { + "RepRequest::PGPipeline::process" + }; + friend RepRequest; + }; + static constexpr OperationTypeCode type = OperationTypeCode::replicated_request; + RepRequest(OSD&, ceph::net::ConnectionRef&&, Ref&&); + + void print(std::ostream &) const final; + void dump_detail(Formatter *f) const final; + seastar::future<> start(); + +private: + ConnectionPipeline &cp(); + PGPipeline &pp(PG &pg); + + OSD &osd; + ceph::net::ConnectionRef conn; + Ref req; + OrderedPipelinePhase::Handle handle; +}; + +} diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 2efcc416b36..634e2485663 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -21,6 +21,8 @@ #include "messages/MOSDPGLog.h" #include "messages/MOSDPGNotify.h" #include "messages/MOSDPGQuery.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" #include "osd/OSDMap.h" @@ -484,6 +486,23 @@ seastar::future<> PG::handle_op(ceph::net::Connection* conn, }); } +seastar::future<> PG::handle_rep_op(Ref req) +{ + ceph::os::Transaction txn; + auto encoded_txn = req->get_data().cbegin(); + decode(txn, encoded_txn); + return shard_services.get_store().do_transaction(coll_ref, std::move(txn)) + .then([req, lcod=peering_state.get_info().last_complete, this] { + peering_state.update_last_complete_ondisk(lcod); + const auto map_epoch = get_osdmap_epoch(); + auto reply = make_message( + req.get(), pg_whoami, 0, + map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(lcod); + return shard_services.send_to_osd(req->from.osd, reply, map_epoch); + }); +} + void PG::handle_rep_op_reply(ceph::net::Connection* conn, const MOSDRepOpReply& m) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 21e724c2a00..fe6115031ee 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -21,6 +21,7 @@ #include "crimson/common/type_helpers.h" #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_operations/peering_event.h" +#include "crimson/osd/osd_operations/replicated_request.h" #include "crimson/osd/shard_services.h" #include "crimson/osd/osdmap_gate.h" @@ -54,6 +55,7 @@ class PG : public boost::intrusive_ref_counter< ClientRequest::PGPipeline client_request_pg_pipeline; PeeringEvent::PGPipeline peering_request_pg_pipeline; + RepRequest::PGPipeline replicated_request_pg_pipeline; spg_t pgid; pg_shard_t pg_whoami; @@ -417,6 +419,7 @@ public: void handle_initialize(PeeringCtx &rctx); seastar::future<> handle_op(ceph::net::Connection* conn, Ref m); + seastar::future<> handle_rep_op(Ref m); void handle_rep_op_reply(ceph::net::Connection* conn, const MOSDRepOpReply& m); @@ -453,8 +456,9 @@ private: friend std::ostream& operator<<(std::ostream&, const PG& pg); friend class ClientRequest; - friend class PeeringEvent; friend class PGAdvanceMap; + friend class PeeringEvent; + friend class RepRequest; }; std::ostream& operator<<(std::ostream&, const PG& pg); -- 2.39.5