]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: handle MOSDPGNotify in OSD
authorKefu Chai <kchai@redhat.com>
Mon, 25 Feb 2019 13:42:09 +0000 (21:42 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 22 Mar 2019 05:21:32 +0000 (13:21 +0800)
will handle it in PG in follow-up changes.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 0bd9071bc4f665cb66cc46b0fa8a9812a87d3740..a81770e900612a58dcc23c4f3098082f9659e768 100644 (file)
@@ -9,9 +9,12 @@
 #include "messages/MOSDBeacon.h"
 #include "messages/MOSDBoot.h"
 #include "messages/MOSDMap.h"
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify.h"
 #include "messages/MPGStats.h"
 
 #include "crimson/mon/MonClient.h"
+
 #include "crimson/net/Connection.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/os/cyan_collection.h"
@@ -23,6 +26,8 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_meta.h"
 
+#include "osd/PGPeeringEvent.h"
+
 namespace {
   seastar::logger& logger() {
     return ceph::get_logger(ceph_subsys_osd);
@@ -357,6 +362,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
   switch (m->get_type()) {
   case CEPH_MSG_OSD_MAP:
     return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+  case MSG_OSD_PG_NOTIFY:
+    return handle_pg_notify(conn, boost::static_pointer_cast<MOSDPGNotify>(m));
   default:
     return seastar::now();
   }
@@ -690,3 +697,62 @@ void OSD::update_heartbeat_peers()
   }
   heartbeat->update_peers(whoami);
 }
+
+seastar::future<> OSD::handle_pg_notify(ceph::net::ConnectionRef conn,
+                                        Ref<MOSDPGNotify> m)
+{
+  // assuming all pgs reside in a single shard
+  // see OSD::dequeue_peering_evt()
+  const int from = m->get_source().num();
+  return seastar::parallel_for_each(m->get_pg_list(),
+    [from, this](pair<pg_notify_t, PastIntervals> p) {
+      auto& [pg_notify, past_intervals] = p;
+      spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+      MNotifyRec notify{pgid,
+                        pg_shard_t{from, pg_notify.from},
+                        pg_notify,
+                        0, // the features is not used
+                        past_intervals};
+      auto create_info = new PGCreateInfo{pgid,
+                                          pg_notify.query_epoch,
+                                          pg_notify.info.history,
+                                          past_intervals,
+                                          false};
+      auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
+                                                  pg_notify.query_epoch,
+                                                  notify,
+                                                  true, // requires_pg
+                                                  create_info);
+      return do_peering_event(pgid, std::move(evt));
+  });
+}
+
+seastar::future<>
+OSD::do_peering_event(spg_t pgid,
+                      std::unique_ptr<PGPeeringEvent> evt)
+{
+  if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+    return advance_pg_to(pg->second, osdmap->get_epoch()).then(
+      [pg, evt=std::move(evt)]() mutable {
+        return pg->second->do_peering_event(std::move(evt));
+      });
+  } else {
+    // todo: handle_pg_query_nopg()
+    return seastar::now();
+  }
+}
+
+seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
+{
+  auto from = pg->get_osdmap_epoch();
+  // todo: merge/split support
+  return seastar::do_for_each(boost::make_counting_iterator(from + 1),
+                              boost::make_counting_iterator(to + 1),
+    [pg, this](epoch_t next_epoch) {
+      return get_map(next_epoch).then([pg, this] (cached_map_t&& next_map) {
+        return pg->handle_advance_map(next_map);
+      }).then([pg, this] {
+        return pg->handle_activate_map();
+      });
+    });
+}
index 1bf351b9880779de0ad93406c0bd08f1c1c4d058..74b80176f86981fabddb0b6123932596d9ecd802 100644 (file)
 #include "crimson/osd/osdmap_service.h"
 #include "crimson/osd/state.h"
 
-#include "osd/OSDMap.h"
+#include "osd/osd_types.h"
 
 class MOSDMap;
 class OSDMap;
 class OSDMeta;
 class PG;
+class PGPeeringEvent;
 class Heartbeat;
 
 namespace ceph::mon {
@@ -125,9 +126,17 @@ private:
 
   seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn,
                                    Ref<MOSDMap> m);
+  seastar::future<> handle_pg_notify(ceph::net::ConnectionRef conn,
+                                    Ref<MOSDPGNotify> m);
+
   seastar::future<> committed_osd_maps(version_t first,
                                        version_t last,
                                        Ref<MOSDMap> m);
+
+  seastar::future<> do_peering_event(spg_t pgid,
+                                    std::unique_ptr<PGPeeringEvent> evt);
+  seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
+
   bool should_restart() const;
   seastar::future<> restart();
   seastar::future<> shutdown();
index 8d3f973f44e7b14c2eb4d03b98d497f1a4b3520b..fb60176e7229750f922ee64cedd4dbfb53040672 100644 (file)
@@ -1,5 +1,6 @@
 #include "pg.h"
 
+#include "messages/MOSDPGLog.h"
 #include "osd/OSDMap.h"
 
 #include "crimson/os/cyan_store.h"
@@ -158,3 +159,21 @@ void PG::update_last_peering_reset()
 {
   last_peering_reset = get_osdmap_epoch();
 }
+
+seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
+{
+  // todo
+  return seastar::now();
+}
+
+seastar::future<> PG::handle_advance_map(cached_map_t next_map)
+{
+  // todo
+  return seastar::now();
+}
+
+seastar::future<> PG::handle_activate_map()
+{
+  // todo
+  return seastar::now();
+}
index 413f0e93d511f231e772b5cd659382a5d7372744..601f876806d3aa327a7fb1a325dc3b8035cdc80c 100644 (file)
@@ -12,6 +12,7 @@
 
 template<typename T> using Ref = boost::intrusive_ptr<T>;
 class OSDMap;
+class PGPeeringEvent;
 
 namespace ceph::net {
   class Messenger;
@@ -52,6 +53,11 @@ public:
 
   seastar::future<> read_state(ceph::os::CyanStore* store);
 
+  // peering/recovery
+  seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
+  seastar::future<> handle_advance_map(cached_map_t next_map);
+  seastar::future<> handle_activate_map();
+
 private:
   void update_primary_state(const std::vector<int>& new_up,
                            int new_up_primary,