]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PeeringState: add message_map to PeeringCtx/BufferedRecoveryMessages
authorSage Weil <sage@redhat.com>
Thu, 22 Aug 2019 03:11:34 +0000 (22:11 -0500)
committerSage Weil <sage@redhat.com>
Mon, 9 Sep 2019 16:22:11 +0000 (11:22 -0500)
Have a generalized map of messages that are queued to be sent out.  Send
them when we dispatch the PeeringCtx.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/PeeringState.cc
src/osd/PeeringState.h

index cb14e8d1b37eb621dad725bed7e30a9a38dfcef1..579a232ea78ab1e372aab85149e33bce6ff0e319 100644 (file)
@@ -9265,6 +9265,25 @@ void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
     do_notifies(ctx.notify_list, curmap);
     do_queries(ctx.query_map, curmap);
     do_infos(ctx.info_map, curmap);
+
+    for (auto& [osd, ls] : ctx.message_map) {
+      if (!curmap->is_up(osd)) {
+       dout(20) << __func__ << " skipping down osd." << osd << dendl;
+       continue;
+      }
+      ConnectionRef con = service.get_con_osd_cluster(
+       osd, curmap->get_epoch());
+      if (!con) {
+       dout(20) << __func__ << " skipping osd." << osd << " (NULL con)"
+                << dendl;
+       continue;
+      }
+      service.maybe_share_map(con.get(), curmap);
+      for (auto m : ls) {
+       con->send_message2(m);
+      }
+      ls.clear();
+    }
   }
   if ((!ctx.transaction.empty() || ctx.transaction.has_contexts()) && pg) {
     int tr = store->queue_transaction(
index 77c6b20d0b3a1c30e90f0484f75e2f7cb3d7debe..cd4c385ac6608108984b3ea8e8190f73282ab123 100644 (file)
 BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
   : query_map(std::move(ctx.query_map)),
     info_map(std::move(ctx.info_map)),
-    notify_list(std::move(ctx.notify_list))
+    notify_list(std::move(ctx.notify_list)),
+    message_map(std::move(ctx.message_map))
 {
   ctx.query_map.clear();
   ctx.info_map.clear();
   ctx.notify_list.clear();
+  ctx.message_map.clear();
 }
 
 void PGPool::update(CephContext *cct, OSDMapRef map)
index ed95e82c08aa5695f667591a7e399499dd50c1ac..56a3835ccff8f12483f23a2f5fe0208fb432901d 100644 (file)
@@ -54,6 +54,7 @@ struct BufferedRecoveryMessages {
   map<int, map<spg_t, pg_query_t> > query_map;
   map<int, vector<pg_notify_t>> info_map;
   map<int, vector<pg_notify_t>> notify_list;
+  map<int, vector<MessageRef>> message_map;
 
   BufferedRecoveryMessages() = default;
   BufferedRecoveryMessages(PeeringCtx &);
@@ -75,6 +76,14 @@ struct BufferedRecoveryMessages {
       ovec.reserve(ovec.size() + nlist.size());
       ovec.insert(ovec.end(), nlist.begin(), nlist.end());
     }
+    for (auto &[target, ls] : m.message_map) {
+      auto &ovec = message_map[target];
+      // put buffered messages in front
+      ls.reserve(ls.size() + ovec.size());
+      ls.insert(ls.end(), ovec.begin(), ovec.end());
+      ovec.clear();
+      ovec.swap(ls);
+    }
   }
 };
 
@@ -373,6 +382,7 @@ private:
     map<int, map<spg_t, pg_query_t> > &query_map;
     map<int, vector<pg_notify_t>> &info_map;
     map<int, vector<pg_notify_t>> &notify_list;
+    map<int, vector<MessageRef>> &message_map;
     ObjectStore::Transaction &transaction;
     HBHandle * const handle = nullptr;
 
@@ -380,6 +390,7 @@ private:
       query_map(wrapped.query_map),
       info_map(wrapped.info_map),
       notify_list(wrapped.notify_list),
+      message_map(wrapped.message_map),
       transaction(wrapped.transaction),
       handle(wrapped.handle) {}
 
@@ -387,11 +398,15 @@ private:
       : query_map(buf.query_map),
        info_map(buf.info_map),
        notify_list(buf.notify_list),
+       message_map(buf.message_map),
        transaction(wrapped.transaction),
         handle(wrapped.handle) {}
 
     PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
 
+    void send_osd_message(int target, Message *m) {
+      message_map[target].push_back(m);
+    }
     void send_notify(pg_shard_t to, const pg_notify_t &n) {
       notify_list[to.osd].emplace_back(n);
     }