]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: handle MOSDRepOp
authorKefu Chai <kchai@redhat.com>
Mon, 15 Jul 2019 13:17:56 +0000 (21:17 +0800)
committerKefu Chai <kchai@redhat.com>
Mon, 12 Aug 2019 10:01:46 +0000 (18:01 +0800)
* add a `RepRequest` operation which is blocked by `ConnectionPipeline`
  and `PGPipeline`. these two pipelines are modeled after their
  counterparts of `ClientRequest`.
* add these two blockers to `PG` and `OSDConnectionPriv` accordingly.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/replicated_request.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/replicated_request.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 7c3f0f34d4678592c16c38d4265787654f5b8ef0..6e92f97661750c59be26b6fde8c8dc7b476ba605 100644 (file)
@@ -12,9 +12,10 @@ add_executable(crimson-osd
   shard_services.cc
   osd_operation.cc
   osd_operations/client_request.cc
-  osd_operations/peering_event.cc
   osd_operations/compound_peering_request.cc
+  osd_operations/peering_event.cc
   osd_operations/pg_advance_map.cc
+  osd_operations/replicated_request.cc
   osdmap_gate.cc
   pg_map.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
index 05fc4d06d11ef7e8a37b632a22bfc55672a11da3..6cf7e9e04a67a4123026392487ffba9a0f6599f1 100644 (file)
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_backend.h"
 #include "crimson/osd/pg_meta.h"
+#include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/compound_peering_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/pg_advance_map.h"
-#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
 
 namespace {
   seastar::logger& logger() {
@@ -479,6 +480,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
     return seastar::now();
   case MSG_OSD_PG_LOG:
     return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+  case MSG_OSD_REPOP:
+    return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
   case MSG_OSD_REPOPREPLY:
     return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
   default:
@@ -856,11 +859,23 @@ seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn,
   return seastar::now();
 }
 
+seastar::future<> OSD::handle_rep_op(ceph::net::Connection* conn,
+                                    Ref<MOSDRepOp> m)
+{
+  m->finish_decode();
+  shard_services.start_operation<RepRequest>(
+    *this,
+    conn->get_shared(),
+    std::move(m));
+  return seastar::now();
+}
+
 seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn,
                                           Ref<MOSDRepOpReply> m)
 {
   const auto& pgs = pg_map.get_pgs();
   if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
+    m->finish_decode();
     pg->second->handle_rep_op_reply(conn, *m);
   } else {
     logger().warn("stale reply: {}", *m);
index 5578e268a50d04ab7fc636e386af8851f35108e0..4e8523c0f64a7d3af65ee6842d0c0b48299af0e5 100644 (file)
@@ -35,6 +35,7 @@
 class MOSDMap;
 class MOSDOp;
 class MOSDRepOpReply;
+class MOSDRepOp;
 class OSDMap;
 class OSDMeta;
 class Heartbeat;
@@ -165,6 +166,8 @@ private:
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(ceph::net::Connection* conn,
                                  Ref<MOSDOp> m);
+  seastar::future<> handle_rep_op(ceph::net::Connection* conn,
+                                 Ref<MOSDRepOp> m);
   seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
                                        Ref<MOSDRepOpReply> m);
   seastar::future<> handle_pg_log(ceph::net::Connection* conn,
index 25c72a88f1c7358013540528402687221687977e..af377c9ba22b4e2948839f70ae1ee93773b96850 100644 (file)
@@ -7,12 +7,14 @@
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
 
 namespace ceph::osd {
 
 struct OSDConnectionPriv : public ceph::net::Connection::user_private_t {
   ClientRequest::ConnectionPipeline client_request_conn_pipeline;
   RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
+  RepRequest::ConnectionPipeline replicated_request_conn_pipeline;
 };
 
 static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) {
index 0152b3f836f950073ba19ef2cbd15ce95a357044..7ed60394ab680fab8279d332e6051617dad0b260 100644 (file)
@@ -24,7 +24,8 @@ enum class OperationTypeCode {
   compound_peering_request = 2,
   pg_advance_map = 3,
   pg_creation = 4,
-  last_op = 5
+  replicated_request = 5,
+  last_op = 6
 };
 
 static constexpr const char* const OP_NAMES[] = {
@@ -33,6 +34,7 @@ static constexpr const char* const OP_NAMES[] = {
   "compound_peering_request",
   "pg_advance_map",
   "pg_creation",
+  "replicated_request",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc
new file mode 100644 (file)
index 0000000..46b964f
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "replicated_request.h"
+
+#include "common/Formatter.h"
+#include "messages/MOSDRepOp.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+RepRequest::RepRequest(OSD &osd,
+                      ceph::net::ConnectionRef&& conn,
+                      Ref<MOSDRepOp> &&req)
+  : osd{osd},
+    conn{std::move(conn)},
+    req{req}
+{}
+
+void RepRequest::print(std::ostream& os) const
+{
+  os << "RepRequest("
+     << "from=" << req->from
+     << " req=" << *req
+     << ")";
+}
+
+void RepRequest::dump_detail(Formatter *f) const
+{
+  f->open_object_section("RepRequest");
+  f->dump_stream("reqid") << req->reqid;
+  f->dump_stream("pgid") << req->get_spg();
+  f->dump_unsigned("map_epoch", req->get_map_epoch());
+  f->dump_unsigned("min_epoch", req->get_min_epoch());
+  f->dump_stream("oid") << req->poid;
+  f->dump_stream("from") << req->from;
+  f->close_section();
+}
+
+RepRequest::ConnectionPipeline &RepRequest::cp()
+{
+  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &RepRequest::pp(PG &pg)
+{
+  return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> RepRequest::start()
+{
+  logger().info("{} start", *this);
+  IRef ref = this;
+  return with_blocking_future(handle.enter(cp().await_map))
+    .then([this]() {
+      return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
+    }).then([this](epoch_t epoch) {
+      return with_blocking_future(handle.enter(cp().get_pg));
+    }).then([this] {
+      return with_blocking_future(osd.wait_for_pg(req->get_spg()));
+    }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+      return pg->handle_rep_op(std::move(req));
+    });
+}
+}
diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h
new file mode 100644 (file)
index 0000000..a835aed
--- /dev/null
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDRepOp;
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+class RepRequest final : public OperationT<RepRequest> {
+public:
+  class ConnectionPipeline {
+    OrderedPipelinePhase await_map = {
+      "RepRequest::ConnectionPipeline::await_map"
+    };
+    OrderedPipelinePhase get_pg = {
+      "RepRequest::ConnectionPipeline::get_pg"
+    };
+    friend RepRequest;
+  };
+  class PGPipeline {
+    OrderedPipelinePhase await_map = {
+      "RepRequest::PGPipeline::await_map"
+    };
+    OrderedPipelinePhase process = {
+      "RepRequest::PGPipeline::process"
+    };
+    friend RepRequest;
+  };
+  static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
+  RepRequest(OSD&, ceph::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+
+private:
+  ConnectionPipeline &cp();
+  PGPipeline &pp(PG &pg);
+
+  OSD &osd;
+  ceph::net::ConnectionRef conn;
+  Ref<MOSDRepOp> req;
+  OrderedPipelinePhase::Handle handle;
+};
+
+}
index 2efcc416b36ffe450fafc4730111484012d2a2c0..634e2485663266616b3a043397a4f2a71f5ce793 100644 (file)
@@ -21,6 +21,8 @@
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
 
 #include "osd/OSDMap.h"
 
@@ -484,6 +486,23 @@ seastar::future<> PG::handle_op(ceph::net::Connection* conn,
   });
 }
 
+seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+{
+  ceph::os::Transaction txn;
+  auto encoded_txn = req->get_data().cbegin();
+  decode(txn, encoded_txn);
+  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+    .then([req, lcod=peering_state.get_info().last_complete, this] {
+      peering_state.update_last_complete_ondisk(lcod);
+      const auto map_epoch = get_osdmap_epoch();
+      auto reply = make_message<MOSDRepOpReply>(
+        req.get(), pg_whoami, 0,
+       map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
+      reply->set_last_complete_ondisk(lcod);
+      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
+    });
+}
+
 void PG::handle_rep_op_reply(ceph::net::Connection* conn,
                             const MOSDRepOpReply& m)
 {
index 21e724c2a009f4d7e28e41b36c6cff42630c0c8d..fe6115031ee9e8b40fed536f46086abda5a35218 100644 (file)
@@ -21,6 +21,7 @@
 #include "crimson/common/type_helpers.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
 #include "crimson/osd/shard_services.h"
 #include "crimson/osd/osdmap_gate.h"
 
@@ -54,6 +55,7 @@ class PG : public boost::intrusive_ref_counter<
 
   ClientRequest::PGPipeline client_request_pg_pipeline;
   PeeringEvent::PGPipeline peering_request_pg_pipeline;
+  RepRequest::PGPipeline replicated_request_pg_pipeline;
 
   spg_t pgid;
   pg_shard_t pg_whoami;
@@ -417,6 +419,7 @@ public:
   void handle_initialize(PeeringCtx &rctx);
   seastar::future<> handle_op(ceph::net::Connection* conn,
                              Ref<MOSDOp> m);
+  seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
   void handle_rep_op_reply(ceph::net::Connection* conn,
                           const MOSDRepOpReply& m);
 
@@ -453,8 +456,9 @@ private:
 
   friend std::ostream& operator<<(std::ostream&, const PG& pg);
   friend class ClientRequest;
-  friend class PeeringEvent;
   friend class PGAdvanceMap;
+  friend class PeeringEvent;
+  friend class RepRequest;
 };
 
 std::ostream& operator<<(std::ostream&, const PG& pg);