git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: fast dispatch peering events (part 2)
author Sage Weil <sage@redhat.com>
Tue, 2 Jan 2018 21:36:52 +0000 (15:36 -0600)
committer Sage Weil <sage@redhat.com>
Wed, 4 Apr 2018 13:26:47 +0000 (08:26 -0500)
This actually puts the remaining peering events into fast dispatch.  The
only peering event still left on the regular (non-fast) dispatch path is
the pg create from the mon.

Signed-off-by: Sage Weil <sage@redhat.com>
src/messages/MOSDPGLog.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PGPeeringEvent.h

index 37ed5089cc91156754e12086300cb6121e3b05dc..821fb063c6b0d039398193678c420ddcd07d4964 100644 (file)
@@ -16,9 +16,9 @@
 #ifndef CEPH_MOSDPGLOG_H
 #define CEPH_MOSDPGLOG_H
 
-#include "msg/Message.h"
+#include "messages/MOSDPeeringOp.h"
 
-class MOSDPGLog : public Message {
+class MOSDPGLog : public MOSDPeeringOp {
 
   static const int HEAD_VERSION = 5;
   static const int COMPAT_VERSION = 5;
@@ -42,11 +42,34 @@ public:
   spg_t get_pgid() const { return spg_t(info.pgid.pgid, to); }
   epoch_t get_query_epoch() const { return query_epoch; }
 
-  MOSDPGLog() : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) { 
+  spg_t get_spg() const override {
+    return spg_t(info.pgid.pgid, to);
+  }
+  epoch_t get_map_epoch() const override {
+    return epoch;
+  }
+  epoch_t get_min_epoch() const override {
+    return query_epoch;
+  }
+
+  PGPeeringEvent *get_event() override {
+    return new PGPeeringEvent(
+      epoch, query_epoch,
+      MLogRec(pg_shard_t(get_source().num(), from),
+             this),
+      true,
+      new PGCreateInfo(
+       get_spg(),
+       epoch,
+       info.history,
+       past_intervals));
+  }
+
+  MOSDPGLog() : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) {
     set_priority(CEPH_MSG_PRIO_HIGH); 
   }
   MOSDPGLog(shard_id_t to, shard_id_t from, version_t mv, pg_info_t& i)
-    : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
+    : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
       epoch(mv), query_epoch(mv),
       to(to), from(from),
       info(i)  {
@@ -54,7 +77,7 @@ public:
   }
   MOSDPGLog(shard_id_t to, shard_id_t from,
            version_t mv, pg_info_t& i, epoch_t query_epoch)
-    : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
+    : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
       epoch(mv), query_epoch(query_epoch),
       to(to), from(from),
       info(i)  {
@@ -66,13 +89,11 @@ private:
 
 public:
   const char *get_type_name() const override { return "PGlog"; }
-  void print(ostream& out) const override {
+  void inner_print(ostream& out) const override {
     // NOTE: log is not const, but operator<< doesn't touch fields
     // swapped out by OSD code.
-    out << "pg_log(" << info.pgid << " epoch " << epoch
-       << " log " << log
-       << " pi " << past_intervals
-       << " query_epoch " << query_epoch << ")";
+    out << "log " << log
+       << " pi " << past_intervals;
   }
 
   void encode_payload(uint64_t features) override {
index cb346bff64a9372b9b25a9c2a1dd385243ede063..be039ef55a97cf69e054ad84a1929aa31002693f 100644 (file)
@@ -6623,6 +6623,17 @@ void OSD::ms_fast_dispatch(Message *m)
 
   // peering event?
   switch (m->get_type()) {
+  case MSG_OSD_PG_QUERY:
+    return handle_fast_pg_query(static_cast<MOSDPGQuery*>(m));
+  case MSG_OSD_PG_NOTIFY:
+    return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
+  case MSG_OSD_PG_INFO:
+    return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
+  case MSG_OSD_PG_REMOVE:
+    return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));
+
+    // these are single-pg messages that handle themselves
+  case MSG_OSD_PG_LOG:
   case MSG_OSD_PG_TRIM:
   case MSG_OSD_BACKFILL_RESERVE:
   case MSG_OSD_RECOVERY_RESERVE:
@@ -6821,21 +6832,6 @@ void OSD::dispatch_op(OpRequestRef op)
   case MSG_OSD_PG_CREATE:
     handle_pg_create(op);
     break;
-  case MSG_OSD_PG_NOTIFY:
-    handle_pg_notify(op);
-    break;
-  case MSG_OSD_PG_QUERY:
-    handle_pg_query(op);
-    break;
-  case MSG_OSD_PG_LOG:
-    handle_pg_log(op);
-    break;
-  case MSG_OSD_PG_REMOVE:
-    handle_pg_remove(op);
-    break;
-  case MSG_OSD_PG_INFO:
-    handle_pg_info(op);
-    break;
   }
 }
 
@@ -8577,98 +8573,84 @@ void OSD::do_infos(map<int,
 }
 
 
-/** PGNotify
- * from non-primary to primary
- * includes pg_info_t.
- * NOTE: called with opqueue active.
- */
-void OSD::handle_pg_notify(OpRequestRef op)
+void OSD::handle_fast_pg_query(MOSDPGQuery *m)
 {
-  const MOSDPGNotify *m = static_cast<const MOSDPGNotify*>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_NOTIFY);
-
-  dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
+  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
   int from = m->get_source().num();
-
-  if (!require_osd_peer(op->get_req()))
-    return;
-
-  if (!require_same_or_newer_map(op, m->get_epoch(), false))
-    return;
-
-  op->mark_started();
-
-  for (auto it = m->get_pg_list().begin();
-       it != m->get_pg_list().end();
-       ++it) {
-    handle_pg_peering_evt(
-      spg_t(it->first.info.pgid.pgid, it->first.to),
-      it->first.info.history, it->second,
-      it->first.query_epoch,
+  for (auto& p : m->pg_list) {
+    enqueue_peering_evt(
+      p.first,
       PGPeeringEventRef(
-       new PGPeeringEvent(
-         it->first.epoch_sent, it->first.query_epoch,
-         MNotifyRec(pg_shard_t(from, it->first.from), it->first,
-          op->get_req()->get_connection()->get_features())))
+       std::make_shared<PGPeeringEvent>(
+         p.second.epoch_sent, p.second.epoch_sent,
+         MQuery(
+           p.first,
+           pg_shard_t(from, p.second.from),
+           p.second,
+           p.second.epoch_sent),
+         false))
       );
   }
 }
 
-void OSD::handle_pg_log(OpRequestRef op)
+void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
 {
-  MOSDPGLog *m = static_cast<MOSDPGLog*>(op->get_nonconst_req());
-  assert(m->get_type() == MSG_OSD_PG_LOG);
-  dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
-
-  if (!require_osd_peer(op->get_req()))
-    return;
-
+  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
   int from = m->get_source().num();
-  if (!require_same_or_newer_map(op, m->get_epoch(), false))
-    return;
-
-  op->mark_started();
-  handle_pg_peering_evt(
-    spg_t(m->get_pgid().pgid, m->to),
-    m->info.history, m->past_intervals, m->get_epoch(),
-    PGPeeringEventRef(
-      new PGPeeringEvent(
-       m->get_epoch(), m->get_query_epoch(),
-       MLogRec(pg_shard_t(from, m->from), m)))
-    );
-}
-
-void OSD::handle_pg_info(OpRequestRef op)
-{
-  const MOSDPGInfo *m = static_cast<const MOSDPGInfo *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_INFO);
-  dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
-
-  if (!require_osd_peer(op->get_req()))
-    return;
-
+  for (auto& p : m->get_pg_list()) {
+    spg_t pgid(p.first.info.pgid.pgid, p.first.to);
+    enqueue_peering_evt(
+      pgid,
+      PGPeeringEventRef(
+       std::make_shared<PGPeeringEvent>(
+         p.first.epoch_sent,
+         p.first.query_epoch,
+         MNotifyRec(
+           pgid, pg_shard_t(from, p.first.from),
+           p.first,
+           m->get_connection()->get_features(),
+           p.second),
+         true,
+         new PGCreateInfo(
+           pgid,
+           p.first.query_epoch,
+           p.first.info.history,
+           p.second)
+         )));
+  }
+}
+
+void OSD::handle_fast_pg_info(MOSDPGInfo* m)
+{
+  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
   int from = m->get_source().num();
-  if (!require_same_or_newer_map(op, m->get_epoch(), false))
-    return;
-
-  op->mark_started();
-
-  for (auto p = m->pg_list.begin();
-       p != m->pg_list.end();
-       ++p) {
-    handle_pg_peering_evt(
-      spg_t(p->first.info.pgid.pgid, p->first.to),
-      p->first.info.history, p->second, p->first.epoch_sent,
+  for (auto& p : m->pg_list) {
+    enqueue_peering_evt(
+      spg_t(p.first.info.pgid.pgid, p.first.to),
       PGPeeringEventRef(
-       new PGPeeringEvent(
-         p->first.epoch_sent, p->first.query_epoch,
+       std::make_shared<PGPeeringEvent>(
+         p.first.epoch_sent, p.first.query_epoch,
          MInfoRec(
-           pg_shard_t(
-             from, p->first.from), p->first.info, p->first.epoch_sent)))
+           pg_shard_t(from, p.first.from),
+           p.first.info,
+           p.first.epoch_sent)))
       );
   }
 }
 
+void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
+{
+  dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+  for (auto& pgid : m->pg_list) {
+    enqueue_peering_evt(
+      pgid,
+      PGPeeringEventRef(
+       std::make_shared<PGPeeringEvent>(
+         m->get_epoch(), m->get_epoch(),
+         PG::DeleteStart())));
+  }
+}
+
 void OSD::handle_force_recovery(Message *m)
 {
   MOSDForceRecovery *msg = static_cast<MOSDForceRecovery*>(m);
@@ -8697,139 +8679,64 @@ void OSD::handle_force_recovery(Message *m)
   msg->put();
 }
 
-/** PGQuery
- * from primary to replica | stray
- * NOTE: called with opqueue active.
- */
-void OSD::handle_pg_query(OpRequestRef op)
+void OSD::handle_pg_query_nopg(const MQuery& q)
 {
-  assert(osd_lock.is_locked());
-
-  const MOSDPGQuery *m = static_cast<const MOSDPGQuery*>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_QUERY);
+  spg_t pgid = q.pgid;
+  dout(10) << __func__ << " " << pgid << dendl;
 
-  if (!require_osd_peer(op->get_req()))
+  OSDMapRef osdmap = get_osdmap();
+  if (!osdmap->have_pg_pool(pgid.pool()))
     return;
 
-  dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << dendl;
-  int from = m->get_source().num();
+  // get active crush mapping
+  int up_primary, acting_primary;
+  vector<int> up, acting;
+  osdmap->pg_to_up_acting_osds(
+    pgid.pgid, &up, &up_primary, &acting, &acting_primary);
 
-  if (!require_same_or_newer_map(op, m->get_epoch(), false))
+  // same primary?
+  pg_history_t history = q.query.history;
+  bool valid_history = project_pg_history(
+    pgid, history, q.query.epoch_sent,
+    up, up_primary, acting, acting_primary);
+
+  if (!valid_history ||
+      q.query.epoch_sent < history.same_interval_since) {
+    dout(10) << " pg " << pgid << " dne, and pg has changed in "
+            << history.same_interval_since
+            << " (msg from " << q.query.epoch_sent << ")" << dendl;
     return;
+  }
 
-  op->mark_started();
-
-  map< int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
-
-  for (auto it = m->pg_list.begin();
-       it != m->pg_list.end();
-       ++it) {
-    spg_t pgid = it->first;
-
-    if (service.splitting(pgid)) {
-      peering_wait_for_split[pgid].push_back(
-       PGPeeringEventRef(
-         new PGPeeringEvent(
-           it->second.epoch_sent, it->second.epoch_sent,
-           MQuery(pg_shard_t(from, it->second.from),
-                      it->second, it->second.epoch_sent))));
-      continue;
-    }
-
-    {
-      RWLock::RLocker l(pg_map_lock);
-      if (pg_map.count(pgid)) {
-        PG *pg = 0;
-        pg = _lookup_lock_pg_with_map_lock_held(pgid);
-        pg->queue_query(
-            it->second.epoch_sent, it->second.epoch_sent,
-            pg_shard_t(from, it->second.from), it->second);
-        pg->unlock();
-        continue;
-      }
-    }
-
-    if (!osdmap->have_pg_pool(pgid.pool()))
-      continue;
-
-    // get active crush mapping
-    int up_primary, acting_primary;
-    vector<int> up, acting;
-    osdmap->pg_to_up_acting_osds(
-      pgid.pgid, &up, &up_primary, &acting, &acting_primary);
-
-    // same primary?
-    pg_history_t history = it->second.history;
-    bool valid_history = project_pg_history(
-      pgid, history, it->second.epoch_sent,
-      up, up_primary, acting, acting_primary);
-
-    if (!valid_history ||
-        it->second.epoch_sent < history.same_interval_since) {
-      dout(10) << " pg " << pgid << " dne, and pg has changed in "
-              << history.same_interval_since
-              << " (msg from " << it->second.epoch_sent << ")" << dendl;
-      continue;
-    }
-
-    dout(10) << " pg " << pgid << " dne" << dendl;
-    pg_info_t empty(spg_t(pgid.pgid, it->second.to));
-    if (it->second.type == pg_query_t::LOG ||
-       it->second.type == pg_query_t::FULLLOG) {
-      ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
-      if (con) {
-       MOSDPGLog *mlog = new MOSDPGLog(
-         it->second.from, it->second.to,
-         osdmap->get_epoch(), empty,
-         it->second.epoch_sent);
-       service.share_map_peer(from, con.get(), osdmap);
-       con->send_message(mlog);
-      }
+  dout(10) << " pg " << pgid << " dne" << dendl;
+  pg_info_t empty(spg_t(pgid.pgid, q.query.to));
+  ConnectionRef con = service.get_con_osd_cluster(q.from.osd, osdmap->get_epoch());
+  if (con) {
+    Message *m;
+    if (q.query.type == pg_query_t::LOG ||
+       q.query.type == pg_query_t::FULLLOG) {
+      m = new MOSDPGLog(
+       q.query.from, q.query.to,
+       osdmap->get_epoch(), empty,
+       q.query.epoch_sent);
     } else {
-      notify_list[from].push_back(
+      vector<pair<pg_notify_t,PastIntervals>> ls;
+      ls.push_back(
        make_pair(
          pg_notify_t(
-           it->second.from, it->second.to,
-           it->second.epoch_sent,
+           q.query.from, q.query.to,
+           q.query.epoch_sent,
            osdmap->get_epoch(),
            empty),
          PastIntervals()));
+      m = new MOSDPGNotify(osdmap->get_epoch(), ls);
     }
+    service.share_map_peer(q.from.osd, con.get(), osdmap);
+    con->send_message(m);
   }
-  do_notifies(notify_list, osdmap);
 }
 
 
-void OSD::handle_pg_remove(OpRequestRef op)
-{
-  const MOSDPGRemove *m = static_cast<const MOSDPGRemove *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_REMOVE);
-  assert(osd_lock.is_locked());
-
-  if (!require_osd_peer(op->get_req()))
-    return;
-
-  dout(7) << "handle_pg_remove from " << m->get_source() << " on "
-         << m->pg_list.size() << " pgs" << dendl;
-
-  if (!require_same_or_newer_map(op, m->get_epoch(), false))
-    return;
-
-  op->mark_started();
-
-  for (auto it = m->pg_list.begin();
-       it != m->pg_list.end();
-       ++it) {
-    spg_t pgid = *it;
-    enqueue_peering_evt(
-      pgid,
-      PGPeeringEventRef(
-       new PGPeeringEvent(
-         m->get_epoch(), m->get_epoch(),
-         PG::DeleteStart())));
-  }
-}
-
 // =========================================================
 // RECOVERY
 
@@ -9183,7 +9090,7 @@ void OSD::dequeue_delete(
   dequeue_peering_evt(
     pg,
     PGPeeringEventRef(
-      new PGPeeringEvent(
+      std::make_shared<PGPeeringEvent>(
        e, e,
        PG::DeleteSome())),
     handle);
index 544a7eaecdddf83461e4ec5fa8a509878a9bc322..a9b7ecc5548f332b3bc723e592590a29a9461f48 100644 (file)
@@ -248,6 +248,11 @@ class LogChannel;
 class CephContext;
 class MOSDOp;
 
+class MOSDPGQuery;
+class MOSDPGNotify;
+class MOSDPGInfo;
+class MOSDPGRemove;
+
 class OSD;
 
 class OSDService {
@@ -2029,15 +2034,16 @@ protected:
   bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
                                 bool is_fast_dispatch);
 
-  void handle_pg_query(OpRequestRef op);
-  void handle_pg_notify(OpRequestRef op);
-  void handle_pg_log(OpRequestRef op);
-  void handle_pg_info(OpRequestRef op);
+  void handle_fast_pg_query(MOSDPGQuery *m);
+  void handle_pg_query_nopg(const MQuery& q);
+  void handle_fast_pg_notify(MOSDPGNotify *m);
+  void handle_pg_notify_nopg(const MNotifyRec& q);
+  void handle_fast_pg_info(MOSDPGInfo *m);
+  void handle_fast_pg_remove(MOSDPGRemove *m);
 
   PGRef handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info);
-  void handle_force_recovery(Message *m);
 
-  void handle_pg_remove(OpRequestRef op);
+  void handle_force_recovery(Message *m);
 
   // -- commands --
   struct Command {
@@ -2117,7 +2123,12 @@ private:
     switch (m->get_type()) {
     case CEPH_MSG_OSD_OP:
     case CEPH_MSG_OSD_BACKOFF:
+    case MSG_OSD_PG_QUERY:
+    case MSG_OSD_PG_INFO:
+    case MSG_OSD_PG_NOTIFY:
+    case MSG_OSD_PG_LOG:
     case MSG_OSD_PG_TRIM:
+    case MSG_OSD_PG_REMOVE:
     case MSG_OSD_BACKFILL_RESERVE:
     case MSG_OSD_RECOVERY_RESERVE:
     case MSG_OSD_REPOP:
index daef636424e07298773b765c731d4d4bfdd5968f..a0f634075f580d0f31ccfbd1c90e365938c63479 100644 (file)
@@ -6357,8 +6357,10 @@ void PG::queue_query(epoch_t msg_epoch,
 {
   dout(10) << "handle_query " << q << " from replica " << from << dendl;
   queue_peering_event(
-    PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
-                                        MQuery(from, q, query_epoch))));
+    PGPeeringEventRef(
+      std::make_shared<PGPeeringEvent>(
+       msg_epoch, query_epoch,
+       MQuery(info.pgid, from, q, query_epoch))));
 }
 
 void PG::find_unfound(epoch_t queued, RecoveryCtx *rctx)
index 302e3028f2a1e2bf2716aae4a12f0c13f67e4b66..0248d160c808e7f28308bb7b5cc3884b9d4e7d56 100644 (file)
@@ -88,25 +88,30 @@ struct MLogRec : boost::statechart::event< MLogRec > {
 };
 
 struct MNotifyRec : boost::statechart::event< MNotifyRec > {
+  spg_t pgid;
   pg_shard_t from;
   pg_notify_t notify;
   uint64_t features;
-  MNotifyRec(pg_shard_t from, const pg_notify_t &notify, uint64_t f) :
-    from(from), notify(notify), features(f) {}
+  PastIntervals past_intervals;
+  MNotifyRec(spg_t p, pg_shard_t from, const pg_notify_t &notify, uint64_t f,
+            const PastIntervals& pi)
+    : pgid(p), from(from), notify(notify), features(f), past_intervals(pi) {}
   void print(std::ostream *out) const {
-    *out << "MNotifyRec from " << from << " notify: " << notify
-        << " features: 0x" << hex << features << dec;
+    *out << "MNotifyRec " << pgid << " from " << from << " notify: " << notify
+        << " features: 0x" << hex << features << dec
+        << " " << past_intervals;
   }
 };
 
 struct MQuery : boost::statechart::event< MQuery > {
+  spg_t pgid;
   pg_shard_t from;
   pg_query_t query;
   epoch_t query_epoch;
-  MQuery(pg_shard_t from, const pg_query_t &query, epoch_t query_epoch):
-    from(from), query(query), query_epoch(query_epoch) {}
+  MQuery(spg_t p, pg_shard_t from, const pg_query_t &query, epoch_t query_epoch)
+    : pgid(p), from(from), query(query), query_epoch(query_epoch) {}
   void print(std::ostream *out) const {
-    *out << "MQuery from " << from
+    *out << "MQuery " << pgid << " from " << from
         << " query_epoch " << query_epoch
         << " query: " << query;
   }