git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: hook PeeringState into pg
author Samuel Just <sjust@redhat.com>
Thu, 30 May 2019 00:32:17 +0000 (17:32 -0700)
committer Samuel Just <sjust@redhat.com>
Fri, 31 May 2019 21:05:45 +0000 (14:05 -0700)
This patch:
1) Adds partially implemented stubs for PeeringState::PeeringListener
2) Revamps the message handling to batch the peering events from
   a single message.
3) Replaces the message handling with hand-offs to PeeringState
4) Adds support for MOSDPGCreate2 enabling creation of pgs.
5) Introduces ShardServices for exposing osd machinery to the pgs.

Signed-off-by: Samuel Just <sjust@redhat.com>
19 files changed:
src/crimson/CMakeLists.txt
src/crimson/mon/MonClient.h
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osdmap_service.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.cc
src/crimson/osd/pg_backend.h
src/crimson/osd/recovery_events.h [deleted file]
src/crimson/osd/recovery_machine.cc [deleted file]
src/crimson/osd/recovery_machine.h [deleted file]
src/crimson/osd/recovery_state.cc [deleted file]
src/crimson/osd/recovery_state.h [deleted file]
src/crimson/osd/recovery_states.cc [deleted file]
src/crimson/osd/recovery_states.h [deleted file]
src/crimson/osd/shard_services.cc [new file with mode: 0644]
src/crimson/osd/shard_services.h [new file with mode: 0644]

index 843e0f98749bc5660da91c43ec6be67dac5e6f6f..9a8fcf8a2d87c2dfa513e56b48a034f3faee000a 100644 (file)
@@ -64,6 +64,7 @@ add_library(crimson-common STATIC
   ${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc
   ${PROJECT_SOURCE_DIR}/src/common/Formatter.cc
   ${PROJECT_SOURCE_DIR}/src/common/Graylog.cc
+  ${PROJECT_SOURCE_DIR}/src/common/ostream_temp.cc
   ${PROJECT_SOURCE_DIR}/src/common/LogEntry.cc
   ${PROJECT_SOURCE_DIR}/src/common/Mutex.cc
   ${PROJECT_SOURCE_DIR}/src/common/SubProcess.cc
@@ -102,14 +103,21 @@ target_compile_definitions(crimson-common PRIVATE
   "CEPH_PKGLIBDIR=\"${CEPH_INSTALL_FULL_PKGLIBDIR}\""
   "CEPH_DATADIR=\"${CEPH_INSTALL_DATADIR}\"")
 
+set(crimson_common_deps
+  Boost::iostreams
+  Boost::random)
+
+if(NOT WITH_SYSTEM_BOOST)
+  list(APPEND crimson_common_deps ${ZLIB_LIBRARIES})
+endif()
+
 target_link_libraries(crimson-common
   PUBLIC
     json_spirit
   PRIVATE
     crc32
     crimson::cflags
-    Boost::iostreams
-    Boost::random
+    ${crimson_common_deps}
     ${NSS_LIBRARIES} ${NSPR_LIBRARIES} OpenSSL::Crypto)
 
 set(crimson_auth_srcs
index 75a00433824297ccd5999a4e1b7ead62aa03e636..c84f0adb5edb0c98a46ed35145703a041585a4e9 100644 (file)
@@ -90,10 +90,6 @@ public:
   bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
   seastar::future<> renew_subs();
 
-  MonMap &get_monmap_ref() {
-    return monmap;
-  }
-
 private:
   // AuthServer methods
   std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
index e0f1c580417e4ab81f116e1e020bb9b676b1be90..2cbfe0a702d08d1b3954d4064fa023a27c5af25b 100644 (file)
@@ -8,9 +8,14 @@ add_executable(crimson-osd
   pg.cc
   pg_backend.cc
   pg_meta.cc
-  recovery_machine.cc
-  recovery_state.cc
-  recovery_states.cc
-  replicated_backend.cc)
+  replicated_backend.cc
+  shard_services.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc
+  ${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc
+  )
 target_link_libraries(crimson-osd
   crimson-common crimson-os crimson fmt::fmt)
index 404f3e7b8cda09c7f824f937a08ee3f2dfa88f88..3258b70826f8902ea77deaf8c8d97845e4f012a9 100644 (file)
@@ -17,6 +17,7 @@
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MPGStats.h"
+#include "messages/MOSDPGCreate2.h"
 
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
@@ -30,8 +31,8 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_backend.h"
 #include "crimson/osd/pg_meta.h"
-
 #include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
 
 namespace {
   seastar::logger& logger() {
@@ -58,7 +59,8 @@ OSD::OSD(int id, uint32_t nonce,
     heartbeat{new Heartbeat{*this, *monc, hb_front_msgr, hb_back_msgr}},
     heartbeat_timer{[this] { update_heartbeat_peers(); }},
     store{std::make_unique<ceph::os::CyanStore>(
-      local_conf().get_val<std::string>("osd_data"))}
+      local_conf().get_val<std::string>("osd_data"))},
+    shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store}
 {
   osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
@@ -184,6 +186,7 @@ seastar::future<> OSD::start()
     superblock = std::move(sb);
     return get_map(superblock.current_epoch);
   }).then([this](cached_map_t&& map) {
+    shard_services.update_map(osdmap);
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
@@ -278,8 +281,14 @@ seastar::future<> OSD::_send_boot()
   return monc->send_message(m);
 }
 
-seastar::future<> OSD::_send_alive(epoch_t want)
+seastar::future<> OSD::_send_alive()
 {
+  auto want = osdmap->get_epoch();
+  logger().info(
+    "{} want {} up_thru_wanted {}",
+    __func__,
+    want,
+    up_thru_wanted);
   if (!osdmap->exists(whoami)) {
     return seastar::now();
   } else if (want <= up_thru_wanted){
@@ -326,41 +335,51 @@ seastar::future<> OSD::load_pgs()
     });
 }
 
-seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, spg_t pgid)
 {
   using ec_profile_t = map<string,string>;
-  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
-    return get_map(e);
-  }).then([pgid, this] (auto&& create_map) {
+  return ([&]() {
     if (create_map->have_pg_pool(pgid.pool())) {
       pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
       string name = create_map->get_pool_name(pgid.pool());
       ec_profile_t ec_profile;
       if (pi.is_erasure()) {
-        ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
+       ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
       }
-      return seastar::make_ready_future<pg_pool_t,
-                                        string,
-                                        ec_profile_t>(std::move(pi),
-                                                      std::move(name),
-                                                      std::move(ec_profile));
+      return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
+       std::move(pi),
+       std::move(name),
+       std::move(ec_profile));
     } else {
       // pool was deleted; grab final pg_pool_t off disk.
       return meta_coll->load_final_pool_info(pgid.pool());
     }
-  }).then([pgid, this](pg_pool_t&& pool,
+  })().then([pgid, this, create_map](pg_pool_t&& pool,
                        string&& name,
                        ec_profile_t&& ec_profile) {
-    auto backend = PGBackend::create(pgid, pool, store.get(), ec_profile);
-    Ref<PG> pg{new PG{pgid,
-                      pg_shard_t{whoami, pgid.shard},
-                      std::move(pool),
-                      std::move(name),
-                      std::move(backend),
-                      osdmap,
-                      cluster_msgr}};
+    return seastar::make_ready_future<Ref<PG>>(Ref<PG>{new PG{pgid,
+           pg_shard_t{whoami, pgid.shard},
+           std::move(pool),
+           std::move(name),
+           create_map,
+           shard_services,
+           ec_profile}});
+  });
+}
+
+seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+{
+  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this] (auto&& create_map) {
+    return make_pg(std::move(create_map), pgid);
+  }).then([this, pgid](Ref<PG> pg) {
     return pg->read_state(store.get()).then([pg] {
       return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+    }).handle_exception([pgid](auto ep) {
+      logger().info("pg {} saw exception on load {}", pgid, ep);
+      ceph_abort("Could not load pg" == 0);
+      return seastar::make_exception_future<Ref<PG>>(ep);
     });
   });
 }
@@ -384,7 +403,10 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
     return handle_pg_query(conn, boost::static_pointer_cast<MOSDPGQuery>(m));
   case MSG_OSD_PG_LOG:
     return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+  case MSG_OSD_PG_CREATE2:
+    return handle_pg_create(conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
   default:
+    logger().info("{} unhandled message {}", __func__, *m);
     return seastar::now();
   }
 }
@@ -526,6 +548,167 @@ seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
   }
 }
 
+bool OSD::require_mon_peer(ceph::net::Connection *conn, Ref<Message> m)
+{
+  if (!conn->peer_is_mon()) {
+    logger().info("{} received from non-mon {}, {}",
+                 __func__,
+                 conn->get_peer_addr(),
+                 *m);
+    return false;
+  }
+  return true;
+}
+
+seastar::future<Ref<PG>> OSD::handle_pg_create_info(
+  std::unique_ptr<PGCreateInfo> info) {
+  return seastar::do_with(
+    std::move(info),
+    [this](auto &info) -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+       [&info, this](cached_map_t startmap) ->
+       seastar::future<Ref<PG>, cached_map_t> {
+         const spg_t &pgid = info->pgid;
+         if (info->by_mon) {
+           int64_t pool_id = pgid.pgid.pool();
+           const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
+           if (!pool) {
+             logger().debug(
+               "{} ignoring pgid {}, pool dne",
+               __func__,
+               pgid);
+             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+               Ref<PG>(),
+               startmap);
+           }
+           ceph_assert(osdmap->require_osd_release >= ceph_release_t::nautilus);
+           if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+             // this ensures we do not process old creating messages after the
+             // pool's initial pgs have been created (and pg are subsequently
+             // allowed to split or merge).
+             logger().debug(
+               "{} dropping {} create, pool does not have CREATING flag set",
+               __func__,
+               pgid);
+             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+               Ref<PG>(),
+               startmap);
+           }
+         }
+         return make_pg(startmap, pgid).then(
+           [this, startmap=std::move(startmap)](auto pg) mutable {
+             return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+               std::move(pg),
+               std::move(startmap));
+           });
+      }).then(
+       [this, &info](auto pg, auto startmap) -> seastar::future<Ref<PG>> {
+         if (!pg)
+           return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+         PeeringCtx rctx;
+         const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+         int up_primary, acting_primary;
+         vector<int> up, acting;
+         startmap->pg_to_up_acting_osds(
+           info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+         int role = startmap->calc_pg_role(whoami, acting, acting.size());
+         if (!pp->is_replicated() && role != info->pgid.shard) {
+           role = -1;
+         }
+
+
+         auto coll = store->create_new_collection(coll_t(info->pgid));
+         create_pg_collection(
+           rctx.transaction,
+           info->pgid,
+           info->pgid.get_split_bits(pp->get_pg_num()));
+         init_pg_ondisk(
+           rctx.transaction,
+           info->pgid,
+           pp);
+
+         pg->init(
+           coll,
+           role,
+           up,
+           up_primary,
+           acting,
+           acting_primary,
+           info->history,
+           info->past_intervals,
+           false,
+           rctx.transaction);
+
+         pg->handle_initialize(rctx);
+         pg->handle_activate_map(rctx);
+
+         logger().info("{} new pg {}", __func__, *pg);
+         pgs.emplace(info->pgid, pg);
+         return seastar::when_all_succeed(
+           pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+           shard_services.dispatch_context(
+             pg->get_collection_ref(),
+             std::move(rctx)).then(
+               [pg]() { return seastar::make_ready_future<Ref<PG>>(pg); }));
+       });
+    });
+}
+
+seastar::future<> OSD::handle_pg_create(
+  ceph::net::Connection* conn,
+  Ref<MOSDPGCreate2> m)
+{
+  logger().info("{}: {} from {}", __func__, *m, m->get_source());
+  if (!require_mon_peer(conn, m)) {
+    return seastar::now();
+  }
+  return handle_batch_pg_message(
+    m->pgs,
+    [this, conn, m](auto p)
+    -> std::optional<std::tuple<spg_t, std::unique_ptr<PGPeeringEvent>>> {
+      const spg_t &pgid = p.first;
+      const auto &[created, created_stamp] = p.second;
+
+      auto q = m->pg_extra.find(pgid);
+      ceph_assert(q != m->pg_extra.end());
+      logger().debug(
+       "{} {} e{} @{} history {} pi {}",
+       __func__,
+       pgid,
+       created,
+       created_stamp,
+       q->second.first,
+       q->second.second);
+      if (!q->second.second.empty() &&
+         m->epoch < q->second.second.get_bounds().second) {
+       logger().error(
+         "got pg_create on {} epoch {} unmatched past_intervals (history {})",
+         pgid,
+         m->epoch,
+         q->second.second,
+         q->second.first);
+       return std::nullopt;
+      } else {
+       return std::make_optional(
+         std::make_tuple(
+           pgid,
+           std::make_unique<PGPeeringEvent>(
+             m->epoch,
+             m->epoch,
+             NullEvt(),
+             true,
+             new PGCreateInfo(
+               pgid,
+               m->epoch,
+               q->second.first,
+               q->second.second,
+               true))));
+      }
+    });
+}
+
 seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
                                       Ref<MOSDMap> m)
 {
@@ -604,6 +787,7 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
                               [this](epoch_t cur) {
     return get_map(cur).then([this](cached_map_t&& o) {
       osdmap = std::move(o);
+      shard_services.update_map(osdmap);
       if (up_epoch != 0 &&
           osdmap->is_up(whoami) &&
           osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
@@ -746,13 +930,15 @@ void OSD::update_heartbeat_peers()
   heartbeat->update_peers(whoami);
 }
 
-seastar::future<> OSD::handle_pg_notify(ceph::net::Connection* conn,
-                                        Ref<MOSDPGNotify> m)
+seastar::future<> OSD::handle_pg_notify(
+  ceph::net::Connection* conn,
+  Ref<MOSDPGNotify> m)
 {
   // assuming all pgs reside in a single shard
   // see OSD::dequeue_peering_evt()
   const int from = m->get_source().num();
-  return seastar::parallel_for_each(m->get_pg_list(),
+  return handle_batch_pg_message(
+    m->get_pg_list(),
     [from, this](pair<pg_notify_t, PastIntervals> p) {
       auto& [pg_notify, past_intervals] = p;
       spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
@@ -761,72 +947,98 @@ seastar::future<> OSD::handle_pg_notify(ceph::net::Connection* conn,
                         pg_notify,
                         0, // the features is not used
                         past_intervals};
-      auto create_info = new PGCreateInfo{pgid,
-                                          pg_notify.query_epoch,
-                                          pg_notify.info.history,
-                                          past_intervals,
-                                          false};
-      auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
-                                                  pg_notify.query_epoch,
-                                                  notify,
-                                                  true, // requires_pg
-                                                  create_info);
-      return do_peering_event(pgid, std::move(evt));
+      logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
+      auto create_info = new PGCreateInfo{
+       pgid,
+       pg_notify.query_epoch,
+       pg_notify.info.history,
+       past_intervals,
+       false};
+      return std::make_optional(
+       std::make_tuple(
+         pgid,
+         std::make_unique<PGPeeringEvent>(
+           pg_notify.epoch_sent,
+           pg_notify.query_epoch,
+           notify,
+           true, // requires_pg
+           create_info)));
   });
 }
 
-seastar::future<> OSD::handle_pg_info(ceph::net::Connection* conn,
-                                      Ref<MOSDPGInfo> m)
+seastar::future<> OSD::handle_pg_info(
+  ceph::net::Connection* conn,
+  Ref<MOSDPGInfo> m)
 {
   // assuming all pgs reside in a single shard
   // see OSD::dequeue_peering_evt()
   const int from = m->get_source().num();
-  return seastar::parallel_for_each(m->pg_list,
+  return handle_batch_pg_message(
+    m->pg_list,
     [from, this](pair<pg_notify_t, PastIntervals> p) {
       auto& pg_notify = p.first;
       spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+      logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
       MInfoRec info{pg_shard_t{from, pg_notify.from},
                     pg_notify.info,
                     pg_notify.epoch_sent};
-      auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
-                                                  pg_notify.query_epoch,
-                                                  std::move(info));
-      return do_peering_event(pgid, std::move(evt));
-  });
+      return std::make_optional(
+       std::tuple(
+         pgid,
+         std::make_unique<PGPeeringEvent>(
+           pg_notify.epoch_sent,
+           pg_notify.query_epoch,
+           std::move(info))));
+    });
 }
 
 seastar::future<> OSD::handle_pg_query(ceph::net::Connection* conn,
                                        Ref<MOSDPGQuery> m)
 {
+  // assuming all pgs reside in a single shard
+  // see OSD::dequeue_peering_evt()
   const int from = m->get_source().num();
-  return seastar::parallel_for_each(m->pg_list,
+  // TODO: handle missing pg -- handle_batch_pg_message ignores pgs
+  // that don't exist
+  return handle_batch_pg_message_with_missing_handler(
+    m->pg_list,
     [from, this](pair<spg_t, pg_query_t> p) {
       auto& [pgid, pg_query] = p;
       MQuery query{pgid, pg_shard_t{from, pg_query.from},
-                   pg_query, pg_query.epoch_sent};
-      auto evt = std::make_unique<PGPeeringEvent>(pg_query.epoch_sent,
-                                                  pg_query.epoch_sent,
-                                                  std::move(query));
-      return do_peering_event(pgid, std::move(evt));
-  });
+                  pg_query, pg_query.epoch_sent};
+      logger().debug("handle_pg_query on {} from {}", pgid, from);
+      return std::make_optional(
+       std::make_tuple(
+         pgid,
+         std::make_unique<PGPeeringEvent>(
+           pg_query.epoch_sent,
+           pg_query.epoch_sent,
+           std::move(query))));
+    },
+    [this, from](pair<spg_t, pg_query_t> p, PeeringCtx &ctx) {
+      auto &[pgid, query] = p;
+      logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
+      pg_info_t empty(spg_t(pgid.pgid, query.to));
+      ceph_assert(query.type == pg_query_t::INFO);
+      ctx.notify_list[from].emplace_back(
+        pg_notify_t(
+          query.from, query.to,
+          query.epoch_sent,
+         osdmap->get_epoch(),
+         empty),
+       PastIntervals());
+    });
 }
 
-seastar::future<> OSD::handle_pg_log(ceph::net::Connection* conn,
-                                       Ref<MOSDPGLog> m)
+seastar::future<> OSD::handle_pg_log(
+  ceph::net::Connection* conn,
+  Ref<MOSDPGLog> m)
 {
   const int from = m->get_source().num();
-  MLogRec log{pg_shard_t{from, m->from}, m.get()};
-  auto create_info = new PGCreateInfo{m->get_spg(),
-                                      m->get_query_epoch(),
-                                      m->info.history,
-                                      m->past_intervals,
-                                      false};
-  auto evt = std::make_unique<PGPeeringEvent>(m->get_epoch(),
-                                              m->get_query_epoch(),
-                                              std::move(log),
-                                              true,
-                                              create_info);
-  return do_peering_event(m->get_spg(), std::move(evt));
+  logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
+  return do_peering_event_and_dispatch(
+    m->get_spg(),
+    PGPeeringEventURef(m->get_event()));
 }
 
 void OSD::check_osdmap_features()
@@ -844,8 +1056,8 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
   return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
     return advance_pg_to(pg.second, epoch);
   }).then([epoch, this] {
-    auto first = waiting_peering.lower_bound(epoch);
-    auto last = waiting_peering.end();
+    auto first = waiting_peering.begin();
+    auto last = waiting_peering.upper_bound(epoch);
     std::for_each(first, last, [epoch, this](auto& blocked_requests) {
       blocked_requests.second.set_value(epoch);
     });
@@ -854,24 +1066,96 @@ seastar::future<> OSD::consume_map(epoch_t epoch)
   });
 }
 
+
+seastar::future<Ref<PG>>
+OSD::get_pg(
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  return wait_for_map(epoch).then([this, pgid, epoch, info=std::move(info)](epoch_t) mutable {
+    if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+      return advance_pg_to(pg->second, epoch).then([pg=pg->second]() {
+       return seastar::make_ready_future<Ref<PG>>(pg);
+      });
+    } else if (!info) {
+      return seastar::make_ready_future<Ref<PG>>();
+    } else {
+      auto creating = pgs_creating.find(pgid);
+      if (creating == pgs_creating.end()) {
+       creating = pgs_creating.emplace(
+         pgid,
+         seastar::shared_future<Ref<PG>>(handle_pg_create_info(std::move(info)).then([this, pgid](auto pg) {
+           pgs_creating.erase(pgid);
+           return seastar::make_ready_future<Ref<PG>>(pg);
+         }))).first;
+      }
+      return creating->second.get_future().then([this, epoch](auto pg) {
+       return advance_pg_to(pg, epoch).then([pg]() {
+         return seastar::make_ready_future<Ref<PG>>(pg);
+       });
+      });
+    }
+  });
+}
+
+seastar::future<Ref<PG>>
+OSD::do_peering_event(
+  spg_t pgid,
+  PGPeeringEventURef evt,
+  PeeringCtx &rctx)
+{
+  return get_pg(pgid, evt->get_epoch_sent(), std::move(evt->create_info))
+    .then([this, evt=std::move(evt), &rctx](Ref<PG> pg) mutable {
+      if (pg) {
+       pg->do_peering_event(std::move(evt), rctx);
+      }
+      return seastar::make_ready_future<Ref<PG>>(pg);
+    });
+}
+
+seastar::future<bool>
+OSD::do_peering_event_and_dispatch_transaction(
+  spg_t pgid,
+  std::unique_ptr<PGPeeringEvent> evt,
+  PeeringCtx &rctx)
+{
+  return do_peering_event(pgid, std::move(evt), rctx).then(
+    [this, pgid, &rctx](Ref<PG> pg) mutable {
+      if (pg) {
+       return seastar::when_all_succeed(
+         pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+         shard_services.dispatch_context_transaction(
+           pg->get_collection_ref(), rctx)).then([] { return true; });
+      } else {
+       return seastar::make_ready_future<bool>(false);
+      }
+    });
+}
+
 seastar::future<>
-OSD::do_peering_event(spg_t pgid,
-                      std::unique_ptr<PGPeeringEvent> evt)
-{
-  if (auto pg = pgs.find(pgid); pg != pgs.end()) {
-    return wait_for_map(evt->get_epoch_sent()).then(
-      [pg=pg->second, this](epoch_t epoch) {
-        return advance_pg_to(pg, epoch);
-    }).then([pg, evt=std::move(evt)]() mutable {
-        return pg->second->do_peering_event(std::move(evt));
-    }).then([pg=pg->second, this] {
-        return _send_alive(pg->get_need_up_thru());
+OSD::do_peering_event_and_dispatch(
+  spg_t pgid,
+  std::unique_ptr<PGPeeringEvent> evt)
+{
+  return seastar::do_with(
+    PeeringCtx{},
+    [this, pgid, evt=std::move(evt)](auto &rctx) mutable {
+      return do_peering_event(pgid, std::move(evt), rctx).then(
+       [this, pgid, &rctx](Ref<PG> pg) mutable {
+         if (pg) {
+           return seastar::when_all_succeed(
+             pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+             shard_services.dispatch_context(
+               pg->get_collection_ref(), std::move(rctx)));
+         } else {
+           return seastar::now();
+         }
+       });
+    }).handle_exception([](auto ep) {
+      logger().error("do_peering_event_and_dispatch saw {}", ep);
+      return seastar::make_exception_future<>(ep);
     });
-  } else {
-    logger().warn("pg not found: {}", pgid);
-    // todo: handle_pg_query_nopg()
-    return seastar::now();
-  }
 }
 
 seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
@@ -881,7 +1165,11 @@ seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
     return seastar::make_ready_future<epoch_t>(mine);
   } else {
     logger().info("evt epoch is {}, i have {}, will wait", epoch, mine);
-    return waiting_peering[epoch].get_shared_future();
+    auto fut = waiting_peering[epoch].get_shared_future();
+    return osdmap_subscribe(osdmap->get_epoch(), true).then(
+      [fut=std::move(fut)]() mutable {
+      return std::move(fut);
+    });
   }
 }
 
@@ -889,13 +1177,24 @@ seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
 {
   auto from = pg->get_osdmap_epoch();
   // todo: merge/split support
-  return seastar::do_for_each(boost::make_counting_iterator(from + 1),
-                              boost::make_counting_iterator(to + 1),
-    [pg, this](epoch_t next_epoch) {
-      return get_map(next_epoch).then([pg, this] (cached_map_t&& next_map) {
-        return pg->handle_advance_map(next_map);
-      }).then([pg, this] {
-        return pg->handle_activate_map();
-      });
+  return seastar::do_with(
+    PeeringCtx{},
+    [this, pg, from, to](auto &rctx) {
+      return seastar::do_for_each(
+       boost::make_counting_iterator(from + 1),
+       boost::make_counting_iterator(to + 1),
+       [this, pg, &rctx](epoch_t next_epoch) {
+         return get_map(next_epoch).then(
+           [pg, this, &rctx] (cached_map_t&& next_map) {
+             return pg->handle_advance_map(next_map, rctx);
+           });
+       }).then([this, &rctx, pg] {
+         pg->handle_activate_map(rctx);
+         return seastar::when_all_succeed(
+           pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+           shard_services.dispatch_context(
+             pg->get_collection_ref(),
+             std::move(rctx)));
+       });
     });
 }
index 931aa6ea7d693d4b70f6717e57c62df6f75f5863..bcab1788108aeb84fe8a85caf74a3abbeb686f9a 100644 (file)
@@ -4,7 +4,10 @@
 #pragma once
 
 #include <map>
+#include <tuple>
+#include <optional>
 #include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
 #include <seastar/core/gate.hh>
 #include <seastar/core/shared_ptr.hh>
 #include <seastar/core/shared_future.hh>
 #include "crimson/osd/chained_dispatchers.h"
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/osd/state.h"
+#include "crimson/osd/shard_services.h"
 
+#include "osd/PeeringState.h"
 #include "osd/osd_types.h"
+#include "osd/osd_perf_counters.h"
+#include "osd/PGPeeringEvent.h"
 
 class MOSDMap;
 class MOSDOp;
 class OSDMap;
 class OSDMeta;
 class PG;
-class PGPeeringEvent;
 class Heartbeat;
 
 namespace ceph::mon {
@@ -71,7 +77,6 @@ class OSD : public ceph::net::Dispatcher,
   std::unique_ptr<ceph::os::CyanStore> store;
   std::unique_ptr<OSDMeta> meta_coll;
 
-  std::unordered_map<spg_t, Ref<PG>> pgs;
   OSDState state;
 
   /// _first_ epoch we were marked up (after this process started)
@@ -99,6 +104,9 @@ class OSD : public ceph::net::Dispatcher,
                             uint64_t global_id,
                             const AuthCapsInfo& caps) final;
 
+  ceph::osd::ShardServices shard_services;
+  std::unordered_map<spg_t, Ref<PG>> pgs;
+
 public:
   OSD(int id, uint32_t nonce,
       ceph::net::Messenger& cluster_msgr,
@@ -117,11 +125,12 @@ private:
   seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap);
   seastar::future<> _send_boot();
 
+  seastar::future<Ref<PG>> make_pg(cached_map_t create_map, spg_t pgid);
   seastar::future<Ref<PG>> load_pg(spg_t pgid);
   seastar::future<> load_pgs();
 
   epoch_t up_thru_wanted = 0;
-  seastar::future<> _send_alive(epoch_t want);
+  seastar::future<> _send_alive();
 
   // OSDMapService methods
   seastar::future<cached_map_t> get_map(epoch_t e) override;
@@ -137,6 +146,61 @@ private:
   void write_superblock(ceph::os::Transaction& t);
   seastar::future<> read_superblock();
 
+  bool require_mon_peer(ceph::net::Connection *conn, Ref<Message> m);
+
+  seastar::future<Ref<PG>> handle_pg_create_info(
+    std::unique_ptr<PGCreateInfo> info);
+
+  template <typename C, typename F, typename G>
+  seastar::future<> handle_batch_pg_message_with_missing_handler(
+    const C &c,
+    F &&f,
+    G &&on_missing_pg) {
+    using mapped_type = const typename C::value_type &;
+    using event_type = std::optional<std::tuple<
+      spg_t,
+      std::unique_ptr<PGPeeringEvent>>>;
+    return seastar::do_with(
+      PeeringCtx{},
+      std::move(f),
+      std::move(on_missing_pg),
+      [this, &c] (auto &rctx, auto &f, auto &on_missing_pg) {
+       return seastar::parallel_for_each(
+         c,
+         [this, &rctx, &f, &on_missing_pg](mapped_type m) {
+           event_type result = f(m);
+           if (result) {
+             auto [pgid, event] = std::move(*result);
+             return do_peering_event_and_dispatch_transaction(
+               pgid,
+               std::move(event),
+               rctx).then([m, &on_missing_pg, &rctx] (bool found) {
+                 if (!found) {
+                   on_missing_pg(m, rctx);
+                 }
+                 return seastar::now();
+               });
+           } else {
+             return seastar::now();
+           }
+         }).then([this, &rctx] {
+              return shard_services.dispatch_context(std::move(rctx));
+         });
+      });
+  }
+
+  template <typename C, typename F>
+  seastar::future<> handle_batch_pg_message(
+    const C &c,
+    F &&f) {
+    return handle_batch_pg_message_with_missing_handler(
+      c,
+      std::move(f),
+      [](const typename C::value_type &, PeeringCtx &){});
+  }
+
+  seastar::future<> handle_pg_create(ceph::net::Connection *conn,
+                                    Ref<MOSDPGCreate2> m);
   seastar::future<> handle_osd_map(ceph::net::Connection* conn,
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(ceph::net::Connection* conn,
@@ -163,10 +227,26 @@ private:
   // wait for an osdmap whose epoch is greater or equal to given epoch
   seastar::future<epoch_t> wait_for_map(epoch_t epoch);
   seastar::future<> consume_map(epoch_t epoch);
-  seastar::future<> do_peering_event(spg_t pgid,
-                                    std::unique_ptr<PGPeeringEvent> evt);
-  seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
 
+  std::map<spg_t, seastar::shared_future<Ref<PG>>> pgs_creating;
+  seastar::future<Ref<PG>> get_pg(
+    spg_t pgid,
+    epoch_t epoch,
+    std::unique_ptr<PGCreateInfo> info);
+
+  seastar::future<Ref<PG>> do_peering_event(
+    spg_t pgid,
+    std::unique_ptr<PGPeeringEvent> evt,
+    PeeringCtx &rctx);
+  seastar::future<> do_peering_event_and_dispatch(
+    spg_t pgid,
+    std::unique_ptr<PGPeeringEvent> evt);
+  seastar::future<bool> do_peering_event_and_dispatch_transaction(
+    spg_t pgid,
+    std::unique_ptr<PGPeeringEvent> evt,
+    PeeringCtx &rctx);
+
+  seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
   bool should_restart() const;
   seastar::future<> restart();
   seastar::future<> shutdown();
index 0a3aaed3c5ed5c2becfa1d51207ed65ad66aaf32..307d3e2843cffb83e03593439f87a80005a71830 100644 (file)
@@ -11,7 +11,7 @@ class OSDMap;
 
 class OSDMapService {
 public:
-  using cached_map_t = boost::local_shared_ptr<OSDMap>;
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
   virtual ~OSDMapService() = default;
   virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
   /// get the latest map
index b4f5f57822dd01828961b790f997350607050ced..3b182c24ffaafde198ea49ca2b756ceee02afeeb 100644 (file)
@@ -33,8 +33,6 @@
 #include "crimson/osd/pg_meta.h"
 
 #include "pg_backend.h"
-#include "recovery_events.h"
-#include "recovery_state.h"
 
 namespace {
   seastar::logger& logger() {
@@ -43,933 +41,233 @@ namespace {
 }
 
 using ceph::common::local_conf;
-using recovery::AdvMap;
-using recovery::ActMap;
-using recovery::Initialize;
 
-PG::PG(spg_t pgid,
-       pg_shard_t pg_shard,
-       pg_pool_t&& pool,
-       std::string&& name,
-       std::unique_ptr<PGBackend> backend,
-       cached_map_t osdmap,
-       ceph::net::Messenger& msgr)
+class RecoverablePredicate : public IsPGRecoverablePredicate {
+public:
+  bool operator()(const set<pg_shard_t> &have) const override {
+    return !have.empty();
+  }
+};
+
+class ReadablePredicate: public IsPGReadablePredicate {
+  pg_shard_t whoami;
+public:
+  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
+  bool operator()(const set<pg_shard_t> &have) const override {
+    return have.count(whoami);
+  }
+};
+
+PG::PG(
+  spg_t pgid,
+  pg_shard_t pg_shard,
+  pg_pool_t&& pool,
+  std::string&& name,
+  cached_map_t osdmap,
+  ceph::osd::ShardServices &shard_services,
+  ec_profile_t profile)
   : pgid{pgid},
-    whoami{pg_shard},
-    pool{std::move(pool)},
-    recovery_state{*this},
-    info{pgid},
-    backend{std::move(backend)},
+    pg_whoami{pg_shard},
+    coll_ref(shard_services.get_store().open_collection(coll)),
+    pgmeta_oid{pgid.make_pgmeta_oid()},
+    shard_services{shard_services},
     osdmap{osdmap},
-    msgr{msgr}
-{
-  // TODO
-}
-
-seastar::future<> PG::read_state(ceph::os::CyanStore* store)
-{
-  return PGMeta{store, pgid}.load().then(
-    [this](pg_info_t pg_info_, PastIntervals past_intervals_) {
-      info = std::move(pg_info_);
-      last_written_info = info;
-      past_intervals = std::move(past_intervals_);
-      // initialize current mapping
-      {
-        vector<int> new_up, new_acting;
-        int new_up_primary, new_acting_primary;
-        osdmap->pg_to_up_acting_osds(pgid.pgid,
-                                     &new_up, &new_up_primary,
-                                     &new_acting, &new_acting_primary);
-        update_primary_state(new_up, new_up_primary,
-                             new_acting, new_acting_primary);
-      }
-      info.stats.up = up;
-      info.stats.up_primary = up_primary.osd;
-      info.stats.acting = acting;
-      info.stats.acting_primary = primary.osd;
-      info.stats.mapping_epoch = info.history.same_interval_since;
-      recovery_state.handle_event(Initialize{});
-      // note: we don't activate here because we know the OSD will advance maps
-      // during boot.
-      return seastar::now();
-    });
-}
-
-void
-PG::update_primary_state(const std::vector<int>& new_up,
-                         int new_up_primary,
-                         const std::vector<int>& new_acting,
-                         int new_acting_primary)
-{
-  auto collect_pg_shards =
-    [is_erasure=pool.is_erasure()](const std::vector<int>& osds,
-                                   int osd_primary) {
-      int8_t index = 0;
-      pg_shard_set_t collected;
-      pg_shard_t pg_primary;
-      for (auto osd : osds) {
-        if (osd != CRUSH_ITEM_NONE) {
-          pg_shard_t pg_shard{
-            osd, is_erasure ? shard_id_t{index} : shard_id_t::NO_SHARD};
-          if (osd == osd_primary) {
-            pg_primary = pg_shard;
-          }
-          collected.insert(pg_shard);
-        }
-        index++;
-      }
-      return std::make_pair(collected, pg_primary);
-    };
-  acting = new_acting;
-  std::tie(actingset, primary) = collect_pg_shards(acting, new_acting_primary);
-  ceph_assert(primary.osd == new_acting_primary);
-  up = new_up;
-  std::tie(upset, up_primary) = collect_pg_shards(up, new_up_primary);
-  ceph_assert(up_primary.osd == new_up_primary);
-}
-
-epoch_t PG::get_osdmap_epoch() const
-{
-  return osdmap->get_epoch();
-}
-
-pg_shard_t PG::get_whoami() const
-{
-  return whoami;
-}
-
-const pg_info_t& PG::get_info() const
-{
-  return info;
-}
-
-const pg_stat_t& PG::get_stats() const
-{
-  return info.stats;
-}
-
-void PG::clear_state(uint64_t mask)
-{
-  if (!test_state(mask))
-    return;
-  info.stats.state &= ~mask;
-  const auto now = utime_t{coarse_real_clock::now()};
-  info.stats.last_change = now;
-  if (mask & PG_STATE_ACTIVE) {
-    info.stats.last_active = now;
-  }
-}
-
-bool PG::test_state(uint64_t mask) const
-{
-  return info.stats.state & mask;
-}
-
-void PG::set_state(uint64_t mask)
-{
-  if (test_state(mask)) {
-    return;
-  }
-  info.stats.state |= mask;
-  const auto now = utime_t{coarse_real_clock::now()};
-  info.stats.last_change = now;
-  if (mask & PG_STATE_ACTIVE) {
-    info.stats.last_became_active = now;
-    if (active_promise) {
-      std::move(active_promise)->set_value();
-      active_promise.reset();
-    }
-  }
-  if (mask & (PG_STATE_ACTIVE | PG_STATE_PEERED) &&
-      test_state(PG_STATE_ACTIVE | PG_STATE_PEERED)) {
-    info.stats.last_became_peered = now;
-  }
-  if (mask & PG_STATE_CLEAN) {
-    info.stats.last_epoch_clean = get_osdmap_epoch();
-  }
-}
-
-const PastIntervals& PG::get_past_intervals() const
-{
-  return past_intervals;
-}
-
-pg_shard_t PG::get_primary() const
-{
-  return primary;
-}
-
-bool PG::is_primary() const
-{
-  return whoami == primary;
-}
-
-
-namespace {
-  bool has_shard(bool ec, const vector<int>& osds, pg_shard_t pg_shard)
-  {
-    if (ec) {
-      return (osds.size() > static_cast<unsigned>(pg_shard.shard) &&
-              osds[pg_shard.shard] == pg_shard.osd);
-    } else {
-      return std::find(osds.begin(), osds.end(), pg_shard.osd) != osds.end();
-    }
-  }
-}
-
-bool PG::is_acting(pg_shard_t pg_shard) const
-{
-  return has_shard(pool.is_erasure(), acting, pg_shard);
-}
-
-bool PG::is_up(pg_shard_t pg_shard) const
-{
-  return has_shard(pool.is_erasure(), up, pg_shard);
-}
-
-epoch_t PG::get_last_peering_reset() const
-{
-  return last_peering_reset;
-}
-
-void PG::update_last_peering_reset()
-{
-  last_peering_reset = get_osdmap_epoch();
-}
-
-epoch_t PG::get_need_up_thru() const
-{
-  return need_up_thru;
-}
-
-void PG::update_need_up_thru(const OSDMap* o)
-{
-  if (!o) {
-    o = osdmap.get();
-  }
-  if (auto up_thru = o->get_up_thru(whoami.osd);
-      up_thru < info.history.same_interval_since) {
-    logger().info("up_thru {} < same_since {}, must notify monitor",
-                  up_thru, info.history.same_interval_since);
-    need_up_thru = info.history.same_interval_since;
-  } else {
-    logger().info("up_thru {} >= same_since {}, all is well",
-                  up_thru, info.history.same_interval_since);
-    need_up_thru = 0;
-  }
-}
-
-std::vector<int>
-PG::calc_acting(pg_shard_t auth_shard,
-                const vector<int>& acting,
-                const map<pg_shard_t, pg_info_t>& all_info) const
-{
-  // select primary
-  auto auth_log_shard = all_info.find(auth_shard);
-  auto primary = all_info.find(up_primary);
-  if (up.empty() ||
-      primary->second.is_incomplete() ||
-      primary->second.last_update < auth_log_shard->second.log_tail) {
-    ceph_assert(!auth_log_shard->second.is_incomplete());
-    logger().info("up[0] needs backfill, osd.{} selected as primary instead",
-                  auth_shard);
-    primary = auth_log_shard;
-  }
-  auto& [primary_shard_id, primary_info] = *primary;
-  logger().info("primary is osd.{} with {}",
-                primary_shard_id.osd, primary_info);
-
-  vector<int> want{primary_shard_id.osd};
-  // We include auth_log_shard->second.log_tail because in GetLog,
-  // we will request logs back to the min last_update over our
-  // acting_backfill set, which will result in our log being extended
-  // as far backwards as necessary to pick up any peers which can
-  // be log recovered by auth_log_shard's log
-  auto oldest_auth_log_entry =
-    std::min(primary_info.log_tail, auth_log_shard->second.log_tail);
-  // select replicas that have log contiguity with primary.
-  // prefer up, then acting, then any peer_info osds
-  auto get_shard = [](int osd) {
-    return pg_shard_t{osd, shard_id_t::NO_SHARD}; };
-  auto get_info = [&](int osd) -> const pg_info_t& {
-    return all_info.find(get_shard(osd))->second; };
-  auto is_good = [&, oldest_auth_log_entry](int osd) {
-    auto& info = get_info(osd);
-    return (!info.is_incomplete() &&
-            info.last_update >= oldest_auth_log_entry);
-  };
-  auto is_enough = [size=pool.get_size(), &want](int) {
-    return want.size() >= size;
-  };
-  std::vector<std::reference_wrapper<const vector<int>>> covered;
-  auto has_covered = [primary=primary_shard_id.osd, &covered](int osd) {
-    if (osd == primary)
-      return true;
-    for (auto& c : covered) {
-      if (std::find(c.get().begin(), c.get().end(), osd) != c.get().end()) {
-        return true;
-      }
-    }
-    return false;
-  };
-  boost::copy((up |
-               boost::adaptors::filtered(std::not_fn(is_enough)) |
-               boost::adaptors::filtered(std::not_fn(has_covered)) |
-               boost::adaptors::filtered(is_good)),
-              std::back_inserter(want));
-  if (is_enough(0))
-    return want;
-  covered.push_back(std::cref(up));
-  // let's select from acting. the later "last_update" is, the better
-  // sort by last_update, in descending order.
-  using cands_sorted_by_eversion_t = std::map<eversion_t,
-                                              pg_shard_t,
-                                              std::greater<eversion_t>>;
-  auto shard_to_osd = [](const pg_shard_t& shard) { return shard.osd; };
-  {
-    // This no longer has backfill OSDs, as they are covered above.
-    auto cands = boost::accumulate(
-      (acting |
-       boost::adaptors::filtered(std::not_fn(is_enough)) |
-       boost::adaptors::filtered(std::not_fn(has_covered)) |
-       boost::adaptors::filtered(is_good)),
-      cands_sorted_by_eversion_t{},
-      [&](cands_sorted_by_eversion_t& cands, int osd) {
-        cands.emplace(get_info(osd).last_update, get_shard(osd));
-        return std::move(cands);
+    backend(
+      PGBackend::create(
+        pgid,
+       pool,
+       coll_ref,
+       &shard_services.get_store(),
+       profile)),
+    peering_state(
+      shard_services.get_cct(),
+      pg_shard,
+      pgid,
+      PGPool(
+       shard_services.get_cct(),
+       osdmap,
+       pgid.pool(),
+       pool,
+       osdmap->get_pool_name(pgid.pool())),
+      osdmap,
+      this,
+      this)
+{
+  peering_state.set_backend_predicates(
+    new ReadablePredicate(pg_whoami),
+    new RecoverablePredicate());
+}
+
+bool PG::try_flush_or_schedule_async() {
+// FIXME once there's a good way to schedule an "async" peering event
+#if 0
+  shard_services.get_store().do_transaction(
+    coll_ref,
+    ObjectStore::Transaction()).then(
+      [this, epoch=peering_state.get_osdmap()->get_epoch()](){
+       if (!peering_state.pg_has_reset_since(epoch)) {
+         PeeringCtx rctx;
+         auto evt = PeeringState::IntervalFlush();
+         do_peering_event(evt, rctx);
+         return shard_services.dispatch_context(std::move(rctx));
+       } else {
+         return seastar::now();
+       }
       });
-    boost::copy(cands |
-                boost::adaptors::map_values |
-                boost::adaptors::transformed(shard_to_osd),
-                std::back_inserter(want));
-    if (is_enough(0)) {
-      return want;
-    }
-    covered.push_back(std::cref(acting));
-  }
-  // continue to search stray for more suitable peers
-  {
-    auto pi_to_osd = [](const peer_info_t::value_type& pi) {
-      return pi.first.osd; };
-    auto cands = boost::accumulate(
-      (all_info |
-       boost::adaptors::transformed(pi_to_osd) |
-       boost::adaptors::filtered(std::not_fn(is_enough)) |
-       boost::adaptors::filtered(std::not_fn(has_covered)) |
-       boost::adaptors::filtered(is_good)),
-      cands_sorted_by_eversion_t{},
-      [&](cands_sorted_by_eversion_t& cands, int osd) {
-        cands.emplace(get_info(osd).last_update, get_shard(osd));
-        return cands;
-      });
-    boost::copy(cands |
-                boost::adaptors::map_values |
-                boost::adaptors::transformed(shard_to_osd),
-                std::back_inserter(want));
-  }
-  return want;
-}
-
-bool PG::proc_replica_info(pg_shard_t from,
-                           const pg_info_t& pg_info,
-                           epoch_t send_epoch)
-{
-
-  if (auto found = peer_info.find(from);
-      found != peer_info.end() &&
-      found->second.last_update == pg_info.last_update) {
-    logger().info("got info {} from osd.{}, identical to ours",
-                  info, from);
-    return false;
-  } else if (!osdmap->has_been_up_since(from.osd, send_epoch)) {
-    logger().info("got info {} from down osd.{}. discarding",
-                  info, from);
-    return false;
-  } else {
-    logger().info("got info {} from osd.{}", info, from);
-    peer_info.emplace(from, pg_info);
-    return true;
-  }
-}
-
-void PG::proc_replica_log(pg_shard_t from,
-                          const pg_info_t& pg_info,
-                          const pg_log_t& pg_log,
-                          const pg_missing_t& pg_missing)
-{
-
-  logger().info("{} for osd.{}: {} {} {}", from, pg_info, pg_log, pg_missing);
-  peer_info.insert_or_assign(from, pg_info);
-}
-
-// Returns an iterator to the best info in infos sorted by:
-//  1) Prefer newer last_update
-//  2) Prefer longer tail if it brings another info into contiguity
-//  3) Prefer current primary
-pg_shard_t
-PG::find_best_info(const PG::peer_info_t& infos) const
-{
-  // See doc/dev/osd_internals/last_epoch_started.rst before attempting
-  // to make changes to this process.  Also, make sure to update it
-  // when you find bugs!
-  auto min_last_update_acceptable = eversion_t::max();
-  epoch_t max_les = 0;
-  for ([[maybe_unused]] auto& [shard, info] : infos) {
-    if (max_les < info.history.last_epoch_started) {
-      max_les = info.history.last_epoch_started;
-    }
-    if (!info.is_incomplete() &&
-        max_les < info.last_epoch_started) {
-      max_les = info.last_epoch_started;
-    }
-  }
-  for ([[maybe_unused]] auto& [shard, info] : infos) {
-    if (max_les <= info.last_epoch_started &&
-        min_last_update_acceptable > info.last_update) {
-      min_last_update_acceptable = info.last_update;
-    }
-  }
-  if (min_last_update_acceptable == eversion_t::max()) {
-    return pg_shard_t{};
-  }
-  // find osd with newest last_update (oldest for ec_pool).
-  // if there are multiples, prefer
-  //  - a longer tail, if it brings another peer into log contiguity
-  //  - the current primary
-  struct is_good {
-    // boost::max_element() copies the filter function, so make it copyable
-    eversion_t min_last_update_acceptable;
-    epoch_t max_les;
-    const PG* thiz;
-    is_good(eversion_t min_lua, epoch_t max_les, const PG* thiz)
-      : min_last_update_acceptable{min_lua}, max_les{max_les}, thiz{thiz} {}
-    is_good(const is_good& rhs) = default;
-    is_good& operator=(const is_good& rhs) = default;
-    bool operator()(const PG::peer_info_t::value_type& pi) const {
-      auto& [shard, info] = pi;
-      if (!thiz->is_up(shard) && !thiz->is_acting(shard)) {
-        return false;
-        // Only consider peers with last_update >= min_last_update_acceptable
-      } else if (info.last_update < min_last_update_acceptable) {
-        return false;
-        // Disqualify anyone with a too old last_epoch_started
-      } else if (info.last_epoch_started < max_les) {
-        return false;
-        // Disqualify anyone who is incomplete (not fully backfilled)
-      } else if (info.is_incomplete()) {
-        return false;
-      } else {
-        return true;
-      }
-    }
-  };
-  auto better = [require_rollback=pool.require_rollback(), this]
-    (const PG::peer_info_t::value_type& lhs,
-     const PG::peer_info_t::value_type& rhs) {
-    if (require_rollback) {
-      // prefer older last_update for ec_pool
-      if (lhs.second.last_update > rhs.second.last_update) {
-        return true;
-      } else if (lhs.second.last_update < rhs.second.last_update) {
-        return false;
-      }
-    } else {
-      // prefer newer last_update for replica pool
-      if (lhs.second.last_update > rhs.second.last_update) {
-        return false;
-      } else if (lhs.second.last_update < rhs.second.last_update) {
-        return true;
-      }
-    }
-    // Prefer longer tail
-    if (lhs.second.log_tail > rhs.second.log_tail) {
-      return true;
-    } else if (lhs.second.log_tail < rhs.second.log_tail) {
-      return false;
-    }
-    // prefer complete to missing
-    if (lhs.second.has_missing() && !rhs.second.has_missing()) {
-      return true;
-    } else if (!lhs.second.has_missing() && rhs.second.has_missing()) {
-      return false;
-    }
-    // prefer current primary (usually the caller), all things being equal
-    if (rhs.first == whoami) {
-      return true;
-    } else if (lhs.first == whoami) {
-      return false;
-    }
-    return false;
-  };
-  auto good_infos =
-    (infos | boost::adaptors::filtered(is_good{min_last_update_acceptable,
-                                               max_les, this}));
-  if (good_infos.empty()) {
-    return pg_shard_t{};
-  } else {
-    return boost::max_element(good_infos, better)->first;
-  }
-}
-
-std::pair<PG::choose_acting_t, pg_shard_t> PG::choose_acting()
-{
-  auto all_info = peer_info;
-  all_info.emplace(whoami, info);
-
-  auto auth_log_shard = find_best_info(all_info);
-  if (auth_log_shard.is_undefined()) {
-    if (up != acting) {
-      logger().info("{} no suitable info found (incomplete backfills?), "
-                    "reverting to up", __func__);
-      want_acting = up;
-      // todo: reset pg_temp
-      return {choose_acting_t::should_change, auth_log_shard};
-    } else {
-      logger().info("{} failed ", __func__);
-      ceph_assert(want_acting.empty());
-      return {choose_acting_t::pg_incomplete, auth_log_shard};
-    }
-  }
-
-  auto want = calc_acting(auth_log_shard, acting, all_info);
-  if (want != acting) {
-    logger().info("{} want {} != acting {}, requesting pg_temp change",
-                  __func__, want, acting);
-    want_acting = std::move(want);
-    // todo: update pg temp
-    return {choose_acting_t::should_change, auth_log_shard};
-  } else {
-    logger().info("{} want={}", __func__, want);
-    want_acting.clear();
-    acting_recovery_backfill.clear();
-    std::transform(want.begin(), want.end(),
-      std::inserter(acting_recovery_backfill, acting_recovery_backfill.end()),
-      [](int osd) { return pg_shard_t{osd, shard_id_t::NO_SHARD}; });
-    return {choose_acting_t::dont_change, auth_log_shard};
-  }
-}
-
-bool PG::should_send_notify() const
-{
-  return should_notify_primary && primary.osd >= 0;
-}
-
-pg_notify_t PG::get_notify(epoch_t query_epoch) const
-{
-  return pg_notify_t{primary.shard,
-                     whoami.shard,
-                     query_epoch,
-                     get_osdmap_epoch(),
-                     info};
-}
-
-bool PG::is_last_activated_peer(pg_shard_t peer)
-{
-  if (!acting_recovery_backfill.count(peer))
-    return false;
-  if (!peer_activated.insert(peer).second)
-    return false;
-  logger().info("peer osd.{} activated and committed", peer);
-  return peer_activated.size() == acting_recovery_backfill.size();
-}
-
-
-void PG::clear_primary_state()
-{
-  peer_info.clear();
-  want_acting.clear();
-  need_up_thru = 0;
-  peer_activated.clear();
+  return false;
+#endif
+  return true;
 }
 
-bool PG::should_restart_peering(int new_up_primary,
-                                int new_acting_primary,
-                                const std::vector<int>& new_up,
-                                const std::vector<int>& new_acting,
-                                cached_map_t last_map,
-                                cached_map_t osd_map) const
-{
-  auto pgid = info.pgid.pgid;
-  auto pool = last_map->get_pg_pool(pgid.pool());
-  if (!pool) {
-    return false;
-  }
-  auto new_pool = osd_map->get_pg_pool(pgid.pool());
-  if (!new_pool) {
-    return true;
-  }
-  if (PastIntervals::is_new_interval(
-        primary.osd,
-        new_acting_primary,
-        acting,
-        new_acting,
-        up_primary.osd,
-        new_up_primary,
-        up,
-        new_up,
-        pool->size,
-        new_pool->size,
-        pool->min_size,
-        new_pool->min_size,
-        pool->get_pg_num(),
-        new_pool->get_pg_num(),
-        pool->get_pg_num_pending(),
-        new_pool->get_pg_num_pending(),
-        last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
-        osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
-        last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
-        osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
-        pgid)) {
-    logger().info("new interval new_up {} new_acting {}",
-                  new_up, new_acting);
-    return true;
-  }
-  if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
-    logger().info(" osd transitioned from down -> up");
-    return true;
-  }
-  return false;
+void PG::log_state_enter(const char *state) {
+  logger().info("Entering state: {}", state);
 }
 
-template<class T>
-bool compare_n_set(T& v, const T& new_v)
-{
-  if (v != new_v) {
-    v = new_v;
-    return true;
-  } else {
-    return false;
-  }
+void PG::log_state_exit(
+  const char *state_name, utime_t enter_time,
+  uint64_t events, utime_t event_dur) {
+  logger().info(
+    "Exiting state: {}, entered at {}, {} spent on {} events",
+    state_name,
+    enter_time,
+    event_dur,
+    events);
 }
 
-void PG::start_peering_interval(int new_up_primary,
-                                int new_acting_primary,
-                                const std::vector<int>& new_up,
-                                const std::vector<int>& new_acting,
-                                cached_map_t last_map)
+void PG::init(
+  ceph::os::CollectionRef coll,
+  int role,
+  const vector<int>& newup, int new_up_primary,
+  const vector<int>& newacting, int new_acting_primary,
+  const pg_history_t& history,
+  const PastIntervals& pi,
+  bool backfill,
+  ObjectStore::Transaction &t)
 {
-  // todo
-  update_last_peering_reset();
-
-  auto old_acting_primary = primary;
-  auto old_acting = std::move(acting);
-  auto old_up_primary = up_primary;
-  auto old_up = std::move(up);
-
-  update_primary_state(new_up, new_up_primary,
-                       new_acting, new_acting_primary);
-  if (compare_n_set(info.stats.up, up) +
-      compare_n_set(info.stats.up_primary, up_primary.osd) +
-      compare_n_set(info.stats.acting, acting) +
-      compare_n_set(info.stats.acting_primary, primary.osd)) {
-    info.stats.mapping_epoch = osdmap->get_epoch();
-  }
-  if (old_up_primary != up_primary || old_up != up) {
-    info.history.same_up_since = osdmap->get_epoch();
-  }
-  // this comparison includes primary rank via pg_shard_t
-  if (old_acting_primary != get_primary()) {
-    info.history.same_primary_since = osdmap->get_epoch();
-  }
-  // todo: always start a new interval
-  info.history.same_interval_since = osdmap->get_epoch();
-  // This will now be remapped during a backfill in cases
-  // that it would not have been before.
-  if (up != acting) {
-    set_state(PG_STATE_REMAPPED);
-  } else {
-    clear_state(PG_STATE_REMAPPED);
-  }
-  // deactivate.
-  clear_state(PG_STATE_ACTIVE);
-  clear_state(PG_STATE_PEERED);
-  clear_state(PG_STATE_DOWN);
-
-  acting_recovery_backfill.clear();
-  // should we tell the primary we are here?
-  should_notify_primary = !is_primary();
+  coll_ref = coll;
+  peering_state.init(
+    role, newup, new_up_primary, newacting,
+    new_acting_primary, history, pi, backfill, t);
 }
 
-void PG::activate(epoch_t activation_epoch)
+seastar::future<> PG::read_state(ceph::os::CyanStore* store)
 {
-  clear_state(PG_STATE_DOWN);
+  coll_ref = store->open_collection(coll_t(pgid));
+  return PGMeta{store, pgid}.load().then(
+    [this, store](pg_info_t pg_info, PastIntervals past_intervals) {
+      return peering_state.init_from_disk_state(
+       std::move(pg_info),
+       std::move(past_intervals),
+       [this, store, &pg_info] (PGLog &pglog) {
+         return pglog.read_log_and_missing_crimson(
+           *store,
+           coll_ref,
+           peering_state.get_info(),
+           pgmeta_oid);
+       });
+    }).then([this, store]() {
+      int primary, up_primary;
+      vector<int> acting, up;
+      peering_state.get_osdmap()->pg_to_up_acting_osds(
+       pgid.pgid, &up, &up_primary, &acting, &primary);
+      peering_state.init_primary_up_acting(
+       up,
+       acting,
+       up_primary,
+       primary);
+      int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
+      if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
+       peering_state.set_role(rr);
+      else
+       peering_state.set_role(-1);
+
+      PeeringCtx rctx;
+      PeeringState::Initialize evt;
+      peering_state.handle_event(evt, &rctx);
+      peering_state.write_if_dirty(rctx.transaction);
+      store->do_transaction(
+       coll_ref,
+       std::move(rctx.transaction));
 
-  if (is_primary()) {
-    // only update primary last_epoch_started if we will go active
-    if (acting.size() >= pool.min_size) {
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  } else if (is_acting(whoami)) {
-    // update last_epoch_started on acting replica to whatever the primary sent
-    // unless it's smaller (could happen if we are going peered rather than
-    // active, see doc/dev/osd_internals/last_epoch_started.rst)
-    if (info.last_epoch_started < activation_epoch) {
-      info.last_epoch_started = activation_epoch;
-      info.last_interval_started = info.history.same_interval_since;
-    }
-  }
-  if (is_primary()) {
-    // start up replicas
-    seastar::do_for_each(
-      acting_recovery_backfill.begin(),
-      acting_recovery_backfill.end(),
-      [this](pg_shard_t peer) { return activate_peer(peer); });
-    set_state(PG_STATE_ACTIVATING);
-  } else {
-    // todo: write/commit pg log, activate myself, and then tell primary
-    on_activated();
-    pg_notify_t notify{get_primary().shard,
-                       whoami.shard,
-                       get_osdmap_epoch(),
-                       get_osdmap_epoch(),
-                       info};
-    auto m = make_message<MOSDPGInfo>(
-      get_osdmap_epoch(),
-      MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
-    send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
-  }
-  // todo:
-  info.last_complete = info.last_update;
-  update_need_up_thru();
+      return seastar::now();
+    });
 }
 
-void PG::on_activated()
+void PG::do_peering_event(
+  const boost::statechart::event_base &evt,
+  PeeringCtx &rctx)
 {
-  if (acting.size() >= pool.min_size) {
-    set_state(PG_STATE_ACTIVE);
-  } else {
-    set_state(PG_STATE_PEERED);
-  }
+  peering_state.handle_event(
+    evt,
+    &rctx);
 }
 
-seastar::future<> PG::activate_peer(pg_shard_t peer)
+void PG::do_peering_event(
+  PGPeeringEvent& evt, PeeringCtx &rctx)
 {
-  if (peer == whoami) {
-    // todo: write/commit pg log
-    peer_activated.insert(whoami);
-    return seastar::now();
-  }
-  auto& pi = peer_info[peer];
-  MOSDPGLog* m = nullptr;
-  if (pi.last_update == info.last_update) {
-    // empty log
-    logger().info("activate peer osd.{} is up to date, "
-                  "but sending pg_log anyway", peer);
-    m = new MOSDPGLog{peer.shard,
-                      whoami.shard,
-                      get_osdmap_epoch(),
-                      get_info(),
-                      get_last_peering_reset()};
-  } else if (pi.last_backfill.is_min()) {
-    logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
-                  pi.log_tail, pi.last_update,
-                  pi.last_backfill, info.last_update);
-    // backfill
-    pi.last_update = info.last_update;
-    pi.last_complete = info.last_update;
-    pi.last_epoch_started = info.last_epoch_started;
-    pi.last_interval_started = info.last_interval_started;
-    pi.history = info.history;
-    pi.hit_set = info.hit_set;
-    pi.stats.stats.clear();
-    pi.purged_snaps = info.purged_snaps;
-    m = new MOSDPGLog{peer.shard,
-                      whoami.shard,
-                      get_osdmap_epoch(),
-                      pi,
-                      get_last_peering_reset()};
+  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
+    logger().debug("{} handling {}", __func__, evt.get_desc());
+    return do_peering_event(evt.get_event(), rctx);
   } else {
-    // catch up
-    logger().info("send missing log to osd.{}", peer);
-    m = new MOSDPGLog{peer.shard,
-                      whoami.shard,
-                      get_osdmap_epoch(),
-                      get_info(),
-                      get_last_peering_reset()};
-    // todo. send pg_log
-    pi.last_update = info.last_update;
-  }
-  return send_to_osd(peer.osd, Ref<Message>{m, false}, get_osdmap_epoch());
-}
-
-void PG::maybe_mark_clean()
-{
-  if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
-    set_state(PG_STATE_CLEAN);
-    info.history.last_epoch_clean = get_osdmap_epoch();
-    info.history.last_interval_clean = info.history.same_interval_since;
+    logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
   }
 }
 
-seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
+void PG::handle_advance_map(
+  cached_map_t next_map, PeeringCtx &rctx)
 {
-  return dispatch_context(recovery_state.handle_event(evt->get_event()));
+  vector<int> newup, newacting;
+  int up_primary, acting_primary;
+  next_map->pg_to_up_acting_osds(
+    pgid.pgid,
+    &newup, &up_primary,
+    &newacting, &acting_primary);
+  peering_state.advance_map(
+    next_map,
+    peering_state.get_osdmap(),
+    newup,
+    up_primary,
+    newacting,
+    acting_primary,
+    rctx);
 }
 
-seastar::future<> PG::dispatch_context(recovery::Context&& ctx)
+void PG::handle_activate_map(PeeringCtx &rctx)
 {
-  return seastar::do_with(recovery::Context{ctx}, [this](auto& todo) {
-    return seastar::when_all_succeed(
-      seastar::parallel_for_each(std::move(todo.notifies),
-        [this](auto& osd_notifies) {
-          auto& [peer, notifies] = osd_notifies;
-          auto m = make_message<MOSDPGNotify>(get_osdmap_epoch(),
-                                              std::move(notifies));
-          return send_to_osd(peer, m, get_osdmap_epoch());
-        }),
-      seastar::parallel_for_each(std::move(todo.queries),
-        [this](auto& osd_queries) {
-          auto& [peer, queries] = osd_queries;
-          auto m = make_message<MOSDPGQuery>(get_osdmap_epoch(),
-                                             std::move(queries));
-          return send_to_osd(peer, m, get_osdmap_epoch());
-        }),
-      seastar::parallel_for_each(std::move(todo.infos),
-        [this](auto& osd_infos) {
-          auto& [peer, infos] = osd_infos;
-          auto m = make_message<MOSDPGInfo>(get_osdmap_epoch(),
-                                            std::move(infos));
-          return send_to_osd(peer, m, get_osdmap_epoch());
-        })
-    );
-  });
+  peering_state.activate_map(rctx);
 }
 
-seastar::future<> PG::handle_advance_map(cached_map_t next_map)
+void PG::handle_initialize(PeeringCtx &rctx)
 {
-  auto last_map = std::move(osdmap);
-  osdmap = std::move(next_map);
-  vector<int> new_up, new_acting;
-  int up_primary, acting_primary;
-  osdmap->pg_to_up_acting_osds(pgid.pgid,
-                               &new_up,
-                               &up_primary,
-                               &new_acting,
-                               &acting_primary);
-  logger().info("handle_advance_map {}/{} -- {}/{}",
-                new_up, new_acting, up_primary, acting_primary);
-  recovery_state.handle_event(AdvMap{osdmap, last_map,
-                                     std::move(new_up), up_primary,
-                                     std::move(new_acting), acting_primary});
-  return seastar::now();
+  PeeringState::Initialize evt;
+  peering_state.handle_event(evt, &rctx);
 }
 
-seastar::future<> PG::handle_activate_map()
-{
-  recovery_state.handle_event(ActMap{});
-  return seastar::now();
-}
 
 void PG::print(ostream& out) const
 {
-  out << "pg[" << info
-      << " " << up;
-  if (acting != up)
-    out << "/" << acting;
-  out << " lpr=" << last_peering_reset
-      << " " << pg_state_string(info.stats.state)
-      << "]";
+  out << peering_state << " ";
 }
 
 
 std::ostream& operator<<(std::ostream& os, const PG& pg)
 {
+  os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
   pg.print(os);
   return os;
 }
 
-void PG::reply_pg_query(const MQuery& query, recovery::Context* ctx)
-{
-  switch (query.query.type) {
-  case pg_query_t::INFO:
-    return reply_pg_query_for_info(query, ctx);
-  case pg_query_t::LOG:
-    return reply_pg_query_for_log(query, false);
-  case pg_query_t::FULLLOG:
-    return reply_pg_query_for_log(query, true);
-  }
-}
-
-void PG::reply_pg_query_for_info(const MQuery& query, recovery::Context* ctx)
-{
-  recovery::Context::notify_t notify{pg_notify_t{query.from.shard,
-                                                 whoami.shard,
-                                                 query.query_epoch,
-                                                 get_osdmap_epoch(),
-                                                 info},
-                                     past_intervals};
-  ctx->notifies[query.from.osd].push_back(std::move(notify));
-}
-
-void PG::reply_pg_query_for_log(const MQuery& query, bool full)
-{
-  auto m = make_message<MOSDPGLog>(query.from.shard,
-                                   whoami.shard,
-                                   get_osdmap_epoch(),
-                                   info,
-                                   query.query_epoch);
-  // todo:
-  // m->missing = pg_log.get_missing();
-  if (full) {
-    // m->log = pg_log.get_log();
-  } else {
-    // maybe partial
-  }
-  send_to_osd(query.from.osd, m, get_osdmap_epoch());
-}
-
-seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
-{
-  if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
-    return seastar::now();
-  } else {
-    return msgr.connect(osdmap->get_cluster_addrs(peer).front(),
-                        CEPH_ENTITY_TYPE_OSD)
-     .then([m, this] (auto xconn) {
-       return (*xconn)->send(m);
-     });
-  }
-}
-
-seastar::future<> PG::share_pg_info()
-{
-  return seastar::do_for_each(
-    acting_recovery_backfill.begin(),
-    acting_recovery_backfill.end(),
-    [this](pg_shard_t peer) {
-      if (peer == whoami) return seastar::now();
-      if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
-        pi->second.last_epoch_started = info.last_epoch_started;
-        pi->second.last_interval_started = info.last_interval_started;
-        pi->second.history.merge(info.history);
-      }
-      pg_notify_t notify{peer.shard,
-                         whoami.shard,
-                         get_osdmap_epoch(),
-                         get_osdmap_epoch(),
-                         info};
-      auto m = make_message<MOSDPGInfo>(
-        get_osdmap_epoch(),
-        MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
-                                        past_intervals)});
-      return send_to_osd(peer.osd, m, get_osdmap_epoch());
-    });
-}
-
 seastar::future<> PG::wait_for_active()
 {
-  logger().debug("wait_for_active: {}", pg_state_string(info.stats.state));
+  logger().debug("wait_for_active: {}", peering_state.get_pg_state_string());
   if (local_conf()->crimson_debug_pg_always_active) {
     return seastar::now();
   }
-  if (test_state(PG_STATE_ACTIVE)) {
+
+  if (peering_state.is_active()) {
     return seastar::now();
   } else {
-    if (!active_promise) {
-      active_promise = seastar::shared_promise<>();
-    }
-    return active_promise->get_shared_future();
+    return active_promise.get_shared_future();
   }
 }
 
@@ -1028,7 +326,7 @@ seastar::future<bufferlist> PG::do_pgnls(bufferlist& indata,
     throw std::invalid_argument("unable to decode PGNLS handle");
   }
   const auto pg_start = pgid.pgid.get_hobj_start();
-  const auto pg_end = pgid.pgid.get_hobj_end(pool.get_pg_num());
+  const auto pg_end = pgid.pgid.get_hobj_end(peering_state.get_pool().info.get_pg_num());
   if (!(lower_bound.is_min() ||
         lower_bound.is_max() ||
         (lower_bound >= pg_start && lower_bound < pg_end))) {
@@ -1089,8 +387,8 @@ seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
       backend->evict_object_state(oid);
       auto reply = make_message<MOSDOpReply>(m.get(), -ENOENT, get_osdmap_epoch(),
                                              0, false);
-      reply->set_enoent_reply_versions(info.last_update,
-                                       info.last_user_version);
+      reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+                                       peering_state.get_info().last_user_version);
       return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
     });
   });
index 2e3e7f519f231b2fc08f5ddc7a35b584f3e74485..211fc77053c4f2984863bb16c4355f7a77c54870 100644 (file)
 #include <seastar/core/future.hh>
 #include <seastar/core/shared_future.hh>
 
+#include "common/dout.h"
 #include "crimson/net/Fwd.h"
 #include "os/Transaction.h"
+#include "crimson/osd/shard_services.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
-#include "recovery_state.h"
+#include "osd/PeeringState.h"
 
 template<typename T> using Ref = boost::intrusive_ptr<T>;
 class OSDMap;
@@ -36,99 +38,378 @@ namespace ceph::os {
 
 class PG : public boost::intrusive_ref_counter<
   PG,
-  boost::thread_unsafe_counter>
+  boost::thread_unsafe_counter>,
+  PeeringState::PeeringListener,
+  DoutPrefixProvider
 {
   using ec_profile_t = std::map<std::string,std::string>;
-  using cached_map_t = boost::local_shared_ptr<OSDMap>;
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
 
+  spg_t pgid;
+  pg_shard_t pg_whoami;
+  coll_t coll;
+  ceph::os::CollectionRef coll_ref;
+  ghobject_t pgmeta_oid;
 public:
   PG(spg_t pgid,
      pg_shard_t pg_shard,
      pg_pool_t&& pool,
      std::string&& name,
-     std::unique_ptr<PGBackend> backend,
      cached_map_t osdmap,
-     ceph::net::Messenger& msgr);
-
-  epoch_t get_osdmap_epoch() const;
-  const pg_info_t& get_info() const;
-  const pg_stat_t& get_stats() const;
-  void clear_state(uint64_t mask);
-  bool test_state(uint64_t mask) const;
-  void set_state(uint64_t mask);
-  const PastIntervals& get_past_intervals() const;
-  pg_shard_t get_primary() const;
-  bool is_primary() const;
-  bool is_acting(pg_shard_t pg_shard) const;
-  bool is_up(pg_shard_t pg_shard) const;
-  pg_shard_t get_whoami() const;
-  epoch_t get_last_peering_reset() const;
-  void update_last_peering_reset();
-  epoch_t get_need_up_thru() const;
-  void update_need_up_thru(const OSDMap* o = nullptr);
-
-  bool proc_replica_info(pg_shard_t from,
-                        const pg_info_t& pg_info,
-                        epoch_t send_epoch);
-  void proc_replica_log(pg_shard_t from,
-                       const pg_info_t& pg_info,
-                       const pg_log_t& pg_log,
-                       const pg_missing_t& pg_missing);
-
-  using peer_info_t = std::map<pg_shard_t, pg_info_t>;
-  pg_shard_t find_best_info(const PG::peer_info_t& infos) const;
-  enum class choose_acting_t {
-    dont_change,
-    should_change,
-    pg_incomplete,
+     ceph::osd::ShardServices &shard_services,
+     ec_profile_t profile);
+
+  // EpochSource
+  epoch_t get_osdmap_epoch() const final {
+    return peering_state.get_osdmap_epoch();
+  }
+
+  // DoutPrefixProvider
+  std::ostream& gen_prefix(std::ostream& out) const final {
+    return out << *this;
+  }
+  CephContext *get_cct() const final {
+    return shard_services.get_cct();
+  }
+  unsigned get_subsys() const final {
+    return ceph_subsys_osd;
+  }
+
+  ceph::os::CollectionRef get_collection_ref() {
+    return coll_ref;
+  }
+
+  // PeeringListener
+  void prepare_write(
+    pg_info_t &info,
+    pg_info_t &last_written_info,
+    PastIntervals &past_intervals,
+    PGLog &pglog,
+    bool dirty_info,
+    bool dirty_big_info,
+    bool need_write_epoch,
+    ObjectStore::Transaction &t) final {
+    std::map<string,bufferlist> km;
+    if (dirty_big_info || dirty_info) {
+      int ret = prepare_info_keymap(
+       shard_services.get_cct(),
+       &km,
+       peering_state.get_osdmap()->get_epoch(),
+       info,
+       last_written_info,
+       past_intervals,
+       dirty_big_info,
+       need_write_epoch,
+       true,
+       nullptr,
+       this);
+      ceph_assert(ret == 0);
+    }
+    pglog.write_log_and_missing(
+      t, &km, coll, pgmeta_oid,
+      peering_state.get_pool().info.require_rollback());
+    if (!km.empty())
+      t.omap_setkeys(coll, pgmeta_oid, km);
+  }
+
+  void on_info_history_change() final {
+    // Not needed yet -- mainly for scrub scheduling
+  }
+
+  void scrub_requested(bool deep, bool repair) final {
+    ceph_assert(0 == "Not implemented");
+  }
+
+  uint64_t get_snap_trimq_size() const final {
+    return 0;
+  }
+
+  void send_cluster_message(
+    int osd, Message *m,
+    epoch_t epoch, bool share_map_update=false) final {
+    shard_services.send_to_osd(osd, m, epoch);
+  }
+
+  void send_pg_created(pg_t pgid) final {
+    shard_services.send_pg_created(pgid);
+  }
+
+  bool try_flush_or_schedule_async() final;
+
+  void start_flush_on_transaction(
+    ObjectStore::Transaction &t) final {
+    t.register_on_commit(
+      new LambdaContext([this](){
+       peering_state.complete_flush();
+    }));
+  }
+
+  void on_flushed() final {
+    // will be needed for unblocking IO operations/peering
+  }
+
+  void schedule_event_after(
+    PGPeeringEventRef event,
+    float delay) final {
+    ceph_assert(0 == "Not implemented yet");
+  }
+
+  void request_local_background_io_reservation(
+    unsigned priority,
+    PGPeeringEventRef on_grant,
+    PGPeeringEventRef on_preempt) final {
+    ceph_assert(0 == "Not implemented yet");
+  }
+
+  void update_local_background_io_priority(
+    unsigned priority) final {
+    ceph_assert(0 == "Not implemented yet");
+  }
+
+  void cancel_local_background_io_reservation() final {
+    // Not implemented yet, but gets called on exit() from some states
+  }
+
+  void request_remote_recovery_reservation(
+    unsigned priority,
+    PGPeeringEventRef on_grant,
+    PGPeeringEventRef on_preempt) final {
+    ceph_assert(0 == "Not implemented yet");
+  }
+
+  void cancel_remote_recovery_reservation() final {
+    // Not implemented yet, but gets called on exit() from some states
+  }
+
+  void schedule_event_on_commit(
+    ObjectStore::Transaction &t,
+    PGPeeringEventRef on_commit) final {
+    t.register_on_commit(
+      new LambdaContext([this, on_commit](){
+       PeeringCtx rctx;
+        do_peering_event(on_commit, rctx);
+       shard_services.dispatch_context(std::move(rctx));
+    }));
+  }
+
+  void update_heartbeat_peers(set<int> peers) final {
+    // Not needed yet
+  }
+  void set_probe_targets(const set<pg_shard_t> &probe_set) final {
+    // Not needed yet
+  }
+  void clear_probe_targets() final {
+    // Not needed yet
+  }
+  void queue_want_pg_temp(const std::vector<int> &wanted) final {
+    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+  }
+  void clear_want_pg_temp() final {
+    shard_services.remove_want_pg_temp(pgid.pgid);
+  }
+  void publish_stats_to_osd() final {
+    // Not needed yet
+  }
+  void clear_publish_stats() final {
+    // Not needed yet
+  }
+  void check_recovery_sources(const OSDMapRef& newmap) final {
+    // Not needed yet
+  }
+  void check_blacklisted_watchers() final {
+    // Not needed yet
+  }
+  void clear_primary_state() final {
+    // Not needed yet
+  }
+
+
+  void on_pool_change() final {
+    // Not needed yet
+  }
+  void on_role_change() final {
+    // Not needed yet
+  }
+  void on_change(ObjectStore::Transaction &t) final {
+    // Not needed yet
+  }
+  void on_activate(interval_set<snapid_t> to_trim) final {
+    // Not needed yet (will be needed for IO unblocking)
+  }
+  void on_activate_complete() final {
+    active_promise.set_value();
+    active_promise = {};
+  }
+  void on_new_interval() final {
+    // Not needed yet
+  }
+  Context *on_clean() final {
+    // Not needed yet (will be needed for IO unblocking)
+    return nullptr;
+  }
+  void on_activate_committed() final {
+    // Not needed yet (will be needed for IO unblocking)
+  }
+  void on_active_exit() final {
+    // Not needed yet
+  }
+
+  void on_removal(ObjectStore::Transaction &t) final {
+    // TODO
+  }
+  void do_delete_work(ObjectStore::Transaction &t) final {
+    // TODO
+  }
+
+  // merge/split not ready
+  void clear_ready_to_merge() final {}
+  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
+  void set_not_ready_to_merge_source(pg_t pgid) final {}
+  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
+  void set_ready_to_merge_source(eversion_t lu) final {}
+
+  void on_active_actmap() final {
+    // Not needed yet
+  }
+  void on_active_advmap(const OSDMapRef &osdmap) final {
+    // Not needed yet
+  }
+  epoch_t oldest_stored_osdmap() final {
+    // TODO
+    return 0;
+  }
+
+
+  void on_backfill_reserved() final {
+    ceph_assert(0 == "Not implemented");
+  }
+  void on_backfill_canceled() final {
+    ceph_assert(0 == "Not implemented");
+  }
+  void on_recovery_reserved() final {
+    ceph_assert(0 == "Not implemented");
+  }
+
+
+  bool try_reserve_recovery_space(
+    int64_t primary_num_bytes, int64_t local_num_bytes) final {
+    return true;
+  }
+  void unreserve_recovery_space() final {}
+
+  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
+    PG *pg;
+    ObjectStore::Transaction *t;
+    PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}
+
+    // LogEntryHandler
+    void remove(const hobject_t &hoid) override {
+      // TODO
+    }
+    void try_stash(const hobject_t &hoid, version_t v) override {
+      // TODO
+    }
+    void rollback(const pg_log_entry_t &entry) override {
+      // TODO
+    }
+    void rollforward(const pg_log_entry_t &entry) override {
+      // TODO
+    }
+    void trim(const pg_log_entry_t &entry) override {
+      // TODO
+    }
   };
-  std::vector<int>
-  calc_acting(pg_shard_t auth_shard,
-             const vector<int>& acting,
-             const map<pg_shard_t, pg_info_t>& all_info) const;
-  std::pair<choose_acting_t, pg_shard_t> choose_acting();
+  PGLog::LogEntryHandlerRef get_log_handler(
+    ObjectStore::Transaction &t) final {
+    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
+  }
+
+  void rebuild_missing_set_with_deletes(PGLog &pglog) final {
+    ceph_assert(0 == "Impossible for crimson");
+  }
+
+  PerfCounters &get_peering_perf() final {
+    return shard_services.get_recoverystate_perf_logger();
+  }
+  PerfCounters &get_perf_logger() final {
+    return shard_services.get_perf_logger();
+  }
+
+  void log_state_enter(const char *state) final;
+  void log_state_exit(
+    const char *state_name, utime_t enter_time,
+    uint64_t events, utime_t event_dur) final;
+
+  void dump_recovery_info(Formatter *f) const final {
+  }
+
+  OstreamTemp get_clog_info() final {
+    // not needed yet: replace with not a stub (needs to be wired up to monc)
+    return OstreamTemp(CLOG_INFO, nullptr);
+  }
+  OstreamTemp get_clog_debug() final {
+    // not needed yet: replace with not a stub (needs to be wired up to monc)
+    return OstreamTemp(CLOG_DEBUG, nullptr);
+  }
+  OstreamTemp get_clog_error() final {
+    // not needed yet: replace with not a stub (needs to be wired up to monc)
+    return OstreamTemp(CLOG_ERROR, nullptr);
+  }
+
+  // Utility
+  bool is_primary() const {
+    return peering_state.is_primary();
+  }
+  pg_stat_t get_stats() {
+    auto stats = peering_state.prepare_stats_for_publish(
+      false,
+      pg_stat_t(),
+      object_stat_collection_t());
+    ceph_assert(stats);
+    return *stats;
+  }
+  bool get_need_up_thru() const {
+    return peering_state.get_need_up_thru();
+  }
+
+  /// initialize created PG
+  void init(
+    ceph::os::CollectionRef coll_ref,
+    int role,
+    const std::vector<int>& up,
+    int up_primary,
+    const std::vector<int>& acting,
+    int acting_primary,
+    const pg_history_t& history,
+    const PastIntervals& pim,
+    bool backfill,
+    ObjectStore::Transaction &t);
+
   seastar::future<> read_state(ceph::os::CyanStore* store);
 
-  // peering/recovery
-  bool should_send_notify() const;
-  pg_notify_t get_notify(epoch_t query_epoch) const;
-  bool is_last_activated_peer(pg_shard_t peer);
-  void clear_primary_state();
-
-  bool should_restart_peering(int new_up_primary,
-                             int new_acting_primary,
-                             const std::vector<int>& new_up,
-                             const std::vector<int>& new_acting,
-                             cached_map_t last_map,
-                             cached_map_t osd_map) const;
-  void start_peering_interval(int new_up_primary,
-                             int new_acting_primary,
-                             const std::vector<int>& new_up,
-                             const std::vector<int>& new_acting,
-                             cached_map_t last_map);
-  void activate(epoch_t activation_epoch);
-  void on_activated();
-  void maybe_mark_clean();
-
-  seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
-  seastar::future<> dispatch_context(recovery::Context&& ctx);
-  seastar::future<> handle_advance_map(cached_map_t next_map);
-  seastar::future<> handle_activate_map();
-  seastar::future<> share_pg_info();
-  void reply_pg_query(const MQuery& query, recovery::Context* ctx);
+  void do_peering_event(
+    const boost::statechart::event_base &evt,
+    PeeringCtx &rctx);
+  void do_peering_event(
+    PGPeeringEvent& evt, PeeringCtx &rctx);
+  void do_peering_event(
+    std::unique_ptr<PGPeeringEvent> evt,
+    PeeringCtx &rctx) {
+    return do_peering_event(*evt, rctx);
+  }
+  void do_peering_event(
+    PGPeeringEventRef evt,
+    PeeringCtx &rctx) {
+    return do_peering_event(*evt, rctx);
+  }
+
+  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+  void handle_activate_map(PeeringCtx &rctx);
+  void handle_initialize(PeeringCtx &rctx);
   seastar::future<> handle_op(ceph::net::Connection* conn,
                              Ref<MOSDOp> m);
   void print(ostream& os) const;
 private:
-  seastar::future<> activate_peer(pg_shard_t peer);
-  void reply_pg_query_for_info(const MQuery& query, recovery::Context* ctx);
-  void reply_pg_query_for_log(const MQuery& query, bool full);
-  seastar::future<> send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch);
-
-  void update_primary_state(const std::vector<int>& new_up,
-                           int new_up_primary,
-                           const std::vector<int>& new_acting,
-                           int new_acting_primary);
   seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
   seastar::future<> do_osd_op(
     ObjectState& os,
@@ -139,39 +420,17 @@ private:
                                             uint64_t limit);
 
 private:
-  const spg_t pgid;
-  pg_shard_t whoami;
-  pg_pool_t pool;
-
-  epoch_t last_peering_reset = 0;
-  epoch_t need_up_thru = 0;
-  recovery::State recovery_state;
-
-  bool should_notify_primary = false;
-
-  using pg_shard_set_t = std::set<pg_shard_t>;
-  // peer_info    -- projected (updates _before_ replicas ack)
-  peer_info_t peer_info; //< info from peers (stray or prior)
-  pg_shard_set_t peer_activated;
-
-  //< pg state
-  pg_info_t info;
-  //< last written info, for fast info persistence
-  pg_info_t last_written_info;
-  PastIntervals past_intervals;
-  // primary state
-  pg_shard_t primary, up_primary;
-  std::vector<int> acting, up;
-  pg_shard_set_t actingset, upset;
-  pg_shard_set_t acting_recovery_backfill;
-  std::vector<int> want_acting;
+  ceph::osd::ShardServices &shard_services;
 
-  seastar::future<> wait_for_active();
-  std::optional<seastar::shared_promise<>> active_promise;
+  cached_map_t osdmap;
   std::unique_ptr<PGBackend> backend;
 
-  cached_map_t osdmap;
-  ceph::net::Messenger& msgr;
+  PeeringState peering_state;
+
+  seastar::shared_promise<> active_promise;
+  seastar::future<> wait_for_active();
+
+  friend std::ostream& operator<<(std::ostream&, const PG& pg);
 };
 
 std::ostream& operator<<(std::ostream&, const PG& pg);
index 34867be1d555e36437016ad89bdcf6a826ec583e..e2e27b45b04a9280da67007030bcd1588109a685 100644 (file)
@@ -24,10 +24,10 @@ namespace {
 
 std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
                                              const pg_pool_t& pool,
+                                            ceph::os::CollectionRef coll,
                                              ceph::os::CyanStore* store,
                                              const ec_profile_t& ec_profile)
 {
-  auto coll = store->open_collection(coll_t{pgid});
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED:
     return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
index 5edafeb2068fb2ad97fab16498fff41e08e0435b..a0d16bb24f41a4a8d94a0abfbeef0652df5b2799 100644 (file)
@@ -8,16 +8,13 @@
 #include <string>
 #include <boost/smart_ptr/local_shared_ptr.hpp>
 
+#include "crimson/os/cyan_store.h"
 #include "crimson/common/shared_lru.h"
 #include "os/Transaction.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
 
 struct hobject_t;
-namespace ceph::os {
-  class Collection;
-  class CyanStore;
-}
 
 class PGBackend
 {
@@ -30,6 +27,7 @@ public:
   virtual ~PGBackend() = default;
   static std::unique_ptr<PGBackend> create(const spg_t pgid,
                                           const pg_pool_t& pool,
+                                          ceph::os::CollectionRef coll,
                                           ceph::os::CyanStore* store,
                                           const ec_profile_t& ec_profile);
   using cached_os_t = boost::local_shared_ptr<ObjectState>;
diff --git a/src/crimson/osd/recovery_events.h b/src/crimson/osd/recovery_events.h
deleted file mode 100644 (file)
index 9653ad9..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <vector>
-
-#include <boost/smart_ptr/local_shared_ptr.hpp>
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/state_machine.hpp>
-#include <boost/statechart/transition.hpp>
-#include <boost/statechart/event_base.hpp>
-
-#include "osd/PGPeeringEvent.h"
-
-namespace recovery {
-
-struct AdvMap : boost::statechart::event< AdvMap > {
-  // TODO: use foreign_ptr<> once the osdmap cache needs to be shared across
-  //       cores
-  using OSDMapRef = boost::local_shared_ptr<OSDMap>;
-  OSDMapRef osdmap;
-  OSDMapRef last_map;
-  std::vector<int> new_up, new_acting;
-  int up_primary, acting_primary;
-  AdvMap(OSDMapRef osdmap, OSDMapRef last_map,
-        const std::vector<int>& new_up, int up_primary,
-        const std::vector<int>& new_acting, int acting_primary):
-    osdmap(osdmap),
-    last_map(last_map),
-    new_up(new_up),
-    new_acting(new_acting),
-    up_primary(up_primary),
-    acting_primary(acting_primary) {}
-  void print(std::ostream *out) const {
-    *out << "AdvMap";
-  }
-};
-
-struct ActMap : boost::statechart::event< ActMap > {
-  ActMap() : boost::statechart::event< ActMap >() {}
-  void print(std::ostream *out) const {
-    *out << "ActMap";
-  }
-};
-
-struct Activate : boost::statechart::event< Activate > {
-  epoch_t activation_epoch;
-  explicit Activate(epoch_t q) : boost::statechart::event< Activate >(),
-                                activation_epoch(q) {}
-  void print(std::ostream *out) const {
-    *out << "Activate from " << activation_epoch;
-  }
-};
-
-struct Initialize : boost::statechart::event<Initialize> {};
-
-}
diff --git a/src/crimson/osd/recovery_machine.cc b/src/crimson/osd/recovery_machine.cc
deleted file mode 100644 (file)
index ab82cd1..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-#include "recovery_machine.h"
-#include "recovery_state.h"
-#include "pg.h"
-
-namespace recovery
-{
-void Machine::send_notify(pg_shard_t to,
-                          const pg_notify_t& info,
-                          const PastIntervals& pi)
-{
-  state.context.notifies[to.osd].emplace_back(info, pi);
-}
-
-void Machine::send_query(pg_shard_t to,
-                         const pg_query_t& query)
-{
-  spg_t pgid{pg.get_info().pgid.pgid, to.shard};
-  state.context.queries[to.osd].emplace(pgid, query);
-}
-
-recovery::Context* Machine::get_context()
-{
-  return &state.context;
-}
-
-}
diff --git a/src/crimson/osd/recovery_machine.h b/src/crimson/osd/recovery_machine.h
deleted file mode 100644 (file)
index fef37cd..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <boost/statechart/state_machine.hpp>
-
-#include "osd/osd_types.h"
-
-class PG;
-
-namespace recovery {
-
-struct Initial;
-class State;
-class Context;
-
-struct Machine : public boost::statechart::state_machine<Machine,
-                                                        Initial> {
-  Machine(State& state, PG& pg)
-    : state{state},
-      pg{pg}
-  {}
-  void send_notify(pg_shard_t to,
-                  const pg_notify_t& info,
-                  const PastIntervals& pi);
-  void send_query(pg_shard_t to,
-                 const pg_query_t& query);
-  recovery::Context* get_context();
-  State& state;
-  PG& pg;
-};
-}
diff --git a/src/crimson/osd/recovery_state.cc b/src/crimson/osd/recovery_state.cc
deleted file mode 100644 (file)
index b05b431..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-#include "recovery_state.h"
-#include "recovery_states.h"
-#include "messages/MOSDPGLog.h" // MLogRec needs this
-
-namespace recovery {
-
-State::State(PG& pg)
-  : machine{*this, pg}
-{
-    machine.initiate();
-}
-
-Context State::handle_event(const boost::statechart::event_base& evt)
-{
-  machine.process_event(evt);
-  Context pending;
-  std::swap(pending, context);
-  return pending;
-}
-}
diff --git a/src/crimson/osd/recovery_state.h b/src/crimson/osd/recovery_state.h
deleted file mode 100644 (file)
index feb9335..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <vector>
-#include <map>
-
-#include "recovery_machine.h"
-
-class PG;
-
-namespace recovery {
-
-// PeeringMachine::handle_event() could send multiple notifications to a
-// certain peer OSD before it reaches the last state. for better performance,
-// we send them in batch. the pending messages are collected in RecoveryCtx
-// before being dispatched upon returning of handle_event().
-struct Context
-{
-  using osd_id_t = int;
-
-  using notify_t = std::pair<pg_notify_t, PastIntervals>;
-  std::map<osd_id_t, std::vector<notify_t>> notifies;
-
-  using queries_t = std::map<spg_t, pg_query_t>;
-  std::map<osd_id_t, queries_t> queries;
-
-  using infos_t = std::vector<pair<pg_notify_t, PastIntervals>>;
-  std::map<osd_id_t, infos_t> infos;
-};
-
-/// Encapsulates PG recovery process,
-class State {
-public:
-  explicit State(PG& pg);
-  Context handle_event(const boost::statechart::event_base& evt);
-private:
-  friend class Machine;
-  Machine machine;
-  Context context;
-};
-}
diff --git a/src/crimson/osd/recovery_states.cc b/src/crimson/osd/recovery_states.cc
deleted file mode 100644 (file)
index 0d2bd34..0000000
+++ /dev/null
@@ -1,647 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "recovery_states.h"
-
-#include "crimson/common/log.h"
-#include "messages/MOSDPGLog.h"
-#include "osd/OSDMap.h"
-#include "pg.h"
-
-namespace {
-  seastar::logger& logger() {
-    return ceph::get_logger(ceph_subsys_osd);
-  }
-}
-
-namespace recovery {
-
-/*------Crashed-------*/
-Crashed::Crashed(my_context ctx)
-  : my_base(ctx)
-{
-  ceph_abort_msg("we got a bad state machine event");
-}
-
-/*------Initial-------*/
-
-Initial::Initial(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Initial");
-}
-
-boost::statechart::result Initial::react(const MNotifyRec& notify)
-{
-  logger().info("Active <MNotifyRec>");
-  // todo: process info from replica
-  auto& pg = context<Machine>().pg;
-  pg.update_last_peering_reset();
-  return transit<Primary>();
-}
-
-boost::statechart::result Initial::react(const MInfoRec& i)
-{
-  // todo
-  logger().info("<MInfoRec> transitioning from Initial to Stray");
-  post_event(i);
-  return transit<Stray>();
-}
-
-boost::statechart::result Initial::react(const MLogRec& i)
-{
-  // todo
-  logger().info("<MLogRec> transitioning from Initial to Stray");
-  post_event(i);
-  return transit<Stray>();
-}
-
-void Initial::exit()
-{}
-
-/*--------Reset---------*/
-Reset::Reset(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Reset");
-  auto& pg = context<Machine>().pg;
-  pg.update_last_peering_reset();
-}
-
-boost::statechart::result Reset::react(const AdvMap& advmap)
-{
-  logger().info("Reset advmap");
-  auto& pg = context<Machine>().pg;
-  if (pg.should_restart_peering(advmap.up_primary,
-                               advmap.acting_primary,
-                               advmap.new_up,
-                               advmap.new_acting,
-                               advmap.last_map,
-                               advmap.osdmap)) {
-    pg.start_peering_interval(advmap.up_primary,
-                             advmap.acting_primary,
-                             advmap.new_up,
-                             advmap.new_acting,
-                             advmap.last_map);
-  }
-  return discard_event();
-}
-
-boost::statechart::result Reset::react(const ActMap&)
-{
-  auto& pg = context<Machine>().pg;
-  if (pg.should_send_notify()) {
-    context<Machine>().send_notify(pg.get_primary(),
-                                  pg.get_notify(pg.get_osdmap_epoch()),
-                                  pg.get_past_intervals());
-  }
-  return transit<Started>();
-}
-
-void Reset::exit()
-{
-  logger().info("Leaving Reset");
-}
-
-/*------Started-------*/
-Started::Started(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Started");
-}
-
-boost::statechart::result Started::react(const AdvMap& advmap)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("Started <AdvMap>");
-  if (pg.should_restart_peering(advmap.up_primary,
-                               advmap.acting_primary,
-                               advmap.new_up,
-                               advmap.new_acting,
-                               advmap.last_map,
-                               advmap.osdmap)) {
-    logger().info("should_restart_peering, transitioning to Reset");
-    post_event(advmap);
-    return transit<Reset>();
-  }
-  return discard_event();
-}
-
-void Started::exit()
-{
-  logger().info("Leaving Started");
-}
-
-/*-------Start---------*/
-Start::Start(my_context ctx)
-  : my_base(ctx)
-{
-  auto& pg = context<Machine>().pg;
-  if (pg.is_primary()) {
-    logger().info("Start transitioning to Primary");
-    post_event(MakePrimary{});
-  } else { // is_stray
-    logger().info("Start transitioning to Stray");
-    post_event(MakeStray{});
-  }
-}
-
-void Start::exit()
-{
-  logger().info("Leaving Start");
-}
-
-/*---------Primary--------*/
-Primary::Primary(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Primary");
-}
-
-boost::statechart::result Primary::react(const MNotifyRec& notevt)
-{
-  // todo
-  auto& pg = context<Machine>().pg;
-  pg.proc_replica_info(notevt.from,
-                      notevt.notify.info,
-                      notevt.notify.epoch_sent);
-  logger().info("Primary <MNotifyRec> {}", pg);
-  return discard_event();
-}
-
-boost::statechart::result Primary::react(const ActMap&)
-{
-  // todo
-  auto& pg = context<Machine>().pg;
-  logger().info("Primary <ActMap> {}", pg);
-  return discard_event();
-}
-
-void Primary::exit()
-{
-  auto& pg = context<Machine>().pg;
-  pg.clear_primary_state();
-  pg.clear_state(PG_STATE_CREATING);
-  logger().info("Leaving Primary: {}", pg);
-}
-
-/*---------Peering--------*/
-Peering::Peering(my_context ctx)
-  : my_base(ctx)
-{
-  auto& pg = context<Machine>().pg;
-  pg.set_state(PG_STATE_PEERING);
-  logger().info("Entering Peering");
-}
-
-boost::statechart::result Peering::react(const AdvMap& advmap)
-{
-  logger().info("Peering <AdvMap>");
-  context<Machine>().pg.update_need_up_thru(advmap.osdmap.get());
-  return forward_event();
-}
-
-void Peering::exit()
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("Leaving Peering: {}", pg);
-  pg.clear_state(PG_STATE_PEERING);
-}
-
-/*--------GetInfo---------*/
-GetInfo::GetInfo(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering GetInfo");
-  context<Machine>().pg.update_need_up_thru();
-  post_event(GotInfo{});
-}
-
-boost::statechart::result GetInfo::react(const MNotifyRec& infoevt)
-{
-  logger().info("GetInfo <MNotifyRec>");
-  // todo: depends on get_infos()
-  post_event(GotInfo());
-  return discard_event();
-}
-
-void GetInfo::exit()
-{
-  logger().info("Leaving GetInfo");
-  // todo
-}
-
-/*------GetLog------------*/
-GetLog::GetLog(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering GetLog");
-  auto& pg = context<Machine>().pg;
-
-  PG::choose_acting_t adjust_acting;
-  tie(adjust_acting, auth_log_shard) = pg.choose_acting();
-  switch (adjust_acting) {
-  case PG::choose_acting_t::dont_change:
-    break;
-  case PG::choose_acting_t::should_change:
-    // post_event(NeedActingChange());
-    return;
-  case PG::choose_acting_t::pg_incomplete:
-    // post_event(IsIncomplete());
-    return;
-  }
-  // am i the best?
-  if (auth_log_shard == pg.get_whoami()) {
-    post_event(GotLog{});
-    return;
-  } else {
-    // todo: request log from peer
-    return;
-  }
-}
-
-boost::statechart::result GetLog::react(const AdvMap& advmap)
-{
-  // make sure our log source didn't go down.  we need to check
-  // explicitly because it may not be part of the prior set, which
-  // means the Peering state check won't catch it going down.
-  if (!advmap.osdmap->is_up(auth_log_shard.osd)) {
-    logger().info("GetLog: auth_log_shard osd.{} went down",
-                 auth_log_shard.osd);
-    post_event(advmap);
-    return transit<Reset>();
-  }
-
-  // let the Peering state do its checks.
-  return forward_event();
-}
-
-boost::statechart::result GetLog::react(const MLogRec& logevt)
-{
-  assert(!msg);
-  if (logevt.from != auth_log_shard) {
-    logger().info("GetLog: discarding log from non-auth_log_shard osd.{}",
-                 logevt.from);
-    return discard_event();
-  }
-  logger().info("GetLog: received master log from osd.{}",
-               logevt.from);
-  msg = logevt.msg;
-  post_event(GotLog{});
-  return discard_event();
-}
-
-boost::statechart::result GetLog::react(const GotLog&)
-{
-  logger().info("leaving GetLog");
-  return transit<GetMissing>();
-}
-
-void GetLog::exit()
-{}
-
-Recovered::Recovered(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Recovered");
-  if (context<Active>().all_replicas_activated) {
-    logger().info("all_replicas_activated");
-    post_event(GoClean{});
-  }
-}
-
-boost::statechart::result Recovered::react(const AllReplicasActivated&)
-{
-  logger().info("Recovered <AllReplicasActivated>");
-  post_event(GoClean{});
-  return forward_event();
-}
-
-void Recovered::exit()
-{
-  logger().info("Leaving Recovered");
-}
-
-Clean::Clean(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Clean");
-  auto& pg = context<Machine>().pg;
-  pg.maybe_mark_clean();
-}
-
-void Clean::exit()
-{
-  logger().info("Leaving Clean");
-  auto& pg = context<Machine>().pg;
-  pg.clear_state(PG_STATE_CLEAN);
-}
-
-/*---------Active---------*/
-Active::Active(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Activate");
-  auto& pg = context<Machine>().pg;
-  assert(pg.is_primary());
-  pg.activate(pg.get_osdmap_epoch());
-  logger().info("Activate Finished");
-}
-
-boost::statechart::result Active::react(const AdvMap& advmap)
-{
-  auto& pg = context<Machine>().pg;
-  if (pg.should_restart_peering(advmap.up_primary,
-                               advmap.acting_primary,
-                               advmap.new_up,
-                               advmap.new_acting,
-                               advmap.last_map,
-                               advmap.osdmap)) {
-    logger().info("Active advmap interval change, fast return");
-    return forward_event();
-  }
-  logger().info("Active advmap");
-  return forward_event();
-}
-
-boost::statechart::result Active::react(const ActMap&)
-{
-  return forward_event();
-}
-
-boost::statechart::result Active::react(const MNotifyRec& notevt)
-{
-  logger().info("Active <MNotifyRec>");
-  auto& pg = context<Machine>().pg;
-  pg.proc_replica_info(notevt.from,
-                      notevt.notify.info,
-                      notevt.notify.epoch_sent);
-  return discard_event();
-}
-
-boost::statechart::result Active::react(const MInfoRec& infoevt)
-{
-  logger().info("Active <MInfoRec>");
-  auto& pg = context<Machine>().pg;
-  assert(pg.is_primary());
-
-  if (pg.is_last_activated_peer(infoevt.from)) {
-    post_event(AllReplicasActivated{});
-  }
-  return discard_event();
-}
-
-boost::statechart::result Active::react(const MLogRec& logevt)
-{
-  logger().info("Active <MLogRec>");
-  auto& pg = context<Machine>().pg;
-  pg.proc_replica_log(logevt.from,
-                     logevt.msg->info,
-                     logevt.msg->log,
-                     logevt.msg->missing);
-  // todo
-  return discard_event();
-}
-
-boost::statechart::result Active::react(const AllReplicasActivated &evt)
-{
-  logger().info("Active <AllReplicasActivated>");
-  all_replicas_activated = true;
-  auto& pg = context<Machine>().pg;
-  pg.clear_state(PG_STATE_ACTIVATING);
-  pg.clear_state(PG_STATE_CREATING);
-  pg.on_activated();
-  pg.share_pg_info();
-  post_event(AllReplicasRecovered{});
-  return discard_event();
-}
-
-void Active::exit()
-{
-  auto& pg = context<Machine>().pg;
-  pg.clear_state(PG_STATE_ACTIVATING);
-  pg.clear_state(PG_STATE_DEGRADED);
-  pg.clear_state(PG_STATE_UNDERSIZED);
-
-  logger().info("Leaving Active");
-}
-
-/*------Activating--------*/
-Activating::Activating(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering Activating");
-}
-
-void Activating::exit()
-{
-  logger().info("Leaving Activating");
-}
-
-/*------ReplicaActive-----*/
-ReplicaActive::ReplicaActive(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering ReplicaActive");
-}
-
-
-boost::statechart::result ReplicaActive::react(const Activate& actevt)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("In ReplicaActive, about to call activate");
-  pg.activate(actevt.activation_epoch);
-  logger().info("Activate Finished");
-  return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MInfoRec& infoevt)
-{
-  return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MLogRec& logevt)
-{
-  return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const ActMap&)
-{
-  auto& pg = context<Machine>().pg;
-  if (pg.should_send_notify()) {
-    context<Machine>().send_notify(pg.get_primary(),
-                                  pg.get_notify(pg.get_osdmap_epoch()),
-                                  pg.get_past_intervals());
-  };
-  return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MQuery& query)
-{
-  auto& pg = context<Machine>().pg;
-  context<Machine>().send_notify(query.from,
-                                pg.get_notify(query.query_epoch),
-                                pg.get_past_intervals());
-  return discard_event();
-}
-
-void ReplicaActive::exit()
-{}
-
-/*---RepNotRecovering----*/
-RepNotRecovering::RepNotRecovering(my_context ctx)
-  : my_base(ctx)
-{}
-
-void RepNotRecovering::exit()
-{}
-
-/*-------Stray---*/
-Stray::Stray(my_context ctx)
-  : my_base(ctx)
-{
-  auto& pg = context<Machine>().pg;
-  assert(!pg.is_primary());
-  logger().info("{}, Entering Stray", pg);
-}
-
-boost::statechart::result Stray::react(const MLogRec& logevt)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("{} Stray <MLogRec>", pg);
-  MOSDPGLog* msg = logevt.msg.get();
-  logger().info("got info+log from osd.{} {} {}",
-                logevt.from, msg->info, msg->log);
-  if (msg->info.last_backfill == hobject_t()) {
-    // todo: restart backfill
-  } else {
-    // todo: merge log
-  }
-  post_event(Activate{logevt.msg->info.last_epoch_started});
-  return transit<ReplicaActive>();
-}
-
-boost::statechart::result Stray::react(const MInfoRec& infoevt)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("{} Stray <MInfoRec>", pg);
-  logger().info("got info from osd.{} {}",
-                infoevt.from, infoevt.info);
-  // todo
-  post_event(Activate{infoevt.info.last_epoch_started});
-  return transit<ReplicaActive>();
-}
-
-boost::statechart::result Stray::react(const MQuery& query)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("{} Stray <MQuery>", pg);
-  pg.reply_pg_query(query, context<Machine>().get_context());
-  return discard_event();
-}
-
-boost::statechart::result Stray::react(const ActMap&)
-{
-  auto& pg = context<Machine>().pg;
-  logger().info("{} Stray <ActMap>", pg);
-  if (pg.should_send_notify()) {
-    context<Machine>().send_notify(pg.get_primary(),
-                                  pg.get_notify(pg.get_osdmap_epoch()),
-                                  pg.get_past_intervals());
-  }
-  return discard_event();
-}
-
-void Stray::exit()
-{
-  logger().info("Leaving Stray");
-}
-
-/*------Down--------*/
-Down::Down(my_context ctx)
-  : my_base(ctx)
-{}
-
-void Down::exit()
-{
-}
-
-boost::statechart::result Down::react(const MNotifyRec& infoevt)
-{
-  // todo
-  return discard_event();
-}
-
-/*------GetMissing--------*/
-GetMissing::GetMissing(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering GetMissing");
-  auto& pg = context<Machine>().pg;
-  if (pg.get_need_up_thru() != 0) {
-    logger().info("still need up_thru update before going active");
-    post_event(NeedUpThru{});
-  } else {
-    // all good!
-    post_event(Activate{pg.get_osdmap_epoch()});
-  }
-}
-
-boost::statechart::result GetMissing::react(const MLogRec& logevt)
-{
-  logger().info("GetMissing <MLogRec>");
-  auto& pg = context<Machine>().pg;
-  pg.proc_replica_log(logevt.from,
-                     logevt.msg->info,
-                     logevt.msg->log,
-                     logevt.msg->missing);
-  if (pg.get_need_up_thru() != 0) {
-    logger().info(" still need up_thru update before going active");
-    post_event(NeedUpThru{});
-  } else {
-    logger().info("Got last missing, don't need missing posting Activate");
-    post_event(Activate{pg.get_osdmap_epoch()});
-  }
-  return discard_event();
-}
-
-void GetMissing::exit()
-{
-  logger().info("Leaving GetMissing");
-}
-
-/*------WaitUpThru--------*/
-WaitUpThru::WaitUpThru(my_context ctx)
-  : my_base(ctx)
-{
-  logger().info("Entering WaitUpThru");
-}
-
-boost::statechart::result WaitUpThru::react(const ActMap& am)
-{
-  logger().info("WaitUpThru <ActMap>");
-  auto& pg = context<Machine>().pg;
-  if (!pg.get_need_up_thru()) {
-    logger().info("WaitUpThru: no need up thru!");
-    post_event(Activate{pg.get_osdmap_epoch()});
-  }
-  return forward_event();
-}
-
-boost::statechart::result WaitUpThru::react(const MLogRec& logevt)
-{
-  logger().info("WaitUpThru: <MLogRec>");
-  auto& pg = context<Machine>().pg;
-  pg.proc_replica_log(logevt.from,
-                     logevt.msg->info,
-                     logevt.msg->log,
-                     logevt.msg->missing);
-  return discard_event();
-}
-
-void WaitUpThru::exit()
-{
-  logger().info("Leaving WaitUpThru");
-}
-
-}
diff --git a/src/crimson/osd/recovery_states.h b/src/crimson/osd/recovery_states.h
deleted file mode 100644 (file)
index b363f9f..0000000
+++ /dev/null
@@ -1,335 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/event_base.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/transition.hpp>
-
-#include "recovery_machine.h"
-#include "recovery_events.h"
-
-// Initial
-// Reset
-// Start
-//   Started
-//     Primary
-//       WaitActingChange
-//       Peering
-//         GetInfo
-//         GetLog
-//         GetMissing
-//         WaitUpThru
-//         Incomplete
-//       Active
-//         Activating
-//         Clean
-//         Recovered
-//         Backfilling
-//         WaitRemoteBackfillReserved
-//         WaitLocalBackfillReserved
-//         NotBackfilling
-//         NotRecovering
-//         Recovering
-//         WaitRemoteRecoveryReserved
-//         WaitLocalRecoveryReserved
-//     ReplicaActive
-//       RepNotRecovering
-//       RepRecovering
-//       RepWaitBackfillReserved
-//       RepWaitRecoveryReserved
-//     Stray
-//     ToDelete
-//       WaitDeleteReserved
-//       Deleting
-// Crashed
-
-namespace recovery {
-
-struct Crashed : boost::statechart::state<Crashed, Machine> {
-  explicit Crashed(my_context ctx);
-};
-
-struct Reset;
-
-struct Initial : boost::statechart::state<Initial, Machine> {
-  explicit Initial(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::transition<Initialize, Reset>,
-    boost::statechart::custom_reaction<NullEvt>,
-    boost::statechart::transition<boost::statechart::event_base, Crashed>
-    >;
-
-  boost::statechart::result react(const MNotifyRec&);
-  boost::statechart::result react(const MInfoRec&);
-  boost::statechart::result react(const MLogRec&);
-  boost::statechart::result react(const boost::statechart::event_base&) {
-    return discard_event();
-  }
-};
-
-struct Reset : boost::statechart::state<Reset, Machine> {
-  explicit Reset(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<AdvMap>,
-    boost::statechart::custom_reaction<ActMap>,
-    boost::statechart::custom_reaction<NullEvt>,
-    boost::statechart::transition<boost::statechart::event_base, Crashed>
-    >;
-  boost::statechart::result react(const AdvMap&);
-  boost::statechart::result react(const ActMap&);
-  boost::statechart::result react(const boost::statechart::event_base&) {
-    return discard_event();
-  }
-};
-
-struct Start;
-
-struct Started : boost::statechart::state<Started, Machine, Start> {
-  explicit Started(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<AdvMap>,
-    // ignored
-    boost::statechart::custom_reaction<NullEvt>,
-    // crash
-    boost::statechart::transition<boost::statechart::event_base, Crashed>
-    >;
-  boost::statechart::result react(const AdvMap&);
-  boost::statechart::result react(const boost::statechart::event_base&) {
-    return discard_event();
-  }
-};
-
-struct Primary;
-struct Stray;
-
-struct MakePrimary : boost::statechart::event<MakePrimary> {};
-struct MakeStray : boost::statechart::event<MakeStray> {};
-
-struct Start : boost::statechart::state<Start, Started> {
-  explicit Start(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::transition<MakePrimary, Primary>,
-    boost::statechart::transition<MakeStray, Stray>
-    >;
-};
-
-struct Peering;
-struct WaitActingChange;
-
-struct Primary : boost::statechart::state<Primary, Started, Peering> {
-  explicit Primary(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<ActMap>,
-    boost::statechart::custom_reaction<MNotifyRec>
-    >;
-  boost::statechart::result react(const ActMap&);
-  boost::statechart::result react(const MNotifyRec&);
-};
-
-struct GetInfo;
-struct Active;
-
-struct Peering : boost::statechart::state<Peering, Primary, GetInfo> {
-  PastIntervals::PriorSet prior_set;
-  /// need osd_find_best_info_ignore_history_les
-  bool history_les_bound = false;
-
-  explicit Peering(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::transition<Activate, Active>,
-    boost::statechart::custom_reaction<AdvMap>
-    >;
-  boost::statechart::result react(const AdvMap &advmap);
-};
-
-struct Activating;
-
-struct AllReplicasActivated : boost::statechart::event<AllReplicasActivated> {};
-
-struct Active : boost::statechart::state<Active, Primary, Activating> {
-  explicit Active(my_context ctx);
-  void exit();
-
-  bool all_replicas_activated = false;
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction< ActMap >,
-    boost::statechart::custom_reaction< AdvMap >,
-    boost::statechart::custom_reaction< MInfoRec >,
-    boost::statechart::custom_reaction< MNotifyRec >,
-    boost::statechart::custom_reaction< MLogRec >,
-    boost::statechart::custom_reaction< AllReplicasActivated >
-    >;
-  boost::statechart::result react(const ActMap&);
-  boost::statechart::result react(const AdvMap&);
-  boost::statechart::result react(const MInfoRec& infoevt);
-  boost::statechart::result react(const MNotifyRec& notevt);
-  boost::statechart::result react(const MLogRec& logevt);
-  boost::statechart::result react(const AllReplicasActivated&);
-};
-
-struct GoClean : boost::statechart::event<GoClean> {};
-
-struct Clean : boost::statechart::state<Clean, Active> {
-  explicit Clean(my_context ctx);
-  void exit();
-  boost::statechart::result react(const boost::statechart::event_base&) {
-    return discard_event();
-  }
-};
-
-struct Recovered : boost::statechart::state<Recovered, Active> {
-  using reactions = boost::mpl::list<
-    boost::statechart::transition<GoClean, Clean>,
-    boost::statechart::custom_reaction<AllReplicasActivated>
-    >;
-  explicit Recovered(my_context ctx);
-  void exit();
-  boost::statechart::result react(const AllReplicasActivated&);
-};
-
-struct AllReplicasRecovered : boost::statechart::event<AllReplicasRecovered>
-{};
-
-struct Activating : boost::statechart::state<Activating, Active> {
-  using reactions = boost::mpl::list <
-    boost::statechart::transition< AllReplicasRecovered, Recovered >
-    >;
-  explicit Activating(my_context ctx);
-  void exit();
-};
-
-struct RepNotRecovering;
-
-struct ReplicaActive : boost::statechart::state<ReplicaActive, Started, RepNotRecovering> {
-  explicit ReplicaActive(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<ActMap>,
-    boost::statechart::custom_reaction<MQuery>,
-    boost::statechart::custom_reaction<MInfoRec>,
-    boost::statechart::custom_reaction<MLogRec>,
-    boost::statechart::custom_reaction<Activate>
-    >;
-  boost::statechart::result react(const MInfoRec& infoevt);
-  boost::statechart::result react(const MLogRec& logevt);
-  boost::statechart::result react(const ActMap&);
-  boost::statechart::result react(const MQuery&);
-  boost::statechart::result react(const Activate&);
-};
-
-struct RepNotRecovering : boost::statechart::state<RepNotRecovering, ReplicaActive> {
-  explicit RepNotRecovering(my_context ctx);
-  void exit();
-};
-
-struct Stray : boost::statechart::state<Stray, Started> {
-  explicit Stray(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<MQuery>,
-    boost::statechart::custom_reaction<MLogRec>,
-    boost::statechart::custom_reaction<MInfoRec>,
-    boost::statechart::custom_reaction<ActMap>
-    >;
-  boost::statechart::result react(const MQuery& query);
-  boost::statechart::result react(const MLogRec& logevt);
-  boost::statechart::result react(const MInfoRec& infoevt);
-  boost::statechart::result react(const ActMap&);
-};
-
-struct GetLog;
-struct Down;
-
-struct GotInfo : boost::statechart::event<GotInfo> {};
-struct IsDown : boost::statechart::event<IsDown> {};
-
-struct GetInfo : boost::statechart::state<GetInfo, Peering> {
-  std::set<pg_shard_t> peer_info_requested;
-
-  explicit GetInfo(my_context ctx);
-  void exit();
-  void get_infos();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::transition<GotInfo, GetLog>,
-    boost::statechart::custom_reaction<MNotifyRec>,
-    boost::statechart::transition<IsDown, Down>
-    >;
-  boost::statechart::result react(const MNotifyRec& infoevt);
-};
-
-struct GotLog : boost::statechart::event<GotLog> {};
-
-struct GetLog : boost::statechart::state<GetLog, Peering> {
-  pg_shard_t auth_log_shard;
-  boost::intrusive_ptr<MOSDPGLog> msg;
-
-  explicit GetLog(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<MLogRec>,
-    boost::statechart::custom_reaction<GotLog>,
-    boost::statechart::custom_reaction<AdvMap>
-    >;
-  boost::statechart::result react(const AdvMap&);
-  boost::statechart::result react(const MLogRec& logevt);
-  boost::statechart::result react(const GotLog&);
-};
-
-struct NeedUpThru : boost::statechart::event<NeedUpThru> {};
-struct WaitUpThru;
-
-struct GetMissing : boost::statechart::state<GetMissing, Peering> {
-  explicit GetMissing(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<MLogRec>,
-    boost::statechart::transition<NeedUpThru, WaitUpThru>
-    >;
-  boost::statechart::result react(const MLogRec& logevt);
-};
-
-struct WaitUpThru : boost::statechart::state<WaitUpThru, Peering> {
-  explicit WaitUpThru(my_context ctx);
-  void exit();
-
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<ActMap>,
-    boost::statechart::custom_reaction<MLogRec>
-    >;
-  boost::statechart::result react(const ActMap& am);
-  boost::statechart::result react(const MLogRec& logrec);
-};
-
-struct Down : boost::statechart::state<Down, Peering> {
-  explicit Down(my_context ctx);
-  using reactions = boost::mpl::list <
-    boost::statechart::custom_reaction<MNotifyRec>
-    >;
-  boost::statechart::result react(const MNotifyRec& infoevt);
-  void exit();
-};
-
-}
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
new file mode 100644 (file)
index 0000000..e36c03c
--- /dev/null
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/shard_services.h"
+
+#include "osd/osd_perf_counters.h"
+#include "osd/PeeringState.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/mgr/client.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/net/Connection.h"
+#include "crimson/os/cyan_store.h"
+#include "messages/MOSDPGTemp.h"
+#include "messages/MOSDPGCreated.h"
+#include "messages/MOSDPGNotify.h"
+#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGQuery.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+ShardServices::ShardServices(
+  ceph::net::Messenger &cluster_msgr,
+  ceph::net::Messenger &public_msgr,
+  ceph::mon::Client &monc,
+  ceph::mgr::Client &mgrc,
+  ceph::os::CyanStore &store)
+    : cluster_msgr(cluster_msgr),
+      public_msgr(public_msgr),
+      monc(monc),
+      mgrc(mgrc),
+      store(store) {
+  perf = build_osd_logger(&cct);
+  cct.get_perfcounters_collection()->add(perf);
+
+  recoverystate_perf = build_recoverystate_perf(&cct);
+  cct.get_perfcounters_collection()->add(recoverystate_perf);
+}
+
+seastar::future<> ShardServices::send_to_osd(
+  int peer, Ref<Message> m, epoch_t from_epoch) {
+  if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
+    return seastar::now();
+  } else {
+    return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
+      CEPH_ENTITY_TYPE_OSD)
+      .then([m, this] (auto xconn) {
+             return (*xconn)->send(m);
+           });
+  }
+}
+
+seastar::future<> ShardServices::dispatch_context_transaction(
+  ceph::os::CollectionRef col, PeeringCtx &ctx) {
+  auto ret = store.do_transaction(
+    col,
+    std::move(ctx.transaction));
+  ctx.reset_transaction();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context_messages(
+  PeeringCtx &ctx)
+{
+  auto ret = seastar::when_all_succeed(
+    seastar::parallel_for_each(std::move(ctx.notify_list),
+      [this](auto& osd_notifies) {
+       auto& [peer, notifies] = osd_notifies;
+       auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
+                                           std::move(notifies));
+       logger().debug("dispatch_context_messages sending notify to {}", peer);
+       return send_to_osd(peer, m, osdmap->get_epoch());
+      }),
+    seastar::parallel_for_each(std::move(ctx.query_map),
+      [this](auto& osd_queries) {
+       auto& [peer, queries] = osd_queries;
+       auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
+                                          std::move(queries));
+       logger().debug("dispatch_context_messages sending query to {}", peer);
+       return send_to_osd(peer, m, osdmap->get_epoch());
+      }),
+    seastar::parallel_for_each(std::move(ctx.info_map),
+      [this](auto& osd_infos) {
+       auto& [peer, infos] = osd_infos;
+       auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
+                                         std::move(infos));
+       logger().debug("dispatch_context_messages sending info to {}", peer);
+       return send_to_osd(peer, m, osdmap->get_epoch());
+      }));
+  ctx.notify_list.clear();
+  ctx.query_map.clear();
+  ctx.info_map.clear();
+  return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context(
+  ceph::os::CollectionRef col,
+  PeeringCtx &&ctx)
+{
+  ceph_assert(col || ctx.transaction.empty());
+  return seastar::do_with(
+    PeeringCtx{ctx},
+    [this, col](auto& todo) {
+      return seastar::when_all_succeed(
+       dispatch_context_messages(todo),
+       col ? dispatch_context_transaction(col, todo) : seastar::now());
+    });
+}
+
+void ShardServices::queue_want_pg_temp(pg_t pgid,
+                                   const vector<int>& want,
+                                   bool forced)
+{
+  auto p = pg_temp_pending.find(pgid);
+  if (p == pg_temp_pending.end() ||
+      p->second.acting != want ||
+      forced) {
+    pg_temp_wanted[pgid] = {want, forced};
+  }
+}
+
+void ShardServices::remove_want_pg_temp(pg_t pgid)
+{
+  pg_temp_wanted.erase(pgid);
+  pg_temp_pending.erase(pgid);
+}
+
+void ShardServices::_sent_pg_temp()
+{
+#ifdef HAVE_STDLIB_MAP_SPLICING
+  pg_temp_pending.merge(pg_temp_wanted);
+#else
+  pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+                        make_move_iterator(end(pg_temp_wanted)));
+#endif
+  pg_temp_wanted.clear();
+}
+
+void ShardServices::requeue_pg_temp()
+{
+  unsigned old_wanted = pg_temp_wanted.size();
+  unsigned old_pending = pg_temp_pending.size();
+  _sent_pg_temp();
+  pg_temp_wanted.swap(pg_temp_pending);
+  logger().debug(
+    "{}: {} + {} -> {}",
+    __func__ ,
+    old_wanted,
+    old_pending,
+    pg_temp_wanted.size());
+}
+
+std::ostream& operator<<(
+  std::ostream& out,
+  const ShardServices::pg_temp_t& pg_temp)
+{
+  out << pg_temp.acting;
+  if (pg_temp.forced) {
+    out << " (forced)";
+  }
+  return out;
+}
+
+void ShardServices::send_pg_temp()
+{
+  if (pg_temp_wanted.empty())
+    return;
+  logger().debug("{}: {}", __func__, pg_temp_wanted);
+  boost::intrusive_ptr<MOSDPGTemp> ms[2] = {nullptr, nullptr};
+  for (auto& [pgid, pg_temp] : pg_temp_wanted) {
+    auto m = ms[pg_temp.forced];
+    if (!m) {
+      m = make_message<MOSDPGTemp>(osdmap->get_epoch());
+      m->forced = pg_temp.forced;
+    }
+    m->pg_temp.emplace(pgid, pg_temp.acting);
+  }
+  for (auto &m : ms) {
+    if (m) {
+      monc.send_message(m);
+    }
+  }
+  _sent_pg_temp();
+}
+
+seastar::future<> ShardServices::send_pg_created(pg_t pgid)
+{
+  logger().debug(__func__);
+  auto o = get_osdmap();
+  ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
+  pg_created.insert(pgid);
+  return monc.send_message(new MOSDPGCreated(pgid));
+}
+
+seastar::future<> ShardServices::send_pg_created()
+{
+  logger().debug(__func__);
+  auto o = get_osdmap();
+  ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
+  return seastar::parallel_for_each(pg_created,
+    [this](auto &pgid) {
+      return monc.send_message(new MOSDPGCreated(pgid));
+    });
+}
+
+void ShardServices::prune_pg_created()
+{
+  logger().debug(__func__);
+  auto o = get_osdmap();
+  auto i = pg_created.begin();
+  while (i != pg_created.end()) {
+    auto p = o->get_pg_pool(i->pool());
+    if (!p || !p->has_flag(pg_pool_t::FLAG_CREATING)) {
+      logger().debug("{} pruning {}", __func__, *i);
+      i = pg_created.erase(i);
+    } else {
+      logger().debug(" keeping {}", __func__, *i);
+      ++i;
+    }
+  }
+}
+
+};
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
new file mode 100644 (file)
index 0000000..719c27e
--- /dev/null
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include <seastar/core/future.hh>
+
+#include "msg/MessageRef.h"
+#include "crimson/os/cyan_collection.h"
+
+namespace ceph::net {
+  class Messenger;
+}
+
+namespace ceph::mgr {
+  class Client;
+}
+
+namespace ceph::mon {
+  class Client;
+}
+
+namespace ceph::os {
+  class CyanStore;
+}
+
+class PerfCounters;
+class OSDMap;
+class PeeringCtx;
+
+namespace ceph::osd {
+
+/**
+ * Represents services available to each PG
+ */
+class ShardServices {
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+  ceph::net::Messenger &cluster_msgr;
+  ceph::net::Messenger &public_msgr;
+  ceph::mon::Client &monc;
+  ceph::mgr::Client &mgrc;
+  ceph::os::CyanStore &store;
+
+  CephContext cct;
+
+  PerfCounters *perf = nullptr;
+  PerfCounters *recoverystate_perf = nullptr;
+
+public:
+  ShardServices(
+    ceph::net::Messenger &cluster_msgr,
+    ceph::net::Messenger &public_msgr,
+    ceph::mon::Client &monc,
+    ceph::mgr::Client &mgrc,
+    ceph::os::CyanStore &store);
+
+  seastar::future<> send_to_osd(
+    int peer,
+    MessageRef m,
+    epoch_t from_epoch);
+
+  ceph::os::CyanStore &get_store() {
+    return store;
+  }
+
+  CephContext *get_cct() {
+    return &cct;
+  }
+
+  // Loggers
+  PerfCounters &get_recoverystate_perf_logger() {
+    return *recoverystate_perf;
+  }
+  PerfCounters &get_perf_logger() {
+    return *perf;
+  }
+
+  /// Dispatch and reset ctx transaction
+  seastar::future<> dispatch_context_transaction(
+    ceph::os::CollectionRef col, PeeringCtx &ctx);
+
+  /// Dispatch and reset ctx messages
+  seastar::future<> dispatch_context_messages(
+    PeeringCtx &ctx);
+
+  /// Dispatch ctx and dispose of context
+  seastar::future<> dispatch_context(
+    ceph::os::CollectionRef col,
+    PeeringCtx &&ctx);
+
+  /// Dispatch ctx and dispose of ctx, transaction must be empty
+  seastar::future<> dispatch_context(
+    PeeringCtx &&ctx) {
+    return dispatch_context({}, std::move(ctx));
+  }
+
+  // PG Temp State
+private:
+  // TODO: hook into map processing and some kind of heartbeat/peering
+  // message processing
+  struct pg_temp_t {
+    std::vector<int> acting;
+    bool forced = false;
+  };
+  map<pg_t, pg_temp_t> pg_temp_wanted;
+  map<pg_t, pg_temp_t> pg_temp_pending;
+  void _sent_pg_temp();
+  friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
+public:
+  void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
+                         bool forced = false);
+  void remove_want_pg_temp(pg_t pgid);
+  void requeue_pg_temp();
+  void send_pg_temp();
+
+  // Shard-local OSDMap
+private:
+  cached_map_t osdmap;
+public:
+  void update_map(cached_map_t new_osdmap) {
+    osdmap = std::move(new_osdmap);
+  }
+  cached_map_t &get_osdmap() {
+    return osdmap;
+  }
+
+  // PG Created State
+private:
+  set<pg_t> pg_created;
+public:
+  seastar::future<> send_pg_created(pg_t pgid);
+  seastar::future<> send_pg_created();
+  void prune_pg_created();
+};
+
+
+}