crimson/osd/: restructure client and peering operation handling
author    Samuel Just <sjust@redhat.com>
          Tue, 11 Jun 2019 00:31:20 +0000 (17:31 -0700)
committer Samuel Just <sjust@redhat.com>
          Thu, 20 Jun 2019 20:51:25 +0000 (13:51 -0700)
This patch:
* Breaks pg map handling out into a separate module (pg_map.*)
* Breaks osdmap waiting out into a separate module (osdmap_gate.*)
  - Ops may need to wait twice: once at the osd and a second
    time at the pg.  The same structure is used for both.
* Op ordering is enforced via the OrderedPipelinePhase stages
  defined in osd_operation.h (see the sketch below).
  - Peering and client ops each define a pipeline section
    centered on the connection as well as one centered on the
    pg.

Signed-off-by: Samuel Just <sjust@redhat.com>
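
The OrderedPipelinePhase machinery referenced above lives in
osd_operation.h rather than in this diff.  Its contract can be modeled
in isolation: an op admitted to a stage holds it until the op enters
the next stage, which serializes ops per stage and preserves arrival
order.  A minimal sketch of that contract, assuming only Seastar
primitives (the real type additionally hooks into the Blocker
machinery used to report blocked ops):

    // Illustrative model only, not the crimson implementation.
    #include <memory>
    #include <utility>
    #include <seastar/core/future.hh>

    class ordered_exclusive_stage {
    public:
      struct token {
        seastar::promise<> exited;
        ~token() { exited.set_value(); }  // leaving admits the next op
      };

      // Resolves once every earlier entrant has left the stage.
      seastar::future<std::unique_ptr<token>> enter() {
        auto t = std::make_unique<token>();
        auto wait_on = std::exchange(tail, t->exited.get_future());
        return std::move(wait_on).then([t = std::move(t)]() mutable {
          return std::move(t);
        });
      }

    private:
      seastar::future<> tail = seastar::make_ready_future<>();
    };

An op that acquires the token for stage N, enters stage N+1, and only
then drops the stage-N token gets exactly the per-stage exclusivity
with FIFO handoff that the pipelines below rely on.
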
18 files changed:
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_connection_priv.h [new file with mode: 0644]
src/crimson/osd/osd_operations/client_request.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/client_request.h [new file with mode: 0644]
src/crimson/osd/osd_operations/compound_peering_request.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/compound_peering_request.h [new file with mode: 0644]
src/crimson/osd/osd_operations/peering_event.cc [new file with mode: 0644]
src/crimson/osd/osd_operations/peering_event.h [new file with mode: 0644]
src/crimson/osd/osdmap_gate.cc [new file with mode: 0644]
src/crimson/osd/osdmap_gate.h [new file with mode: 0644]
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_map.cc [new file with mode: 0644]
src/crimson/osd/pg_map.h [new file with mode: 0644]
src/crimson/osd/shard_services.cc
src/crimson/osd/shard_services.h

diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
index 77ac3858b69e98090d978188282ce6dfee09832d..0d3da12a113fa6753190c984f78c5424f9a76630 100644
--- a/src/crimson/osd/CMakeLists.txt
+++ b/src/crimson/osd/CMakeLists.txt
@@ -11,6 +11,11 @@ add_executable(crimson-osd
   replicated_backend.cc
   shard_services.cc
   osd_operation.cc
+  osd_operations/client_request.cc
+  osd_operations/peering_event.cc
+  osd_operations/compound_peering_request.cc
+  osdmap_gate.cc
+  pg_map.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 707abacfc25208be17881b30a479a3e13b67ac29..ab3a4fc7b5683164bdd57d6ca350a550c91932f2 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDMap.h"
 #include "messages/MOSDOp.h"
-#include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGQuery.h"
 #include "messages/MPGStats.h"
-#include "messages/MOSDPGCreate2.h"
 
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Connection.h"
@@ -36,6 +32,9 @@
 #include "crimson/osd/pg_meta.h"
 #include "osd/PGPeeringEvent.h"
 #include "osd/PeeringState.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/client_request.h"
 
 namespace {
   seastar::logger& logger() {
@@ -66,7 +65,8 @@ OSD::OSD(int id, uint32_t nonce,
     store{ceph::os::FuturizedStore::create(
       local_conf().get_val<std::string>("osd_objectstore"),
       local_conf().get_val<std::string>("osd_data"))},
-    shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store}
+    shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store},
+    osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
 {
   osdmaps[0] = boost::make_local_shared<OSDMap>();
   for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
@@ -193,6 +193,7 @@ seastar::future<> OSD::start()
     return get_map(superblock.current_epoch);
   }).then([this](cached_map_t&& map) {
     shard_services.update_map(osdmap);
+    osdmap_gate.got_map(map->get_epoch());
     osdmap = std::move(map);
     return load_pgs();
   }).then([this] {
@@ -264,9 +265,9 @@ seastar::future<> OSD::_preboot(version_t oldest, version_t newest)
   }
   // get all the latest maps
   if (osdmap->get_epoch() + 1 >= oldest) {
-    return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+    return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
   } else {
-    return osdmap_subscribe(oldest - 1, true);
+    return shard_services.osdmap_subscribe(oldest - 1, true);
   }
 }
 
@@ -328,7 +329,7 @@ seastar::future<> OSD::load_pgs()
       if (coll.is_pg(&pgid)) {
         return load_pg(pgid).then([pgid, this](auto&& pg) {
           logger().info("load_pgs: loaded {}", pgid);
-          pgs.emplace(pgid, std::move(pg));
+          pg_map.pg_loaded(pgid, std::move(pg));
           return seastar::now();
         });
       } else if (coll.is_temp(&pgid)) {
@@ -401,16 +402,17 @@ seastar::future<> OSD::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
     return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
   case CEPH_MSG_OSD_OP:
     return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+  case MSG_OSD_PG_CREATE2:
   case MSG_OSD_PG_NOTIFY:
-    return handle_pg_notify(conn, boost::static_pointer_cast<MOSDPGNotify>(m));
   case MSG_OSD_PG_INFO:
-    return handle_pg_info(conn, boost::static_pointer_cast<MOSDPGInfo>(m));
   case MSG_OSD_PG_QUERY:
-    return handle_pg_query(conn, boost::static_pointer_cast<MOSDPGQuery>(m));
+    shard_services.start_operation<CompoundPeeringRequest>(
+      *this,
+      conn->get_shared(),
+      m);
+    return seastar::now();
   case MSG_OSD_PG_LOG:
     return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
-  case MSG_OSD_PG_CREATE2:
-    return handle_pg_create(conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
   default:
     logger().info("{} unhandled message {}", __func__, *m);
     return seastar::now();
@@ -452,7 +454,7 @@ MessageRef OSD::get_stats()
   // MPGStats::had_map_for is not used since PGMonitor was removed
   auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
 
-  for (auto [pgid, pg] : pgs) {
+  for (auto [pgid, pg] : pg_map.get_pgs()) {
     if (pg->is_primary()) {
       auto stats = pg->get_stats();
       // todo: update reported_epoch,reported_seq,last_fresh
@@ -543,17 +545,6 @@ seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
   });
 }
 
-seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
-{
-  logger().info("{}({})", __func__, epoch);
-  if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
-      force_request) {
-    return monc->renew_subs();
-  } else {
-    return seastar::now();
-  }
-}
-
 bool OSD::require_mon_peer(ceph::net::Connection *conn, Ref<Message> m)
 {
   if (!conn->peer_is_mon()) {
@@ -651,7 +642,8 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
          pg->handle_activate_map(rctx);
 
          logger().info("{} new pg {}", __func__, *pg);
-         pgs.emplace(info->pgid, pg);
+         pg_map.pg_created(info->pgid, pg);
+
          return seastar::when_all_succeed(
            advance_pg_to(pg, osdmap->get_epoch()),
            pg->get_need_up_thru() ? _send_alive() : seastar::now(),
@@ -663,59 +655,6 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
     });
 }
 
-seastar::future<> OSD::handle_pg_create(
-  ceph::net::Connection* conn,
-  Ref<MOSDPGCreate2> m)
-{
-  logger().info("{}: {} from {}", __func__, *m, m->get_source());
-  if (!require_mon_peer(conn, m)) {
-    return seastar::now();
-  }
-  return handle_batch_pg_message(
-    m->pgs,
-    [this, conn, m](auto p)
-    -> std::optional<std::tuple<spg_t, std::unique_ptr<PGPeeringEvent>>> {
-      const spg_t &pgid = p.first;
-      const auto &[created, created_stamp] = p.second;
-
-      auto q = m->pg_extra.find(pgid);
-      ceph_assert(q != m->pg_extra.end());
-      logger().debug(
-       "{} {} e{} @{} history {} pi {}",
-       __func__,
-       pgid,
-       created,
-       created_stamp,
-       q->second.first,
-       q->second.second);
-      if (!q->second.second.empty() &&
-         m->epoch < q->second.second.get_bounds().second) {
-       logger().error(
-         "got pg_create on {} epoch {} unmatched past_intervals (history {})",
-         pgid,
-         m->epoch,
-         q->second.second,
-         q->second.first);
-       return std::nullopt;
-      } else {
-       return std::make_optional(
-         std::make_tuple(
-           pgid,
-           std::make_unique<PGPeeringEvent>(
-             m->epoch,
-             m->epoch,
-             NullEvt(),
-             true,
-             new PGCreateInfo(
-               pgid,
-               m->epoch,
-               q->second.first,
-               q->second.second,
-               true))));
-      }
-    });
-}
-
 seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
                                       Ref<MOSDMap> m)
 {
@@ -745,14 +684,14 @@ seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
     logger().info("handle_osd_map message skips epochs {}..{}",
                   start, first - 1);
     if (m->oldest_map <= start) {
-      return osdmap_subscribe(start, false);
+      return shard_services.osdmap_subscribe(start, false);
     }
     // always try to get the full range of maps--as many as we can.  this
     //  1- is good to have
     //  2- is at present the only way to ensure that we get a *full* map as
     //     the first map!
     if (m->oldest_map < first) {
-      return osdmap_subscribe(m->oldest_map - 1, true);
+      return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
     }
     skip_maps = true;
     start = first;
@@ -851,19 +790,11 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
 seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn,
                                      Ref<MOSDOp> m)
 {
-  return wait_for_map(m->get_map_epoch()).then([=](epoch_t epoch) {
-    if (auto found = pgs.find(m->get_spg()); found != pgs.end()) {
-      return found->second->handle_op(conn, std::move(m));
-    } else if (osdmap->is_up_acting_osd_shard(m->get_spg(), whoami)) {
-      logger().info("no pg, should exist e{}, will wait", epoch);
-      // todo, wait for peering, etc
-      return seastar::now();
-    } else {
-      logger().info("no pg, shouldn't exist e{}, dropping", epoch);
-      // todo: share map with client
-      return seastar::now();
-    }
-  });
+  shard_services.start_operation<ClientRequest>(
+    *this,
+    conn->get_shared(),
+    std::move(m));
+  return seastar::now();
 }
 
 bool OSD::should_restart() const
@@ -923,7 +854,7 @@ void OSD::update_heartbeat_peers()
   if (!state.is_active()) {
     return;
   }
-  for (auto& pg : pgs) {
+  for (auto& pg : pg_map.get_pgs()) {
     vector<int> up, acting;
     osdmap->pg_to_up_acting_osds(pg.first.pgid,
                                  &up, nullptr,
@@ -937,115 +868,20 @@ void OSD::update_heartbeat_peers()
   heartbeat->update_peers(whoami);
 }
 
-seastar::future<> OSD::handle_pg_notify(
-  ceph::net::Connection* conn,
-  Ref<MOSDPGNotify> m)
-{
-  // assuming all pgs reside in a single shard
-  // see OSD::dequeue_peering_evt()
-  const int from = m->get_source().num();
-  return handle_batch_pg_message(
-    m->get_pg_list(),
-    [from, this](pair<pg_notify_t, PastIntervals> p) {
-      auto& [pg_notify, past_intervals] = p;
-      spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-      MNotifyRec notify{pgid,
-                        pg_shard_t{from, pg_notify.from},
-                        pg_notify,
-                        0, // the features is not used
-                        past_intervals};
-      logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
-      auto create_info = new PGCreateInfo{
-       pgid,
-       pg_notify.query_epoch,
-       pg_notify.info.history,
-       past_intervals,
-       false};
-      return std::make_optional(
-       std::make_tuple(
-         pgid,
-         std::make_unique<PGPeeringEvent>(
-           pg_notify.epoch_sent,
-           pg_notify.query_epoch,
-           notify,
-           true, // requires_pg
-           create_info)));
-  });
-}
-
-seastar::future<> OSD::handle_pg_info(
-  ceph::net::Connection* conn,
-  Ref<MOSDPGInfo> m)
-{
-  // assuming all pgs reside in a single shard
-  // see OSD::dequeue_peering_evt()
-  const int from = m->get_source().num();
-  return handle_batch_pg_message(
-    m->pg_list,
-    [from, this](pair<pg_notify_t, PastIntervals> p) {
-      auto& pg_notify = p.first;
-      spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
-      logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
-      MInfoRec info{pg_shard_t{from, pg_notify.from},
-                    pg_notify.info,
-                    pg_notify.epoch_sent};
-      return std::make_optional(
-       std::tuple(
-         pgid,
-         std::make_unique<PGPeeringEvent>(
-           pg_notify.epoch_sent,
-           pg_notify.query_epoch,
-           std::move(info))));
-    });
-}
-
-seastar::future<> OSD::handle_pg_query(ceph::net::Connection* conn,
-                                       Ref<MOSDPGQuery> m)
-{
-  // assuming all pgs reside in a single shard
-  // see OSD::dequeue_peering_evt()
-  const int from = m->get_source().num();
-  // TODO: handle missing pg -- handle_batch_pg_message ignores pgs
-  // that don't exist
-  return handle_batch_pg_message_with_missing_handler(
-    m->pg_list,
-    [from, this](pair<spg_t, pg_query_t> p) {
-      auto& [pgid, pg_query] = p;
-      MQuery query{pgid, pg_shard_t{from, pg_query.from},
-                  pg_query, pg_query.epoch_sent};
-      logger().debug("handle_pg_query on {} from {}", pgid, from);
-      return std::make_optional(
-       std::make_tuple(
-         pgid,
-         std::make_unique<PGPeeringEvent>(
-           pg_query.epoch_sent,
-           pg_query.epoch_sent,
-           std::move(query))));
-    },
-    [this, from](pair<spg_t, pg_query_t> p, PeeringCtx &ctx) {
-      auto &[pgid, query] = p;
-      logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
-      pg_info_t empty(spg_t(pgid.pgid, query.to));
-      ceph_assert(query.type == pg_query_t::INFO);
-      ctx.notify_list[from].emplace_back(
-        pg_notify_t(
-          query.from, query.to,
-          query.epoch_sent,
-         osdmap->get_epoch(),
-         empty),
-       PastIntervals());
-    });
-}
-
 seastar::future<> OSD::handle_pg_log(
   ceph::net::Connection* conn,
   Ref<MOSDPGLog> m)
 {
   const int from = m->get_source().num();
   logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
-  return do_peering_event_and_dispatch(
-    m->get_spg(),
-    PGPeeringEventURef(m->get_event()));
+  shard_services.start_operation<RemotePeeringEvent>(
+    *this,
+    conn->get_shared(),
+    shard_services,
+    pg_shard_t(from, m->from),
+    spg_t(m->info.pgid.pgid, m->to),
+    std::move(*m->get_event()));
+  return seastar::now();
 }
 
 void OSD::check_osdmap_features()
@@ -1060,124 +896,34 @@ void OSD::check_osdmap_features()
 seastar::future<> OSD::consume_map(epoch_t epoch)
 {
   // todo: m-to-n: broadcast this news to all shards
+  auto &pgs = pg_map.get_pgs();
   return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
     return advance_pg_to(pg.second, epoch);
   }).then([epoch, this] {
-    auto first = waiting_peering.begin();
-    auto last = waiting_peering.upper_bound(epoch);
-    std::for_each(first, last, [epoch, this](auto& blocked_requests) {
-      blocked_requests.second.set_value(epoch);
-    });
-    waiting_peering.erase(first, last);
-    return seastar::now();
+    osdmap_gate.got_map(epoch);
+    return seastar::make_ready_future();
   });
 }
 
 
-seastar::future<Ref<PG>>
-OSD::get_pg(
+blocking_future<Ref<PG>>
+OSD::get_or_create_pg(
   spg_t pgid,
   epoch_t epoch,
   std::unique_ptr<PGCreateInfo> info)
 {
-  return wait_for_map(epoch).then([this, pgid, epoch, info=std::move(info)](epoch_t) mutable {
-    if (auto pg = pgs.find(pgid); pg != pgs.end()) {
-      return advance_pg_to(pg->second, epoch).then([pg=pg->second]() {
-       return seastar::make_ready_future<Ref<PG>>(pg);
-      });
-    } else if (!info) {
-      return seastar::make_ready_future<Ref<PG>>();
-    } else {
-      auto creating = pgs_creating.find(pgid);
-      if (creating == pgs_creating.end()) {
-       creating = pgs_creating.emplace(
-         pgid,
-         seastar::shared_future<Ref<PG>>(handle_pg_create_info(std::move(info)).then([this, pgid](auto pg) {
-           pgs_creating.erase(pgid);
-           return seastar::make_ready_future<Ref<PG>>(pg);
-         }))).first;
-      }
-      return creating->second.get_future().then([this, epoch](auto pg) {
-       return advance_pg_to(pg, epoch).then([pg]() {
-         return seastar::make_ready_future<Ref<PG>>(pg);
-       });
-      });
-    }
-  });
-}
-
-seastar::future<Ref<PG>>
-OSD::do_peering_event(
-  spg_t pgid,
-  PGPeeringEventURef evt,
-  PeeringCtx &rctx)
-{
-  return get_pg(pgid, evt->get_epoch_sent(), std::move(evt->create_info))
-    .then([this, evt=std::move(evt), &rctx](Ref<PG> pg) mutable {
-      if (pg) {
-       pg->do_peering_event(std::move(evt), rctx);
-      }
-      return seastar::make_ready_future<Ref<PG>>(pg);
-    });
-}
-
-seastar::future<bool>
-OSD::do_peering_event_and_dispatch_transaction(
-  spg_t pgid,
-  std::unique_ptr<PGPeeringEvent> evt,
-  PeeringCtx &rctx)
-{
-  return do_peering_event(pgid, std::move(evt), rctx).then(
-    [this, pgid, &rctx](Ref<PG> pg) mutable {
-      if (pg) {
-       return seastar::when_all_succeed(
-         pg->get_need_up_thru() ? _send_alive() : seastar::now(),
-         shard_services.dispatch_context_transaction(
-           pg->get_collection_ref(), rctx)).then([] { return true; });
-      } else {
-       return seastar::make_ready_future<bool>(false);
-      }
-    });
-}
-
-seastar::future<>
-OSD::do_peering_event_and_dispatch(
-  spg_t pgid,
-  std::unique_ptr<PGPeeringEvent> evt)
-{
-  return seastar::do_with(
-    PeeringCtx{},
-    [this, pgid, evt=std::move(evt)](auto &rctx) mutable {
-      return do_peering_event(pgid, std::move(evt), rctx).then(
-       [this, pgid, &rctx](Ref<PG> pg) mutable {
-         if (pg) {
-           return seastar::when_all_succeed(
-             pg->get_need_up_thru() ? _send_alive() : seastar::now(),
-             shard_services.dispatch_context(
-               pg->get_collection_ref(), std::move(rctx)));
-         } else {
-           return seastar::now();
-         }
-       });
-    }).handle_exception([](auto ep) {
-      logger().error("do_peering_event_and_dispatch saw {}", ep);
-      return seastar::make_exception_future<>(ep);
-    });
+  auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
+  if (!creating && info) {
+    pg_map.set_creating(pgid);
+    handle_pg_create_info(std::move(info));
+  }
+  return std::move(fut);
 }
 
-seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
+blocking_future<Ref<PG>> OSD::wait_for_pg(
+  spg_t pgid)
 {
-  const auto mine = osdmap->get_epoch();
-  if (mine >= epoch) {
-    return seastar::make_ready_future<epoch_t>(mine);
-  } else {
-    logger().info("evt epoch is {}, i have {}, will wait", epoch, mine);
-    auto fut = waiting_peering[epoch].get_shared_future();
-    return osdmap_subscribe(osdmap->get_epoch(), true).then(
-      [fut=std::move(fut)]() mutable {
-      return std::move(fut);
-    });
-  }
+  return pg_map.get_pg(pgid).first;
 }
 
 seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
@@ -1193,7 +939,7 @@ seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
        [this, pg, &rctx](epoch_t next_epoch) {
          return get_map(next_epoch).then(
            [pg, this, &rctx] (cached_map_t&& next_map) {
-             return pg->handle_advance_map(next_map, rctx);
+             pg->handle_advance_map(next_map, rctx);
            });
        }).then([this, &rctx, pg] {
          pg->handle_activate_map(rctx);
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index a87ba49db27308068aee254f69182cc57437a967..5cf093972af5a78e0004fda82411bdddfe3dc7e2 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -23,6 +23,9 @@
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/osd/state.h"
 #include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/osd/osd_operations/peering_event.h"
 
 #include "osd/PeeringState.h"
 #include "osd/osd_types.h"
@@ -152,100 +155,38 @@ private:
   seastar::future<Ref<PG>> handle_pg_create_info(
     std::unique_ptr<PGCreateInfo> info);
 
-  template <typename C, typename F, typename G>
-  seastar::future<> handle_batch_pg_message_with_missing_handler(
-    const C &c,
-    F &&f,
-    G &&on_missing_pg) {
-    using mapped_type = const typename C::value_type &;
-    using event_type = std::optional<std::tuple<
-      spg_t,
-      std::unique_ptr<PGPeeringEvent>>>;
-    return seastar::do_with(
-      PeeringCtx{},
-      std::move(f),
-      std::move(on_missing_pg),
-      [this, &c] (auto &rctx, auto &f, auto &on_missing_pg) {
-       return seastar::parallel_for_each(
-         c,
-         [this, &rctx, &f, &on_missing_pg](mapped_type m) {
-           event_type result = f(m);
-           if (result) {
-             auto [pgid, event] = std::move(*result);
-             return do_peering_event_and_dispatch_transaction(
-               pgid,
-               std::move(event),
-               rctx).then([m, &on_missing_pg, &rctx] (bool found) {
-                 if (!found) {
-                   on_missing_pg(m, rctx);
-                 }
-                 return seastar::now();
-               });
-           } else {
-             return seastar::now();
-           }
-         }).then([this, &rctx] {
-              return shard_services.dispatch_context(std::move(rctx));
-         });
-      });
-  }
-
-  template <typename C, typename F>
-  seastar::future<> handle_batch_pg_message(
-    const C &c,
-    F &&f) {
-    return handle_batch_pg_message_with_missing_handler(
-      c,
-      std::move(f),
-      [](const typename C::value_type &, PeeringCtx &){});
-  }
-
-  seastar::future<> handle_pg_create(ceph::net::Connection *conn,
-                                    Ref<MOSDPGCreate2> m);
   seastar::future<> handle_osd_map(ceph::net::Connection* conn,
                                    Ref<MOSDMap> m);
   seastar::future<> handle_osd_op(ceph::net::Connection* conn,
                                  Ref<MOSDOp> m);
   seastar::future<> handle_pg_log(ceph::net::Connection* conn,
                                  Ref<MOSDPGLog> m);
-  seastar::future<> handle_pg_notify(ceph::net::Connection* conn,
-                                    Ref<MOSDPGNotify> m);
-  seastar::future<> handle_pg_info(ceph::net::Connection* conn,
-                                  Ref<MOSDPGInfo> m);
-  seastar::future<> handle_pg_query(ceph::net::Connection* conn,
-                                   Ref<MOSDPGQuery> m);
 
   seastar::future<> committed_osd_maps(version_t first,
                                        version_t last,
                                        Ref<MOSDMap> m);
+
   void check_osdmap_features();
-  // order the promises in descending order of the waited osdmap epoch,
-  // so we can access all the waiters expecting a map whose epoch is less
-  // than a given epoch
-  using waiting_peering_t = std::map<epoch_t, seastar::shared_promise<epoch_t>,
-                                    std::greater<epoch_t>>;
-  waiting_peering_t waiting_peering;
-  // wait for an osdmap whose epoch is greater or equal to given epoch
-  seastar::future<epoch_t> wait_for_map(epoch_t epoch);
+
+public:
+  OSDMapGate osdmap_gate;
+
+  ShardServices &get_shard_services() {
+    return shard_services;
+  }
+
   seastar::future<> consume_map(epoch_t epoch);
 
-  std::map<spg_t, seastar::shared_future<Ref<PG>>> pgs_creating;
-  seastar::future<Ref<PG>> get_pg(
+private:
+  PGMap pg_map;
+
+public:
+  blocking_future<Ref<PG>> get_or_create_pg(
     spg_t pgid,
     epoch_t epoch,
     std::unique_ptr<PGCreateInfo> info);
-
-  seastar::future<Ref<PG>> do_peering_event(
-    spg_t pgid,
-    std::unique_ptr<PGPeeringEvent> evt,
-    PeeringCtx &rctx);
-  seastar::future<> do_peering_event_and_dispatch(
-    spg_t pgid,
-    std::unique_ptr<PGPeeringEvent> evt);
-  seastar::future<bool> do_peering_event_and_dispatch_transaction(
-    spg_t pgid,
-    std::unique_ptr<PGPeeringEvent> evt,
-    PeeringCtx &rctx);
+  blocking_future<Ref<PG>> wait_for_pg(
+    spg_t pgid);
 
   seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
   bool should_restart() const;
@@ -254,6 +195,7 @@ private:
 
   seastar::future<> send_beacon();
   void update_heartbeat_peers();
+
 };
 
 }
diff --git a/src/crimson/osd/osd_connection_priv.h b/src/crimson/osd/osd_connection_priv.h
new file mode 100644
index 0000000..25c72a8
--- /dev/null
+++ b/src/crimson/osd/osd_connection_priv.h
@@ -0,0 +1,25 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+
+namespace ceph::osd {
+
+struct OSDConnectionPriv : public ceph::net::Connection::user_private_t {
+  ClientRequest::ConnectionPipeline client_request_conn_pipeline;
+  RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
+};
+
+static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) {
+  if (!conn->has_user_private()) {
+    conn->set_user_private(std::make_unique<OSDConnectionPriv>());
+  }
+  return static_cast<OSDConnectionPriv&>(conn->get_user_private());
+}
+
+}
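
Keeping the pipelines in the connection's user_private gives each
connection its own ordering domain: two ops arriving on one connection
queue behind each other in these stages, while ops from different
connections first contend at the pg.  The lazy-attach pattern used by
get_osd_priv() above, reduced to its generic form (illustrative only;
crimson's Connection supplies the real user_private slot):

    #include <memory>

    struct user_private_base { virtual ~user_private_base() = default; };

    struct connection_like {
      std::unique_ptr<user_private_base> priv;
    };

    template <typename T>
    T &get_priv(connection_like &conn) {
      if (!conn.priv) {
        conn.priv = std::make_unique<T>();  // first use attaches the state
      }
      return static_cast<T &>(*conn.priv);
    }
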
diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc
new file mode 100644
index 0000000..d65887d
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.cc
@@ -0,0 +1,76 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+ClientRequest::ClientRequest(
+  OSD &osd, ceph::net::ConnectionRef conn, Ref<MOSDOp> &&m)
+  : osd(osd), conn(conn), m(std::move(m))
+{}
+
+void ClientRequest::print(std::ostream &lhs) const
+{
+  lhs << *m;
+}
+
+void ClientRequest::dump_detail(Formatter *f) const
+{
+}
+
+ClientRequest::ConnectionPipeline &ClientRequest::cp()
+{
+  return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+{
+  return pg.client_request_pg_pipeline;
+}
+
+seastar::future<> ClientRequest::start()
+{
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  with_blocking_future(handle.enter(cp().await_map))
+    .then([this]() {
+      return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch()));
+    }).then([this](epoch_t epoch) {
+      return with_blocking_future(handle.enter(cp().get_pg));
+    }).then([this] {
+      return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+    }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+      return seastar::do_with(
+       std::move(pg), std::move(ref), [this](auto pg, auto op) {
+       return with_blocking_future(
+         handle.enter(pp(*pg).await_map)
+       ).then([this, pg] {
+         return with_blocking_future(
+           pg->osdmap_gate.wait_for_map(m->get_map_epoch()));
+       }).then([this, pg] (auto) {
+         return with_blocking_future(handle.enter(pp(*pg).process));
+       }).then([this, pg] {
+         return pg->handle_op(conn.get(), std::move(m));
+       });
+      });
+    });
+  return seastar::make_ready_future();
+}
+
+}
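
blocking_future and with_blocking_future come from osd_operation.h and
are not part of this diff.  A plausible reading of their contract,
sketched with illustrative names (add_blocker/clear_blocker are real
Operation calls, used in compound_peering_request.cc below; the exact
signatures here are assumptions):

    // A future tagged with the Blocker responsible for it, so that an
    // op waiting on it can be reported as blocked in operation dumps.
    template <typename T>
    struct blocking_future_sketch {
      Blocker *blocker;
      seastar::future<T> fut;
    };

    template <typename T>
    seastar::future<T> with_blocking_future_sketch(
      Operation &op, blocking_future_sketch<T> &&f) {
      if (f.fut.available()) {
        return std::move(f.fut);  // fast path: never actually blocked
      }
      op.add_blocker(f.blocker);
      return std::move(f.fut).then([&op, b = f.blocker](T &&v) {
        op.clear_blocker(b);      // progressed past this stage
        return std::move(v);
      });
    }

Note that start() deliberately returns a ready future after forking
the chain: the IRef captured in the final continuation keeps the
operation alive until the chain completes.
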
diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h
new file mode 100644
index 0000000..91bcfd3
--- /dev/null
+++ b/src/crimson/osd/osd_operations/client_request.h
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDOp;
+
+namespace ceph::osd {
+class PG;
+class OSD;
+
+class ClientRequest final : public OperationT<ClientRequest> {
+  OSD &osd;
+  ceph::net::ConnectionRef conn;
+  Ref<MOSDOp> m;
+  OrderedPipelinePhase::Handle handle;
+
+public:
+  class ConnectionPipeline {
+    OrderedPipelinePhase await_map = {
+      "ClientRequest::ConnectionPipeline::await_map"
+    };
+    OrderedPipelinePhase get_pg = {
+      "ClientRequest::ConnectionPipeline::get_pg"
+    };
+    friend class ClientRequest;
+  };
+  class PGPipeline {
+    OrderedPipelinePhase await_map = {
+      "ClientRequest::PGPipeline::await_map"
+    };
+    OrderedPipelinePhase process = {
+      "ClientRequest::PGPipeline::process"
+    };
+    friend class ClientRequest;
+  };
+
+  static constexpr OperationTypeCode type = OperationTypeCode::client_request;
+
+  ClientRequest(OSD &osd, ceph::net::ConnectionRef, Ref<MOSDOp> &&m);
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+
+private:
+  ConnectionPipeline &cp();
+  PGPipeline &pp(PG &pg);
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/compound_peering_request.cc b/src/crimson/osd/osd_operations/compound_peering_request.cc
new file mode 100644
index 0000000..f1abf29
--- /dev/null
+++ b/src/crimson/osd/osd_operations/compound_peering_request.cc
@@ -0,0 +1,321 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "osd/PeeringState.h"
+
+#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGNotify.h"
+#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGCreate2.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace {
+using namespace ceph::osd;
+
+struct compound_state {
+  seastar::promise<BufferedRecoveryMessages> promise;
+  BufferedRecoveryMessages ctx;
+  ~compound_state() {
+    promise.set_value(std::move(ctx));
+  }
+};
+using compound_state_ref = seastar::lw_shared_ptr<compound_state>;
+
+class PeeringSubEvent : public RemotePeeringEvent {
+  compound_state_ref state;
+public:
+  template <typename... Args>
+  PeeringSubEvent(compound_state_ref state, Args &&... args) :
+    RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
+
+  seastar::future<> complete_rctx(Ref<ceph::osd::PG> pg) final {
+    logger().debug("{}: submitting ctx transaction", *this);
+    state->ctx.accept_buffered_messages(ctx);
+    state = {};
+    if (!pg) {
+      ceph_assert(ctx.transaction.empty());
+      return seastar::now();
+    } else {
+      return osd.get_shard_services().dispatch_context_transaction(
+       pg->get_collection_ref(), ctx);
+    }
+  }
+};
+
+std::vector<OperationRef> handle_pg_create(
+  OSD &osd,
+  ceph::net::ConnectionRef conn,
+  compound_state_ref state,
+  Ref<MOSDPGCreate2> m)
+{
+  std::vector<OperationRef> ret;
+  for (auto &p : m->pgs) {
+    const spg_t &pgid = p.first;
+    const auto &[created, created_stamp] = p.second;
+    auto q = m->pg_extra.find(pgid);
+    ceph_assert(q != m->pg_extra.end());
+    logger().debug(
+      "{}, {} {} e{} @{} history {} pi {}",
+      __func__,
+      pgid,
+      created,
+      created_stamp,
+      q->second.first,
+      q->second.second);
+    if (!q->second.second.empty() &&
+       m->epoch < q->second.second.get_bounds().second) {
+      logger().error(
+       "got pg_create on {} epoch {} unmatched past_intervals (history {})",
+       pgid,
+       m->epoch,
+       q->second.second,
+       q->second.first);
+    } else {
+      auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+       state,
+       osd,
+       conn,
+       osd.get_shard_services(),
+       pg_shard_t(),
+       pgid,
+       m->epoch,
+       m->epoch,
+       NullEvt(),
+       true,
+       new PGCreateInfo(
+         pgid,
+         m->epoch,
+         q->second.first,
+         q->second.second,
+         true));
+      ret.push_back(op);
+    }
+  }
+  return ret;
+}
+
+std::vector<OperationRef> handle_pg_notify(
+  OSD &osd,
+  ceph::net::ConnectionRef conn,
+  compound_state_ref state,
+  Ref<MOSDPGNotify> m)
+{
+  std::vector<OperationRef> ret;
+  ret.reserve(m->get_pg_list().size());
+  const int from = m->get_source().num();
+  for (auto &p : m->get_pg_list()) {
+    auto& [pg_notify, past_intervals] = p;
+    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+    MNotifyRec notify{pgid,
+                     pg_shard_t{from, pg_notify.from},
+                     pg_notify,
+                     0, // the features field is not used
+                     past_intervals};
+    logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
+    auto create_info = new PGCreateInfo{
+      pgid,
+      pg_notify.query_epoch,
+      pg_notify.info.history,
+      past_intervals,
+      false};
+    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+      state,
+      osd,
+      conn,
+      osd.get_shard_services(),
+      pg_shard_t(from, pg_notify.from),
+      pgid,
+      pg_notify.epoch_sent,
+      pg_notify.query_epoch,
+      notify,
+      true, // requires_pg
+      create_info);
+    ret.push_back(op);
+  }
+  return ret;
+}
+
+std::vector<OperationRef> handle_pg_info(
+  OSD &osd,
+  ceph::net::ConnectionRef conn,
+  compound_state_ref state,
+  Ref<MOSDPGInfo> m)
+{
+  std::vector<OperationRef> ret;
+  ret.reserve(m->pg_list.size());
+  const int from = m->get_source().num();
+  for (auto &p : m->pg_list) {
+    auto& pg_notify = p.first;
+    spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+    logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
+    MInfoRec info{pg_shard_t{from, pg_notify.from},
+                 pg_notify.info,
+                 pg_notify.epoch_sent};
+    auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+       state,
+       osd,
+       conn,
+       osd.get_shard_services(),
+       pg_shard_t(from, pg_notify.from),
+       pgid,
+       pg_notify.epoch_sent,
+       pg_notify.query_epoch,
+       std::move(info));
+    ret.push_back(op);
+  }
+  return ret;
+}
+
+class QuerySubEvent : public PeeringSubEvent {
+public:
+  template <typename... Args>
+  QuerySubEvent(Args &&... args) :
+    PeeringSubEvent(std::forward<Args>(args)...) {}
+
+  void on_pg_absent() final {
+    logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
+    pg_info_t empty(pgid);
+    ctx.notify_list[from.osd].emplace_back(
+      pg_notify_t(
+       from.shard, pgid.shard,
+       evt.get_epoch_sent(),
+       osd.get_shard_services().get_osdmap()->get_epoch(),
+       empty),
+      PastIntervals());
+  }
+};
+
+std::vector<OperationRef> handle_pg_query(
+  OSD &osd,
+  ceph::net::ConnectionRef conn,
+  compound_state_ref state,
+  Ref<MOSDPGQuery> m)
+{
+  std::vector<OperationRef> ret;
+  ret.reserve(m->pg_list.size());
+  const int from = m->get_source().num();
+  for (auto &p : m->pg_list) {
+    auto& [pgid, pg_query] = p;
+    MQuery query{pgid, pg_shard_t{from, pg_query.from},
+                pg_query, pg_query.epoch_sent};
+    logger().debug("handle_pg_query on {} from {}", pgid, from);
+    auto op = osd.get_shard_services().start_operation<QuerySubEvent>(
+      state,
+      osd,
+      conn,
+      osd.get_shard_services(),
+      pg_shard_t(from, pg_query.from),
+      pgid,
+      pg_query.epoch_sent,
+      pg_query.epoch_sent,
+      std::move(query));
+    ret.push_back(op);
+  }
+  return ret;
+}
+
+struct SubOpBlocker : BlockerT<SubOpBlocker> {
+  static constexpr const char * type_name = "CompoundOpBlocker";
+
+  std::vector<OperationRef> subops;
+  SubOpBlocker(std::vector<OperationRef> &&subops) : subops(std::move(subops)) {}
+
+  virtual void dump_detail(Formatter *f) const {
+    f->open_array_section("dependent_operations");
+    {
+      for (auto &i : subops) {
+       i->dump_brief(f);
+      }
+    }
+    f->close_section();
+  }
+};
+
+} // namespace
+
+namespace ceph::osd {
+
+CompoundPeeringRequest::CompoundPeeringRequest(
+  OSD &osd, ceph::net::ConnectionRef conn, Ref<Message> m)
+  : osd(osd),
+    conn(conn),
+    m(m)
+{}
+
+void CompoundPeeringRequest::print(std::ostream &lhs) const
+{
+  lhs << *m;
+}
+
+void CompoundPeeringRequest::dump_detail(Formatter *f) const
+{
+  f->dump_stream("message") << *m;
+}
+
+seastar::future<> CompoundPeeringRequest::start()
+{
+  logger().info("{}: starting", *this);
+  auto state = seastar::make_lw_shared<compound_state>();
+  auto blocker = std::make_unique<SubOpBlocker>(
+    [&] {
+      switch (m->get_type()) {
+      case MSG_OSD_PG_CREATE2:
+       return handle_pg_create(
+         osd,
+         conn,
+         state,
+         boost::static_pointer_cast<MOSDPGCreate2>(m));
+      case MSG_OSD_PG_NOTIFY:
+       return handle_pg_notify(
+         osd,
+         conn,
+         state,
+         boost::static_pointer_cast<MOSDPGNotify>(m));
+      case MSG_OSD_PG_INFO:
+       return handle_pg_info(
+         osd,
+         conn,
+         state,
+         boost::static_pointer_cast<MOSDPGInfo>(m));
+      case MSG_OSD_PG_QUERY:
+       return handle_pg_query(
+         osd,
+         conn,
+         state,
+         boost::static_pointer_cast<MOSDPGQuery>(m));
+      default:
+       ceph_assert("Invalid message type" == 0);
+       return std::vector<OperationRef>();
+      }
+    }());
+
+  add_blocker(blocker.get());
+  IRef ref = this;
+  logger().info("{}: about to fork future", *this);
+  state->promise.get_future().then(
+    [this, blocker=std::move(blocker)](auto &&ctx) {
+      clear_blocker(blocker.get());
+      logger().info("{}: sub events complete", *this);
+      return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+    }).then([this, ref=std::move(ref)] {
+      logger().info("{}: complete", *this);
+    });
+
+  logger().info("{}: forked, returning", *this);
+  return seastar::now();
+}
+
+} // namespace ceph::osd
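
The compound_state pattern above deserves a note: the promise is
fulfilled from the destructor, so the parent operation wakes exactly
once, when the last PeeringSubEvent drops its compound_state_ref.  The
fan-in trick in isolation (self-contained sketch; the
BufferedRecoveryMessages payload and error handling are elided):

    #include <chrono>
    #include <seastar/core/future.hh>
    #include <seastar/core/shared_ptr.hh>
    #include <seastar/core/sleep.hh>

    struct fan_in_state {
      seastar::promise<> done;
      ~fan_in_state() { done.set_value(); }  // last reference dropped
    };

    seastar::future<> run_subtasks(int n) {
      auto state = seastar::make_lw_shared<fan_in_state>();
      auto fut = state->done.get_future();
      for (int i = 0; i < n; ++i) {
        // Detached subtasks, each keeping `state` alive until done;
        // the sub-events above are detached the same way.
        (void)seastar::sleep(std::chrono::milliseconds(10 * i))
          .then([state] { /* per-subtask work */ });
      }
      return fut;  // resolves after every subtask releases `state`
    }
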
diff --git a/src/crimson/osd/osd_operations/compound_peering_request.h b/src/crimson/osd/osd_operations/compound_peering_request.h
new file mode 100644
index 0000000..ac901f8
--- /dev/null
+++ b/src/crimson/osd/osd_operations/compound_peering_request.h
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "msg/MessageRef.h"
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+using osd_id_t = int;
+
+class CompoundPeeringRequest : public OperationT<CompoundPeeringRequest> {
+public:
+  static constexpr OperationTypeCode type =
+    OperationTypeCode::compound_peering_request;
+
+private:
+  OSD &osd;
+  ceph::net::ConnectionRef conn;
+  Ref<Message> m;
+
+public:
+  CompoundPeeringRequest(
+    OSD &osd, ceph::net::ConnectionRef conn, Ref<Message> m);
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+};
+
+}
diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc
new file mode 100644
index 0000000..6112d1f
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.cc
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+void PeeringEvent::print(std::ostream &lhs) const
+{
+  lhs << "PeeringEvent("
+      << "from=" << from
+      << " pgid=" << pgid
+      << " sent=" << evt.get_epoch_sent()
+      << " requested=" << evt.get_epoch_requested()
+      << " evt=" << evt.get_desc()
+      << ")";
+}
+
+void PeeringEvent::dump_detail(Formatter *f) const
+{
+  f->open_object_section("PeeringEvent");
+  f->dump_stream("from") << from;
+  f->dump_stream("pgid") << pgid;
+  f->dump_int("sent", evt.get_epoch_sent());
+  f->dump_int("requested", evt.get_epoch_requested());
+  f->dump_string("evt", evt.get_desc());
+  f->close_section();
+}
+
+
+PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
+{
+  return pg.peering_request_pg_pipeline;
+}
+
+seastar::future<> PeeringEvent::start()
+{
+
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  get_pg().then([this](Ref<PG> pg) {
+    if (!pg) {
+      logger().debug("{}: pg absent, did not create", *this);
+      on_pg_absent();
+      handle.exit();
+      return complete_rctx(pg);
+    } else {
+      logger().debug("{}: pg present", *this);
+      return with_blocking_future(handle.enter(pp(*pg).await_map)
+      ).then([this, pg] {
+       return with_blocking_future(
+         pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+      }).then([this, pg](auto) {
+       return with_blocking_future(handle.enter(pp(*pg).process));
+      }).then([this, pg] {
+       pg->do_peering_event(evt, ctx);
+       handle.exit();
+       return complete_rctx(pg);
+      });
+    }
+  }).then([this, ref=std::move(ref)] {
+    logger().debug("{}: complete", *this);
+  });
+  return seastar::make_ready_future();
+}
+
+void PeeringEvent::on_pg_absent()
+{
+  logger().debug("{}: pg absent, dropping", *this);
+}
+
+seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+{
+  logger().debug("{}: submitting ctx", *this);
+  return shard_services.dispatch_context(
+    pg->get_collection_ref(),
+    std::move(ctx));
+}
+
+RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
+{
+  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+seastar::future<Ref<PG>> RemotePeeringEvent::get_pg() {
+  return with_blocking_future(
+    handle.enter(cp().await_map)
+  ).then([this] {
+    return with_blocking_future(
+      osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+  }).then([this](auto epoch) {
+    logger().debug("{}: got map {}", *this, epoch);
+    return with_blocking_future(handle.enter(cp().get_pg));
+  }).then([this] {
+    return with_blocking_future(
+      osd.get_or_create_pg(
+       pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
+  });
+}
+
+seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
+  return seastar::make_ready_future<Ref<PG>>(pg);
+}
+
+LocalPeeringEvent::~LocalPeeringEvent() {}
+
+}
diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h
new file mode 100644
index 0000000..995df4b
--- /dev/null
+++ b/src/crimson/osd/osd_operations/peering_event.h
@@ -0,0 +1,123 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+namespace ceph::osd {
+
+class OSD;
+class ShardServices;
+class PG;
+
+class PeeringEvent : public OperationT<PeeringEvent> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
+
+  class PGPipeline {
+    OrderedPipelinePhase await_map = {
+      "PeeringEvent::PGPipeline::await_map"
+    };
+    OrderedPipelinePhase process = {
+      "PeeringEvent::PGPipeline::process"
+    };
+    friend class PeeringEvent;
+  };
+
+protected:
+  OrderedPipelinePhase::Handle handle;
+  PGPipeline &pp(PG &pg);
+
+  ShardServices &shard_services;
+  PeeringCtx ctx;
+  pg_shard_t from;
+  spg_t pgid;
+  PGPeeringEvent evt;
+
+  const pg_shard_t get_from() const {
+    return from;
+  }
+
+  const spg_t get_pgid() const {
+    return pgid;
+  }
+
+  const PGPeeringEvent &get_event() const {
+    return evt;
+  }
+
+  virtual void on_pg_absent();
+  virtual seastar::future<> complete_rctx(Ref<PG>);
+  virtual seastar::future<Ref<PG>> get_pg() = 0;
+
+public:
+  template <typename... Args>
+  PeeringEvent(
+    ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+    Args&&... args) :
+    shard_services(shard_services),
+    from(from),
+    pgid(pgid),
+    evt(std::forward<Args>(args)...)
+  {}
+
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+};
+
+class RemotePeeringEvent : public PeeringEvent {
+protected:
+  OSD &osd;
+  ceph::net::ConnectionRef conn;
+
+  seastar::future<Ref<PG>> get_pg() final;
+
+public:
+  class ConnectionPipeline {
+    OrderedPipelinePhase await_map = {
+      "PeeringRequest::ConnectionPipeline::await_map"
+    };
+    OrderedPipelinePhase get_pg = {
+      "PeeringRequest::ConnectionPipeline::get_pg"
+    };
+    friend class RemotePeeringEvent;
+  };
+
+  template <typename... Args>
+  RemotePeeringEvent(OSD &osd, ceph::net::ConnectionRef conn, Args&&... args) :
+    PeeringEvent(std::forward<Args>(args)...),
+    osd(osd),
+    conn(conn)
+  {}
+
+private:
+  ConnectionPipeline &cp();
+};
+
+class LocalPeeringEvent final : public PeeringEvent {
+protected:
+  seastar::future<Ref<PG>> get_pg() final;
+
+  Ref<PG> pg;
+
+public:
+  template <typename... Args>
+  LocalPeeringEvent(Ref<PG> pg, Args&&... args) :
+    PeeringEvent(std::forward<Args>(args)...),
+    pg(pg)
+  {}
+
+  virtual ~LocalPeeringEvent();
+};
+
+
+}
diff --git a/src/crimson/osd/osdmap_gate.cc b/src/crimson/osd/osdmap_gate.cc
new file mode 100644
index 0000000..f837434
--- /dev/null
+++ b/src/crimson/osd/osdmap_gate.cc
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+void OSDMapGate::OSDMapBlocker::dump_detail(Formatter *f) const
+{
+  f->open_object_section("OSDMapGate");
+  f->dump_int("epoch", epoch);
+  f->close_section();
+}
+
+blocking_future<epoch_t> OSDMapGate::wait_for_map(epoch_t epoch)
+{
+  if (current >= epoch) {
+    return make_ready_blocking_future<epoch_t>(current);
+  } else {
+    logger().info("evt epoch is {}, i have {}, will wait", epoch, current);
+    auto &blocker = waiting_peering.emplace(
+      epoch, make_pair(blocker_type, epoch)).first->second;
+    auto fut = blocker.promise.get_shared_future();
+    if (shard_services) {
+      return blocker.make_blocking_future(
+       (*shard_services).get().osdmap_subscribe(current, true).then(
+         [fut=std::move(fut)]() mutable {
+           return std::move(fut);
+         }));
+    } else {
+      return blocker.make_blocking_future(std::move(fut));
+    }
+  }
+}
+
+void OSDMapGate::got_map(epoch_t epoch) {
+  current = epoch;
+  auto first = waiting_peering.begin();
+  auto last = waiting_peering.upper_bound(epoch);
+  std::for_each(first, last, [epoch, this](auto& blocked_requests) {
+    blocked_requests.second.promise.set_value(epoch);
+  });
+  waiting_peering.erase(first, last);
+}
+
+}
diff --git a/src/crimson/osd/osdmap_gate.h b/src/crimson/osd/osdmap_gate.h
new file mode 100644
index 0000000..073ce84
--- /dev/null
+++ b/src/crimson/osd/osdmap_gate.h
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <functional>
+#include <map>
+#include <optional>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/types.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace ceph {
+  class Formatter;
+  namespace osd {
+    class ShardServices;
+  }
+}
+
+namespace ceph::osd {
+
+class OSDMapGate {
+  struct OSDMapBlocker : public Blocker {
+    const char * type_name;
+    epoch_t epoch;
+
+    OSDMapBlocker(std::pair<const char *, epoch_t> args)
+      : type_name(args.first), epoch(args.second) {}
+
+    OSDMapBlocker(const OSDMapBlocker &) = delete;
+    OSDMapBlocker(OSDMapBlocker &&) = delete;
+    OSDMapBlocker &operator=(const OSDMapBlocker &) = delete;
+    OSDMapBlocker &operator=(OSDMapBlocker &&) = delete;
+
+    seastar::shared_promise<epoch_t> promise;
+
+    void dump_detail(Formatter *f) const final;
+    const char *get_type_name() const final {
+      return type_name;
+    }
+  };
+
+  // order the promises in ascending order of the waited osdmap epoch,
+  // so that in got_map() the range [begin, upper_bound(epoch)) covers
+  // exactly the waiters expecting a map whose epoch is less than or
+  // equal to the given epoch
+  using waiting_peering_t = std::map<epoch_t,
+                                    OSDMapBlocker>;
+  const char *blocker_type;
+  waiting_peering_t waiting_peering;
+  epoch_t current = 0;
+  std::optional<std::reference_wrapper<ShardServices>> shard_services;
+public:
+  OSDMapGate(
+    const char *blocker_type,
+    std::optional<std::reference_wrapper<ShardServices>> shard_services)
+    : blocker_type(blocker_type), shard_services(shard_services) {}
+
+  // wait for an osdmap whose epoch is greater than or equal to the given epoch
+  blocking_future<epoch_t> wait_for_map(epoch_t epoch);
+  void got_map(epoch_t epoch);
+};
+
+}
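
got_map() wakes the prefix [begin, upper_bound(epoch)) of the waiter
map; with ascending keys that prefix is exactly the waiters whose
wanted epoch has now arrived.  The range arithmetic in isolation
(plain C++, independent of the crimson types):

    #include <cassert>
    #include <map>

    int main() {
      std::map<unsigned, const char *> waiters{
        {3, "w3"}, {5, "w5"}, {9, "w9"}};
      auto last = waiters.upper_bound(5);  // first wanted epoch > 5
      unsigned woken = 0;
      for (auto i = waiters.begin(); i != last; ++i) {
        ++woken;                           // gate does set_value(5) here
      }
      waiters.erase(waiters.begin(), last);
      assert(woken == 2);                  // w3 and w5 wake
      assert(waiters.count(9) == 1);       // w9 keeps waiting
      return 0;
    }
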
index 3c83724bd401af31e9a712d15b1659fe7f70189a..d78f94403d95240db8057d65d202e59f0e9ecb95 100644 (file)
 
 #include "osd/OSDMap.h"
 
+#include "os/Transaction.h"
+
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/os/cyan_collection.h"
-#include "crimson/os/futurized_store.h"
 #include "os/Transaction.h"
+#include "crimson/os/cyan_store.h"
+
 #include "crimson/osd/exceptions.h"
 #include "crimson/osd/pg_meta.h"
-
-#include "pg_backend.h"
+#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/osd_operations/peering_event.h"
 
 namespace {
   seastar::logger& logger() {
@@ -72,6 +75,7 @@ PG::PG(
     pg_whoami{pg_shard},
     coll_ref(shard_services.get_store().open_collection(coll)),
     pgmeta_oid{pgid.make_pgmeta_oid()},
+    osdmap_gate("PG::osdmap_gate", std::nullopt),
     shard_services{shard_services},
     osdmap{osdmap},
     backend(
@@ -98,27 +102,26 @@ PG::PG(
   peering_state.set_backend_predicates(
     new ReadablePredicate(pg_whoami),
     new RecoverablePredicate());
+  osdmap_gate.got_map(osdmap->get_epoch());
 }
 
+PG::~PG() {}
+
 bool PG::try_flush_or_schedule_async() {
-// FIXME once there's a good way to schedule an "async" peering event
-#if 0
   shard_services.get_store().do_transaction(
     coll_ref,
     ObjectStore::Transaction()).then(
-      [this, epoch=peering_state.get_osdmap()->get_epoch()](){
-       if (!peering_state.pg_has_reset_since(epoch)) {
-         PeeringCtx rctx;
-         auto evt = PeeringState::IntervalFlush();
-         do_peering_event(evt, rctx);
-         return shard_services.dispatch_context(std::move(rctx));
-       } else {
-         return seastar::now();
-       }
+      [this, epoch=peering_state.get_osdmap()->get_epoch()]() {
+       return shard_services.start_operation<LocalPeeringEvent>(
+         this,
+         shard_services,
+         pg_whoami,
+         pgid,
+         epoch,
+         epoch,
+         PeeringState::IntervalFlush());
       });
   return false;
-#endif
-  return true;
 }
 
 void PG::log_state_enter(const char *state) {
@@ -232,6 +235,7 @@ void PG::handle_advance_map(
     newacting,
     acting_primary,
     rctx);
+  osdmap_gate.got_map(next_map->get_epoch());
 }
 
 void PG::handle_activate_map(PeeringCtx &rctx)
index 21d1cb56ac0a28bf9d2818e22b771c37260fdc61..7c8e4518d5d54509d5092df4188b80c9c70e2cee 100644 (file)
 #include "common/dout.h"
 #include "crimson/net/Fwd.h"
 #include "os/Transaction.h"
-#include "crimson/osd/shard_services.h"
 #include "osd/osd_types.h"
 #include "osd/osd_internal_types.h"
 #include "osd/PeeringState.h"
 
 #include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+
 class OSDMap;
 class MQuery;
 class PGBackend;
@@ -36,6 +40,9 @@ namespace ceph::os {
   class FuturizedStore;
 }
 
+namespace ceph::osd {
+class ClientRequest;
+
 class PG : public boost::intrusive_ref_counter<
   PG,
   boost::thread_unsafe_counter>,
@@ -45,6 +52,9 @@ class PG : public boost::intrusive_ref_counter<
   using ec_profile_t = std::map<std::string,std::string>;
   using cached_map_t = boost::local_shared_ptr<const OSDMap>;
 
+  ClientRequest::PGPipeline client_request_pg_pipeline;
+  PeeringEvent::PGPipeline peering_request_pg_pipeline;
+
   spg_t pgid;
   pg_shard_t pg_whoami;
   coll_t coll;
@@ -56,9 +66,11 @@ public:
      pg_pool_t&& pool,
      std::string&& name,
      cached_map_t osdmap,
-     ceph::osd::ShardServices &shard_services,
+     ShardServices &shard_services,
      ec_profile_t profile);
 
+  ~PG();
+
   // EpochSource
   epoch_t get_osdmap_epoch() const final {
     return peering_state.get_osdmap_epoch();
@@ -187,7 +199,7 @@ public:
     t.register_on_commit(
       new LambdaContext([this, on_commit](){
        PeeringCtx rctx;
-        do_peering_event(on_commit, rctx);
+        do_peering_event(*on_commit, rctx);
        shard_services.dispatch_context(std::move(rctx));
     }));
   }
@@ -389,16 +401,6 @@ public:
 
   void do_peering_event(
     PGPeeringEvent& evt, PeeringCtx &rctx);
-  void do_peering_event(
-    std::unique_ptr<PGPeeringEvent> evt,
-    PeeringCtx &rctx) {
-    return do_peering_event(*evt, rctx);
-  }
-  void do_peering_event(
-    PGPeeringEventRef evt,
-    PeeringCtx &rctx) {
-    return do_peering_event(*evt, rctx);
-  }
 
   void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
   void handle_activate_map(PeeringCtx &rctx);
@@ -421,6 +423,7 @@ private:
                                             uint64_t limit);
 
 private:
+  OSDMapGate osdmap_gate;
   ShardServices &shard_services;
 
   cached_map_t osdmap;
@@ -432,6 +435,10 @@ private:
   seastar::future<> wait_for_active();
 
   friend std::ostream& operator<<(std::ostream&, const PG& pg);
+  friend class ClientRequest;
+  friend class PeeringEvent;
 };
 
 std::ostream& operator<<(std::ostream&, const PG& pg);
+
+}
diff --git a/src/crimson/osd/pg_map.cc b/src/crimson/osd/pg_map.cc
new file mode 100644
index 0000000..536ba99
--- /dev/null
+++ b/src/crimson/osd/pg_map.cc
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/pg_map.h"
+
+#include "crimson/osd/pg.h"
+#include "common/Formatter.h"
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+PGMap::PGCreationState::PGCreationState(spg_t pgid) : pgid(pgid) {}
+PGMap::PGCreationState::~PGCreationState() {}
+
+void PGMap::PGCreationState::dump_detail(Formatter *f) const
+{
+  f->dump_stream("pgid") << pgid;
+  f->dump_bool("creating", creating);
+}
+
+std::pair<blocking_future<Ref<PG>>, bool> PGMap::get_pg(spg_t pgid, bool wait)
+{
+  if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+    return make_pair(make_ready_blocking_future<Ref<PG>>(pg->second), true);
+  } else if (!wait) {
+    return make_pair(make_ready_blocking_future<Ref<PG>>(nullptr), true);
+  } else {
+    auto &state = pgs_creating.emplace(pgid, pgid).first->second;
+    return make_pair(
+      state.make_blocking_future(state.promise.get_shared_future()),
+      state.creating);
+  }
+}
+
+void PGMap::set_creating(spg_t pgid)
+{
+  logger().debug("Creating {}", pgid);
+  ceph_assert(pgs.count(pgid) == 0);
+  auto pg = pgs_creating.find(pgid);
+  ceph_assert(pg != pgs_creating.end());
+  ceph_assert(pg->second.creating == false);
+  pg->second.creating = true;
+}
+
+void PGMap::pg_created(spg_t pgid, Ref<PG> pg)
+{
+  logger().debug("Created {}", pgid);
+  ceph_assert(!pgs.count(pgid));
+  pgs.emplace(pgid, pg);
+
+  auto state = pgs_creating.find(pgid);
+  ceph_assert(state != pgs_creating.end());
+  state->second.promise.set_value(pg);
+  pgs_creating.erase(pgid);
+}
+
+void PGMap::pg_loaded(spg_t pgid, Ref<PG> pg)
+{
+  ceph_assert(!pgs.count(pgid));
+  pgs.emplace(pgid, pg);
+}
+
+PGMap::~PGMap() {}
+
+}
diff --git a/src/crimson/osd/pg_map.h b/src/crimson/osd/pg_map.h
new file mode 100644
index 0000000..8b4086e
--- /dev/null
+++ b/src/crimson/osd/pg_map.h
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/types.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+
+namespace ceph::osd {
+class PG;
+
+class PGMap {
+  struct PGCreationState : BlockerT<PGCreationState> {
+    static constexpr const char * type_name = "PGCreation";
+
+    void dump_detail(Formatter *f) const final;
+
+    spg_t pgid;
+    seastar::shared_promise<Ref<PG>> promise;
+    bool creating = false;
+    PGCreationState(spg_t pgid);
+
+    PGCreationState(const PGCreationState &) = delete;
+    PGCreationState(PGCreationState &&) = delete;
+    PGCreationState &operator=(const PGCreationState &) = delete;
+    PGCreationState &operator=(PGCreationState &&) = delete;
+
+    ~PGCreationState();
+  };
+
+  std::map<spg_t, PGCreationState> pgs_creating;
+  std::map<spg_t, Ref<PG>> pgs;
+
+public:
+  /**
+   * Get future for pg, with a bool indicating whether the pg already
+   * exists or is already being created (false: the caller may create it).
+   */
+  std::pair<blocking_future<Ref<PG>>, bool> get_pg(spg_t pgid, bool wait=true);
+
+  /**
+   * Set creating
+   */
+  void set_creating(spg_t pgid);
+
+  /**
+   * Set newly created pg
+   */
+  void pg_created(spg_t pgid, Ref<PG> pg);
+
+  /**
+   * Add newly loaded pg
+   */
+  void pg_loaded(spg_t pgid, Ref<PG> pg);
+
+  decltype(pgs) &get_pgs() { return pgs; }
+
+  PGMap() = default;
+  ~PGMap();
+};
+
+}
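
The creation protocol these methods implement, from the caller's side
(a sketch mirroring OSD::get_or_create_pg in this commit;
launch_creation is a hypothetical stand-in for the async path through
handle_pg_create_info, which must end in pg_created()):

    blocking_future<Ref<PG>> get_or_create(
      PGMap &pg_map, spg_t pgid, std::unique_ptr<PGCreateInfo> info)
    {
      auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
      if (!creating && info) {
        pg_map.set_creating(pgid);               // claim creation once
        launch_creation(pgid, std::move(info));  // hypothetical helper
      }
      return std::move(fut);  // every caller shares one shared_future
    }

Because PGCreationState holds a shared_promise, any number of ops can
block on the same pending pg; pg_created() wakes them all at once.
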
diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc
index 9d4c9984205d4e4838e7148fd67c5f25a374af3e..f5502e9272169fe98d5cd65d5fb970a97a71a66a 100644
--- a/src/crimson/osd/shard_services.cc
+++ b/src/crimson/osd/shard_services.cc
@@ -36,7 +36,8 @@ ShardServices::ShardServices(
       public_msgr(public_msgr),
       monc(monc),
       mgrc(mgrc),
-      store(store) {
+      store(store)
+{
   perf = build_osd_logger(&cct);
   cct.get_perfcounters_collection()->add(perf);
 
@@ -67,7 +68,7 @@ seastar::future<> ShardServices::dispatch_context_transaction(
 }
 
 seastar::future<> ShardServices::dispatch_context_messages(
-  PeeringCtx &ctx)
+  BufferedRecoveryMessages &&ctx)
 {
   auto ret = seastar::when_all_succeed(
     seastar::parallel_for_each(std::move(ctx.notify_list),
@@ -105,13 +106,9 @@ seastar::future<> ShardServices::dispatch_context(
   PeeringCtx &&ctx)
 {
   ceph_assert(col || ctx.transaction.empty());
-  return seastar::do_with(
-    PeeringCtx{ctx},
-    [this, col](auto& todo) {
-      return seastar::when_all_succeed(
-       dispatch_context_messages(todo),
-       col ? dispatch_context_transaction(col, todo) : seastar::now());
-    });
+  return seastar::when_all_succeed(
+    dispatch_context_messages(BufferedRecoveryMessages(ctx)),
+    col ? dispatch_context_transaction(col, ctx) : seastar::now());
 }
 
 void ShardServices::queue_want_pg_temp(pg_t pgid,
@@ -190,6 +187,16 @@ void ShardServices::send_pg_temp()
   _sent_pg_temp();
 }
 
+void ShardServices::update_map(cached_map_t new_osdmap)
+{
+  osdmap = std::move(new_osdmap);
+}
+
+ShardServices::cached_map_t &ShardServices::get_osdmap()
+{
+  return osdmap;
+}
+
 seastar::future<> ShardServices::send_pg_created(pg_t pgid)
 {
   logger().debug(__func__);
@@ -227,4 +234,15 @@ void ShardServices::prune_pg_created()
   }
 }
 
+seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request)
+{
+  logger().info("{}({})", __func__, epoch);
+  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+      force_request) {
+    return monc.renew_subs();
+  } else {
+    return seastar::now();
+  }
+}
+
 };
diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h
index fa3c72b1b0b8c089b249ed80082125c026baafd4..9d04ba2e9dc4508a6dddb5b4638190a09ddbf313 100644
--- a/src/crimson/osd/shard_services.h
+++ b/src/crimson/osd/shard_services.h
@@ -6,6 +6,7 @@
 #include <boost/intrusive_ptr.hpp>
 #include <seastar/core/future.hh>
 
+#include "osd_operation.h"
 #include "msg/MessageRef.h"
 #include "crimson/os/cyan_collection.h"
 
@@ -28,6 +29,7 @@ namespace ceph::os {
 class PerfCounters;
 class OSDMap;
 class PeeringCtx;
+class BufferedRecoveryMessages;
 
 namespace ceph::osd {
 
@@ -68,6 +70,16 @@ public:
     return &cct;
   }
 
+  // Op Tracking
+  OperationRegistry registry;
+
+  template <typename T, typename... Args>
+  typename T::IRef start_operation(Args&&... args) {
+    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
+    op->start();
+    return op;
+  }
+
   // Loggers
   PerfCounters &get_recoverystate_perf_logger() {
     return *recoverystate_perf;
@@ -82,7 +94,7 @@ public:
 
   /// Dispatch and reset ctx messages
   seastar::future<> dispatch_context_messages(
-    PeeringCtx &ctx);
+    BufferedRecoveryMessages &&ctx);
 
   /// Dispatch ctx and dispose of context
   seastar::future<> dispatch_context(
@@ -118,12 +130,8 @@ public:
 private:
   cached_map_t osdmap;
 public:
-  void update_map(cached_map_t new_osdmap) {
-    osdmap = std::move(new_osdmap);
-  }
-  cached_map_t &get_osdmap() {
-    return osdmap;
-  }
+  void update_map(cached_map_t new_osdmap);
+  cached_map_t &get_osdmap();
 
   // PG Created State
 private:
@@ -132,6 +140,8 @@ public:
   seastar::future<> send_pg_created(pg_t pgid);
   seastar::future<> send_pg_created();
   void prune_pg_created();
+
+  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
 };