]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PeeringState: send notifies via message_map (not notify_list)
authorSage Weil <sage@redhat.com>
Thu, 22 Aug 2019 03:27:45 +0000 (22:27 -0500)
committerSage Weil <sage@redhat.com>
Mon, 9 Sep 2019 16:22:11 +0000 (11:22 -0500)
Queue up a separate message for each notify, and drop the old notify_list
plumbing.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PeeringState.cc
src/osd/PeeringState.h

index 579a232ea78ab1e372aab85149e33bce6ff0e319..1cbbdd619917d4d13a4aea72c574eb45e4431394 100644 (file)
@@ -9262,7 +9262,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
   } else if (!is_active()) {
     dout(20) << __func__ << " not active" << dendl;
   } else {
-    do_notifies(ctx.notify_list, curmap);
     do_queries(ctx.query_map, curmap);
     do_infos(ctx.info_map, curmap);
 
@@ -9294,36 +9293,6 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
   }
 }
 
-/** do_notifies
- * Send an MOSDPGNotify to a primary, with a list of PGs that I have
- * content for, and they are primary for.
- */
-
-void OSD::do_notifies(
-  map<int,vector<pg_notify_t>>& notify_list,
-  OSDMapRef curmap)
-{
-  for (auto& [osd, notifies] : notify_list) {
-    if (!curmap->is_up(osd)) {
-      dout(20) << __func__ << " skipping down osd." << osd << dendl;
-      continue;
-    }
-    ConnectionRef con = service.get_con_osd_cluster(
-      osd, curmap->get_epoch());
-    if (!con) {
-      dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
-      continue;
-    }
-    service.maybe_share_map(con.get(), curmap);
-    dout(7) << __func__ << " osd." << osd
-           << " on " << notifies.size() << " PGs" << dendl;
-    MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
-                                      std::move(notifies));
-    con->send_message(m);
-  }
-}
-
-
 /** do_queries
  * send out pending queries for info | summaries
  */
index c7a2750cbfee5bf67d673ad96ab0449c8f18fc34..4fa7f7db8611ba9aa09bc08ade3ca08bea1d85b3 100644 (file)
@@ -1916,8 +1916,6 @@ protected:
   void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
                         ThreadPool::TPHandle *handle = NULL);
   void discard_context(PeeringCtx &ctx);
-  void do_notifies(map<int,vector<pg_notify_t>>& notify_list,
-                  OSDMapRef map);
   void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
                  OSDMapRef map);
   void do_infos(map<int,vector<pg_notify_t>>& info_map,
index cd4c385ac6608108984b3ea8e8190f73282ab123..1df535093ac4b5410a67b9eceb66dbcfafd9a69d 100644 (file)
@@ -13,6 +13,7 @@
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGTrim.h"
 #include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
 BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
   : query_map(std::move(ctx.query_map)),
     info_map(std::move(ctx.info_map)),
-    notify_list(std::move(ctx.notify_list)),
     message_map(std::move(ctx.message_map))
 {
   ctx.query_map.clear();
   ctx.info_map.clear();
-  ctx.notify_list.clear();
   ctx.message_map.clear();
 }
 
+void PeeringCtxWrapper::send_notify(int to, const pg_notify_t &n)
+{
+  vector<pg_notify_t> notifies;
+  notifies.push_back(n);
+  message_map[to].push_back(
+    new MOSDPGNotify(n.epoch_sent, std::move(notifies))
+    );
+}
+
 void PGPool::update(CephContext *cct, OSDMapRef map)
 {
   const pg_pool_t *pi = map->get_pg_pool(id);
@@ -2566,7 +2574,7 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
     update_history(query.query.history);
     fulfill_info(query.from, query.query, notify_info);
     rctx.send_notify(
-      notify_info.first,
+      notify_info.first.osd,
       pg_notify_t(
        notify_info.first.shard, pg_whoami.shard,
        query.query_epoch,
@@ -4104,7 +4112,7 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&)
   DECLARE_LOCALS;
   if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
     context< PeeringMachine >().send_notify(
-      ps->get_primary(),
+      ps->get_primary().osd,
       pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
@@ -5690,7 +5698,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&)
   DECLARE_LOCALS;
   if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
     context< PeeringMachine >().send_notify(
-      ps->get_primary(),
+      ps->get_primary().osd,
       pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
@@ -5810,7 +5818,7 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&)
   DECLARE_LOCALS;
   if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
     context< PeeringMachine >().send_notify(
-      ps->get_primary(),
+      ps->get_primary().osd,
       pg_notify_t(
        ps->get_primary().shard, ps->pg_whoami.shard,
        ps->get_osdmap_epoch(),
index 56a3835ccff8f12483f23a2f5fe0208fb432901d..f60f9d66bf5e854dc7b05b0b736a82d624daae3b 100644 (file)
@@ -53,7 +53,6 @@ class PeeringCtx;
 struct BufferedRecoveryMessages {
   map<int, map<spg_t, pg_query_t> > query_map;
   map<int, vector<pg_notify_t>> info_map;
-  map<int, vector<pg_notify_t>> notify_list;
   map<int, vector<MessageRef>> message_map;
 
   BufferedRecoveryMessages() = default;
@@ -71,11 +70,6 @@ struct BufferedRecoveryMessages {
       ovec.reserve(ovec.size() + ivec.size());
       ovec.insert(ovec.end(), ivec.begin(), ivec.end());
     }
-    for (auto &[target, nlist] : m.notify_list) {
-      auto &ovec = notify_list[target];
-      ovec.reserve(ovec.size() + nlist.size());
-      ovec.insert(ovec.end(), nlist.begin(), nlist.end());
-    }
     for (auto &[target, ls] : m.message_map) {
       auto &ovec = message_map[target];
       // put buffered messages in front
@@ -197,6 +191,40 @@ struct PeeringCtx : BufferedRecoveryMessages {
   }
 };
 
+/**
+ * Wraps PeeringCtx to hide the difference between buffering messages to
+ * be sent after flush or immediately.
+ */
+struct PeeringCtxWrapper {
+  utime_t start_time;
+  map<int, map<spg_t, pg_query_t> > &query_map;
+  map<int, vector<pg_notify_t>> &info_map;
+  map<int, vector<MessageRef>> &message_map;
+  ObjectStore::Transaction &transaction;
+  HBHandle * const handle = nullptr;
+
+  PeeringCtxWrapper(PeeringCtx &wrapped) :
+    query_map(wrapped.query_map),
+    info_map(wrapped.info_map),
+    message_map(wrapped.message_map),
+    transaction(wrapped.transaction),
+    handle(wrapped.handle) {}
+
+  PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
+    : query_map(buf.query_map),
+      info_map(buf.info_map),
+      message_map(buf.message_map),
+      transaction(wrapped.transaction),
+      handle(wrapped.handle) {}
+
+  PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
+
+  void send_osd_message(int target, Message *m) {
+    message_map[target].push_back(m);
+  }
+  void send_notify(int to, const pg_notify_t &n);
+};
+
   /* Encapsulates PG recovery process */
 class PeeringState : public MissingLoc::MappingInfo {
 public:
@@ -372,47 +400,6 @@ public:
     virtual ~PeeringListener() {}
   };
 
-private:
-  /**
-   * Wraps PeeringCtx to hide the difference between buffering messages to
-   * be sent after flush or immediately.
-   */
-  struct PeeringCtxWrapper {
-    utime_t start_time;
-    map<int, map<spg_t, pg_query_t> > &query_map;
-    map<int, vector<pg_notify_t>> &info_map;
-    map<int, vector<pg_notify_t>> &notify_list;
-    map<int, vector<MessageRef>> &message_map;
-    ObjectStore::Transaction &transaction;
-    HBHandle * const handle = nullptr;
-
-    PeeringCtxWrapper(PeeringCtx &wrapped) :
-      query_map(wrapped.query_map),
-      info_map(wrapped.info_map),
-      notify_list(wrapped.notify_list),
-      message_map(wrapped.message_map),
-      transaction(wrapped.transaction),
-      handle(wrapped.handle) {}
-
-    PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
-      : query_map(buf.query_map),
-       info_map(buf.info_map),
-       notify_list(buf.notify_list),
-       message_map(buf.message_map),
-       transaction(wrapped.transaction),
-        handle(wrapped.handle) {}
-
-    PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
-
-    void send_osd_message(int target, Message *m) {
-      message_map[target].push_back(m);
-    }
-    void send_notify(pg_shard_t to, const pg_notify_t &n) {
-      notify_list[to.osd].emplace_back(n);
-    }
-  };
-public:
-
   struct QueryState : boost::statechart::event< QueryState > {
     Formatter *f;
     explicit QueryState(Formatter *f) : f(f) {}
@@ -594,7 +581,7 @@ public:
       return *(state->rctx);
     }
 
-    void send_notify(pg_shard_t to, const pg_notify_t &n) {
+    void send_notify(int to, const pg_notify_t &n) {
       ceph_assert(state->rctx);
       state->rctx->send_notify(to, n);
     }