]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: fix state machine operations in advance_pg_to and pg creation 28395/head
authorSamuel Just <sjust@redhat.com>
Sat, 22 Jun 2019 00:46:01 +0000 (17:46 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 25 Jun 2019 01:00:47 +0000 (18:00 -0700)
Both pg creation and advance_pg_to process statemachine events and
therefore need to be under the pg process pipeline stage.

Signed-off-by: Samuel Just <sjust@redhat.com>
13 files changed:
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/peering_event.h
src/crimson/osd/osd_operations/pg_advance_map.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/pg_advance_map.h [new file with mode: 0644]
src/crimson/osd/pg.h
src/crimson/osd/pg_map.h
src/crimson/osd/shard_services.h

index 0d3da12a113fa6753190c984f78c5424f9a76630..7c3f0f34d4678592c16c38d4265787654f5b8ef0 100644 (file)
@@ -14,6 +14,7 @@ add_executable(crimson-osd
   osd_operations/client_request.cc
   osd_operations/peering_event.cc
   osd_operations/compound_peering_request.cc
+  osd_operations/pg_advance_map.cc
   osdmap_gate.cc
   pg_map.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
index ab3a4fc7b5683164bdd57d6ca350a550c91932f2..32694a87a8bb32508264e789dd93be799d87face 100644 (file)
@@ -34,6 +34,7 @@
 #include "osd/PeeringState.h"
 #include "crimson/osd/osd_operations/compound_peering_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
 #include "crimson/osd/osd_operations/client_request.h"
 
 namespace {
@@ -638,19 +639,11 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
            false,
            rctx.transaction);
 
-         pg->handle_initialize(rctx);
-         pg->handle_activate_map(rctx);
-
-         logger().info("{} new pg {}", __func__, *pg);
-         pg_map.pg_created(info->pgid, pg);
-
-         return seastar::when_all_succeed(
-           advance_pg_to(pg, osdmap->get_epoch()),
-           pg->get_need_up_thru() ? _send_alive() : seastar::now(),
-           shard_services.dispatch_context(
-             pg->get_collection_ref(),
-             std::move(rctx)).then(
-               [pg]() { return seastar::make_ready_future<Ref<PG>>(pg); }));
+         return shard_services.start_operation<PGAdvanceMap>(
+           *this, pg, pg->get_osdmap_epoch(),
+           osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
+             return seastar::make_ready_future<Ref<PG>>(pg);
+           });
        });
     });
 }
@@ -898,7 +891,8 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
   // todo: m-to-n: broadcast this news to all shards
   auto &pgs = pg_map.get_pgs();
   return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
-    return advance_pg_to(pg.second, epoch);
+    return shard_services.start_operation<PGAdvanceMap>(
+      *this, pg.second, pg.second->get_osdmap_epoch(), epoch).second;
   }).then([epoch, this] {
     osdmap_gate.got_map(epoch);
     return seastar::make_ready_future();
@@ -926,30 +920,4 @@ blocking_future<Ref<PG>> OSD::wait_for_pg(
   return pg_map.get_pg(pgid).first;
 }
 
-seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
-{
-  auto from = pg->get_osdmap_epoch();
-  // todo: merge/split support
-  return seastar::do_with(
-    PeeringCtx{},
-    [this, pg, from, to](auto &rctx) {
-      return seastar::do_for_each(
-       boost::make_counting_iterator(from + 1),
-       boost::make_counting_iterator(to + 1),
-       [this, pg, &rctx](epoch_t next_epoch) {
-         return get_map(next_epoch).then(
-           [pg, this, &rctx] (cached_map_t&& next_map) {
-             pg->handle_advance_map(next_map, rctx);
-           });
-       }).then([this, &rctx, pg] {
-         pg->handle_activate_map(rctx);
-         return seastar::when_all_succeed(
-           pg->get_need_up_thru() ? _send_alive() : seastar::now(),
-           shard_services.dispatch_context(
-             pg->get_collection_ref(),
-             std::move(rctx)));
-       });
-    });
-}
-
 }
index 5cf093972af5a78e0004fda82411bdddfe3dc7e2..802aa8c101fb62b5839ef1da5f4fd8e4208c9e1d 100644 (file)
@@ -188,7 +188,6 @@ public:
   blocking_future<Ref<PG>> wait_for_pg(
     spg_t pgid);
 
-  seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
   bool should_restart() const;
   seastar::future<> restart();
   seastar::future<> shutdown();
@@ -196,6 +195,7 @@ public:
   seastar::future<> send_beacon();
   void update_heartbeat_peers();
 
+  friend class PGAdvanceMap;
 };
 
 }
index 73ce75476d5308df27389c1f3523ac7337f86dbd..0152b3f836f950073ba19ef2cbd15ce95a357044 100644 (file)
@@ -20,15 +20,19 @@ namespace ceph::osd {
 
 enum class OperationTypeCode {
   client_request = 0,
-  peering_event,
-  compound_peering_request,
-  last_op
+  peering_event = 1,
+  compound_peering_request = 2,
+  pg_advance_map = 3,
+  pg_creation = 4,
+  last_op = 5
 };
 
 static constexpr const char* const OP_NAMES[] = {
-  "client_write",
+  "client_request",
   "peering_event",
-  "compound_peering_request"
+  "compound_peering_request",
+  "pg_advance_map",
+  "pg_creation",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
index d65887d303dbc17ebe1decd25be936dc0c456810..8bd81e9816a33bd9b546c6b5f371c6539248d6b7 100644 (file)
@@ -48,7 +48,7 @@ seastar::future<> ClientRequest::start()
   logger().debug("{}: start", *this);
 
   IRef ref = this;
-  with_blocking_future(handle.enter(cp().await_map))
+  return with_blocking_future(handle.enter(cp().await_map))
     .then([this]() {
       return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch()));
     }).then([this](epoch_t epoch) {
@@ -70,7 +70,6 @@ seastar::future<> ClientRequest::start()
        });
       });
     });
-  return seastar::make_ready_future();
 }
 
 }
index f1abf29c59d4afe550f5301e7e6bdd899583f8ff..fee6a8794fa2c5b0b48efc6ce7f19f90cc5d1c8a 100644 (file)
@@ -85,22 +85,23 @@ std::vector<OperationRef> handle_pg_create(
        q->second.first);
     } else {
       auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
-       state,
-       osd,
-       conn,
-       osd.get_shard_services(),
-       pg_shard_t(),
-       pgid,
-       m->epoch,
-       m->epoch,
-       NullEvt(),
-       true,
-       new PGCreateInfo(
+         state,
+         osd,
+         conn,
+         osd.get_shard_services(),
+         pg_shard_t(),
          pgid,
          m->epoch,
-         q->second.first,
-         q->second.second,
-         true));
+         m->epoch,
+         NullEvt(),
+         true,
+         new PGCreateInfo(
+           pgid,
+           m->epoch,
+           q->second.first,
+           q->second.second,
+           true)).first;
+      ret.push_back(op);
     }
   }
   return ret;
@@ -141,8 +142,7 @@ std::vector<OperationRef> handle_pg_notify(
       pg_notify.query_epoch,
       notify,
       true, // requires_pg
-      create_info);
-    op->start();
+      create_info).first;
     ret.push_back(op);
   }
   return ret;
@@ -173,7 +173,7 @@ std::vector<OperationRef> handle_pg_info(
        pgid,
        pg_notify.epoch_sent,
        pg_notify.query_epoch,
-       std::move(info));
+       std::move(info)).first;
     ret.push_back(op);
   }
   return ret;
@@ -213,15 +213,15 @@ std::vector<OperationRef> handle_pg_query(
                 pg_query, pg_query.epoch_sent};
     logger().debug("handle_pg_query on {} from {}", pgid, from);
     auto op = osd.get_shard_services().start_operation<QuerySubEvent>(
-      state,
-      osd,
-      conn,
-      osd.get_shard_services(),
-      pg_shard_t(from, pg_query.from),
-      pgid,
-      pg_query.epoch_sent,
-      pg_query.epoch_sent,
-      std::move(query));
+       state,
+       osd,
+       conn,
+       osd.get_shard_services(),
+       pg_shard_t(from, pg_query.from),
+       pgid,
+       pg_query.epoch_sent,
+       pg_query.epoch_sent,
+       std::move(query)).first;
     ret.push_back(op);
   }
   return ret;
@@ -305,7 +305,7 @@ seastar::future<> CompoundPeeringRequest::start()
   add_blocker(blocker.get());
   IRef ref = this;
   logger().info("{}: about to fork future", *this);
-  state->promise.get_future().then(
+  return state->promise.get_future().then(
     [this, blocker=std::move(blocker)](auto &&ctx) {
       clear_blocker(blocker.get());
       logger().info("{}: sub events complete", *this);
@@ -313,9 +313,6 @@ seastar::future<> CompoundPeeringRequest::start()
     }).then([this, ref=std::move(ref)] {
       logger().info("{}: complete", *this);
     });
-
-  logger().info("{}: forked, returning", *this);
-  return seastar::now();
 }
 
 } // namespace ceph::osd
index 6112d1fb4c36a49e4922348a7369a7dbe8d9c100..79584fe4e2397e8215b2f38f6c79c045f19e1589 100644 (file)
@@ -51,7 +51,7 @@ seastar::future<> PeeringEvent::start()
   logger().debug("{}: start", *this);
 
   IRef ref = this;
-  get_pg().then([this](Ref<PG> pg) {
+  return get_pg().then([this](Ref<PG> pg) {
     if (!pg) {
       logger().debug("{}: pg absent, did not create", *this);
       on_pg_absent();
@@ -74,7 +74,6 @@ seastar::future<> PeeringEvent::start()
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
   });
-  return seastar::make_ready_future();
 }
 
 void PeeringEvent::on_pg_absent()
index 995df4b5b27464bf934b276a9fde2d2a409af12b..62ecccfbe729fd108ade35c9b25c21731af8f469 100644 (file)
@@ -29,6 +29,7 @@ public:
       "PeeringEvent::PGPipeline::process"
     };
     friend class PeeringEvent;
+    friend class PGAdvanceMap;
   };
 
 protected:
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
new file mode 100644 (file)
index 0000000..2d3061c
--- /dev/null
@@ -0,0 +1,95 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include "include/types.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+PGAdvanceMap::PGAdvanceMap(
+  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
+  : osd(osd), pg(pg), from(from), to(to), do_init(false) {}
+
+PGAdvanceMap::PGAdvanceMap(
+  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+  PeeringCtx &&rctx, bool do_init)
+  : osd(osd), pg(pg), from(from), to(to),
+    rctx(std::move(rctx)), do_init(do_init) {}
+
+PGAdvanceMap::~PGAdvanceMap() {}
+
+void PGAdvanceMap::print(std::ostream &lhs) const
+{
+  lhs << "PGAdvanceMap("
+      << "pg=" << pg->get_pgid()
+      << " from=" << from
+      << " to=" << to;
+  if (do_init) {
+    lhs << " do_init";
+  }
+  lhs << ")";
+}
+
+void PGAdvanceMap::dump_detail(Formatter *f) const
+{
+  f->open_object_section("PGAdvanceMap");
+  f->dump_stream("pgid") << pg->get_pgid();
+  f->dump_int("from", from);
+  f->dump_int("to", to);
+  f->dump_bool("do_init", do_init);
+  f->close_section();
+}
+
+seastar::future<> PGAdvanceMap::start()
+{
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  return with_blocking_future(
+    handle.enter(pg->peering_request_pg_pipeline.process))
+    .then([this] {
+      if (do_init) {
+       pg->handle_initialize(rctx);
+       pg->handle_activate_map(rctx);
+      }
+      return seastar::do_for_each(
+       boost::make_counting_iterator(from + 1),
+       boost::make_counting_iterator(to + 1),
+       [this](epoch_t next_epoch) {
+         return osd.get_map(next_epoch).then(
+           [this] (cached_map_t&& next_map) {
+             pg->handle_advance_map(next_map, rctx);
+           });
+       }).then([this] {
+         pg->handle_activate_map(rctx);
+         handle.exit();
+         if (do_init) {
+           osd.pg_map.pg_created(pg->get_pgid(), pg);
+           logger().info("{} new pg {}", __func__, *pg);
+         }
+         return seastar::when_all_succeed(
+           pg->get_need_up_thru() ? osd._send_alive() : seastar::now(),
+           osd.shard_services.dispatch_context(
+             pg->get_collection_ref(),
+             std::move(rctx)));
+       });
+    }).then([this, ref=std::move(ref)] {
+      logger().debug("{}: complete", *this);
+    });
+}
+
+}
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h
new file mode 100644 (file)
index 0000000..7225ca9
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+#include "osd/PeeringState.h"
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+class PGAdvanceMap : public OperationT<PGAdvanceMap> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
+
+protected:
+  OrderedPipelinePhase::Handle handle;
+
+  OSD &osd;
+  Ref<PG> pg;
+
+  epoch_t from;
+  epoch_t to;
+
+  PeeringCtx rctx;
+  const bool do_init;
+
+public:
+  PGAdvanceMap(
+    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
+  PGAdvanceMap(
+    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+    PeeringCtx &&rctx, bool do_init);
+  ~PGAdvanceMap();
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+};
+
+}
index 7c8e4518d5d54509d5092df4188b80c9c70e2cee..4af2424963e773b216ffd4b946d923c13d859450 100644 (file)
@@ -71,6 +71,14 @@ public:
 
   ~PG();
 
+  const pg_shard_t &get_pg_whoami() const {
+    return pg_whoami;
+  }
+
+  const spg_t&get_pgid() const {
+    return pgid;
+  }
+
   // EpochSource
   epoch_t get_osdmap_epoch() const final {
     return peering_state.get_osdmap_epoch();
@@ -437,6 +445,7 @@ private:
   friend std::ostream& operator<<(std::ostream&, const PG& pg);
   friend class ClientRequest;
   friend class PeeringEvent;
+  friend class PGAdvanceMap;
 };
 
 std::ostream& operator<<(std::ostream&, const PG& pg);
index 8b4086efd889a4730b6d57f14407510b4c39917f..a17388c1b3bfa6aee5ca5a7806752dcef1a91c34 100644 (file)
@@ -11,6 +11,7 @@
 #include "include/types.h"
 #include "crimson/common/type_helpers.h"
 #include "crimson/osd/osd_operation.h"
+#include "crimson/osd/pg.h"
 #include "osd/osd_types.h"
 
 namespace ceph::osd {
index 9d04ba2e9dc4508a6dddb5b4638190a09ddbf313..62b3932c4113078d390873d9e892d7adb21ce5c7 100644 (file)
@@ -74,10 +74,9 @@ public:
   OperationRegistry registry;
 
   template <typename T, typename... Args>
-  typename T::IRef start_operation(Args&&... args) {
+  auto start_operation(Args&&... args) {
     auto op = registry.create_operation<T>(std::forward<Args>(args)...);
-    op->start();
-    return op;
+    return std::make_pair(op, op->start());
   }
 
   // Loggers