]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: use single-pg peering ops
authorKefu Chai <kchai@redhat.com>
Fri, 13 Sep 2019 07:18:58 +0000 (15:18 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 13 Sep 2019 08:14:13 +0000 (16:14 +0800)
classic OSD started to use single-pg peering ops since ce05c172, and it
switched over to these ops since octopus. and it's assumed that
crimson-osd won't be GA until octopus+2 release, so it's not needed to
support pre-octopus releases.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/pg_advance_map.h
src/crimson/osd/shard_services.cc

index b0586d2f02d41b144f01b3ddb7a5282d3ea90478..e846c97827615df01a5180ce5e363522195758e8 100644 (file)
@@ -488,16 +488,19 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
   case CEPH_MSG_OSD_OP:
     return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
   case MSG_OSD_PG_CREATE2:
-  case MSG_OSD_PG_NOTIFY:
-  case MSG_OSD_PG_INFO:
-  case MSG_OSD_PG_QUERY:
     shard_services.start_operation<CompoundPeeringRequest>(
       *this,
       conn->get_shared(),
       m);
     return seastar::now();
+  case MSG_OSD_PG_NOTIFY2:
+    [[fallthrough]];
+  case MSG_OSD_PG_INFO2:
+    [[fallthrough]];
+  case MSG_OSD_PG_QUERY2:
+    [[fallthrough]];
   case MSG_OSD_PG_LOG:
-    return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+    return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
   case MSG_OSD_REPOP:
     return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
   case MSG_OSD_REPOPREPLY:
@@ -974,18 +977,18 @@ void OSD::update_heartbeat_peers()
   heartbeat->update_peers(whoami);
 }
 
-seastar::future<> OSD::handle_pg_log(
+seastar::future<> OSD::handle_peering_op(
   ceph::net::Connection* conn,
-  Ref<MOSDPGLog> m)
+  Ref<MOSDPeeringOp> m)
 {
   const int from = m->get_source().num();
-  logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
+  logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
   shard_services.start_operation<RemotePeeringEvent>(
     *this,
     conn->get_shared(),
     shard_services,
-    pg_shard_t(from, m->from),
-    spg_t(m->info.pgid.pgid, m->to),
+    pg_shard_t{from, m->get_spg().shard},
+    m->get_spg(),
     std::move(*m->get_event()));
   return seastar::now();
 }
index 0377edfd9dc87d3ca42dbbf9def2c2948c88ee5a..5c7b5c5242fc9b85aabca471a59c91ad86dfef02 100644 (file)
@@ -169,8 +169,8 @@ private:
                                  Ref<MOSDRepOp> m);
   seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
                                        Ref<MOSDRepOpReply> m);
-  seastar::future<> handle_pg_log(ceph::net::Connection* conn,
-                                 Ref<MOSDPGLog> m);
+  seastar::future<> handle_peering_op(ceph::net::Connection* conn,
+                                     Ref<MOSDPeeringOp> m);
 
   seastar::future<> committed_osd_maps(version_t first,
                                        version_t last,
index 43d715e8c0441f19cb39caef53cc79c76f7347ca..7c38be2545e2d81f9723ce1df69a0e03203a98cc 100644 (file)
@@ -5,8 +5,6 @@
 
 #include "osd/PeeringState.h"
 
-#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGCreate2.h"
 
@@ -28,6 +26,11 @@ using namespace ceph::osd;
 struct compound_state {
   seastar::promise<BufferedRecoveryMessages> promise;
   BufferedRecoveryMessages ctx;
+  compound_state()
+    // assuming crimson-osd won't need to be compatible with pre-octopus
+    // releases
+    : ctx{ceph_release_t::octopus}
+  {}
   ~compound_state() {
     promise.set_value(std::move(ctx));
   }
@@ -107,75 +110,6 @@ std::vector<OperationRef> handle_pg_create(
   return ret;
 }
 
-std::vector<OperationRef> handle_pg_notify(
-  OSD &osd,
-  ceph::net::ConnectionRef conn,
-  compound_state_ref state,
-  Ref<MOSDPGNotify> m)
-{
-  std::vector<OperationRef> ret;
-  ret.reserve(m->get_pg_list().size());
-  const int from = m->get_source().num();
-  for (auto& pg_notify : m->get_pg_list()) {
-    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-    MNotifyRec notify{pgid,
-                     pg_shard_t{from, pg_notify.from},
-                     pg_notify,
-                     0}; // the features is not used
-    logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
-    auto create_info = new PGCreateInfo{
-      pgid,
-      pg_notify.query_epoch,
-      pg_notify.info.history,
-      pg_notify.past_intervals,
-      false};
-    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
-      state,
-      osd,
-      conn,
-      osd.get_shard_services(),
-      pg_shard_t(from, pg_notify.from),
-      pgid,
-      pg_notify.epoch_sent,
-      pg_notify.query_epoch,
-      notify,
-      true, // requires_pg
-      create_info).first;
-    ret.push_back(op);
-  }
-  return ret;
-}
-
-std::vector<OperationRef> handle_pg_info(
-  OSD &osd,
-  ceph::net::ConnectionRef conn,
-  compound_state_ref state,
-  Ref<MOSDPGInfo> m)
-{
-  std::vector<OperationRef> ret;
-  ret.reserve(m->pg_list.size());
-  const int from = m->get_source().num();
-  for (auto& pg_notify : m->pg_list) {
-    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-    logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
-    MInfoRec info{pg_shard_t{from, pg_notify.from},
-                 pg_notify.info,
-                 pg_notify.epoch_sent};
-    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
-       state,
-       osd,
-       conn,
-       osd.get_shard_services(),
-       pg_shard_t(from, pg_notify.from),
-       pgid,
-       pg_notify.epoch_sent,
-       pg_notify.query_epoch,
-       std::move(info)).first;
-    ret.push_back(op);
-  }
-  return ret;
-}
-
 class QuerySubEvent : public PeeringSubEvent {
 public:
   template <typename... Args>
@@ -185,7 +119,7 @@ public:
   void on_pg_absent() final {
     logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
     pg_info_t empty(pgid);
-    ctx.notify_list[from.osd].emplace_back(
+    ctx.send_notify(from.osd,
       pg_notify_t(
        from.shard, pgid.shard,
        evt.get_epoch_sent(),
@@ -275,18 +209,6 @@ seastar::future<> CompoundPeeringRequest::start()
          conn,
          state,
          boost::static_pointer_cast<MOSDPGCreate2>(m));
-      case MSG_OSD_PG_NOTIFY:
-       return handle_pg_notify(
-         osd,
-         conn,
-         state,
-         boost::static_pointer_cast<MOSDPGNotify>(m));
-      case MSG_OSD_PG_INFO:
-       return handle_pg_info(
-         osd,
-         conn,
-         state,
-         boost::static_pointer_cast<MOSDPGInfo>(m));
       case MSG_OSD_PG_QUERY:
        return handle_pg_query(
          osd,
index 2d3061c3252b986c29be2f01e46f80835b881c99..1408709b0ab885e91b3a7ca35dd56aed474117be 100644 (file)
@@ -18,10 +18,6 @@ namespace {
 
 namespace ceph::osd {
 
-PGAdvanceMap::PGAdvanceMap(
-  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
-  : osd(osd), pg(pg), from(from), to(to), do_init(false) {}
-
 PGAdvanceMap::PGAdvanceMap(
   OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
   PeeringCtx &&rctx, bool do_init)
index 7225ca9de8103301a40d3c9826e06ee25b3183ed..bed61558f96052920b9bbcddc40d517356fdad48 100644 (file)
@@ -33,8 +33,6 @@ protected:
   const bool do_init;
 
 public:
-  PGAdvanceMap(
-    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
   PGAdvanceMap(
     OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
     PeeringCtx &&rctx, bool do_init);
index db3add0a05f6bf01844a8137c1f34f4a723ec5c6..84d8277d41e1535a3400616884b4e93b7c528012 100644 (file)
@@ -77,34 +77,15 @@ seastar::future<> ShardServices::dispatch_context_transaction(
 seastar::future<> ShardServices::dispatch_context_messages(
   BufferedRecoveryMessages &&ctx)
 {
-  auto ret = seastar::when_all_succeed(
-    seastar::parallel_for_each(std::move(ctx.notify_list),
-      [this](auto& osd_notifies) {
-       auto& [peer, notifies] = osd_notifies;
-       auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
-                                           std::move(notifies));
-       logger().debug("dispatch_context_messages sending notify to {}", peer);
-       return send_to_osd(peer, m, osdmap->get_epoch());
-      }),
-    seastar::parallel_for_each(std::move(ctx.query_map),
-      [this](auto& osd_queries) {
-       auto& [peer, queries] = osd_queries;
-       auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
-                                          std::move(queries));
-       logger().debug("dispatch_context_messages sending query to {}", peer);
-       return send_to_osd(peer, m, osdmap->get_epoch());
-      }),
-    seastar::parallel_for_each(std::move(ctx.info_map),
-      [this](auto& osd_infos) {
-       auto& [peer, infos] = osd_infos;
-       auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
-                                         std::move(infos));
-       logger().debug("dispatch_context_messages sending info to {}", peer);
-       return send_to_osd(peer, m, osdmap->get_epoch());
-      }));
-  ctx.notify_list.clear();
-  ctx.query_map.clear();
-  ctx.info_map.clear();
+  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+    [this](auto& osd_messages) {
+      auto& [peer, messages] = osd_messages;
+      logger().debug("dispatch_context_messages sending messages to {}", peer);
+      return seastar::parallel_for_each(std::move(messages), [=](auto& m) {
+        return send_to_osd(peer, m, osdmap->get_epoch());
+      });
+    });
+  ctx.message_map.clear();
   return ret;
 }