osd: move peering events into primary work queue
author Sage Weil <sage@redhat.com>
Sun, 5 Nov 2017 16:53:17 +0000 (10:53 -0600)
committer Sage Weil <sage@redhat.com>
Mon, 4 Dec 2017 18:45:16 +0000 (12:45 -0600)
For the moment we leave the old infrastructure in place too; that will be
cleaned up next.

Two key differences:

- each pg event is processed in isolation, which means MOSDPGNotify etc.
messages won't be batched.  This will mean more messages for small clusters
and no change for large clusters.
- the map processing for PGs will be done to completion by a single event (see
the sketch below).  There is now a wait in handle_osd_map to ensure that all PGs
are processing maps that overlap with the map cache.
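
A rough, self-contained illustration of the resulting flow -- not Ceph code; the
names and numbers are only stand-ins (client ops at the usual priority of 63,
peering events at the new osd_peering_op_priority default of 255).  The idea is
that peering events are wrapped as prioritized items and pushed onto the same
work queue that already serves client ops, and a worker thread pops the
highest-priority item and runs it:

    // toy_queue.cc -- illustrative only, not the OSD implementation
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <string>

    struct QueueItem {
      unsigned priority;            // stand-in for the OpQueueItem priority
      std::string desc;             // e.g. the peering event description
      std::function<void()> run;    // stand-in for PGPeeringItem::run / PGOpItem::run
      bool operator<(const QueueItem &rhs) const { return priority < rhs.priority; }
    };

    int main() {
      // Stand-in for the sharded op work queue (op_shardedwq).
      std::priority_queue<QueueItem> wq;
      wq.push({63,  "osd_op",     [] { std::cout << "handle client op\n"; }});
      wq.push({255, "MNotifyRec", [] { std::cout << "deliver peering event to the PG\n"; }});
      // Worker loop: higher priority pops first, so the peering event (255)
      // is handled before the client op (63).
      while (!wq.empty()) {
        wq.top().run();
        wq.pop();
      }
    }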

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/legacy_config_opts.h
src/common/options.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpQueueItem.cc
src/osd/OpQueueItem.h
src/osd/PG.cc
src/osd/PG.h

index 3476122f431f8064d8433281458ad459a031d888..6a89f82a290e1ce3c831befd0fb6509153bee3e4 100644 (file)
@@ -874,6 +874,7 @@ OPTION(mon_rocksdb_options, OPT_STR)
  */
 OPTION(osd_client_op_priority, OPT_U32)
 OPTION(osd_recovery_op_priority, OPT_U32)
+OPTION(osd_peering_op_priority, OPT_U32)
 
 OPTION(osd_snap_trim_priority, OPT_U32)
 OPTION(osd_snap_trim_cost, OPT_U32) // set default cost equal to 1MB io
index 0064af6652312f65d52f74251d0f6854940729e2..1db1763e3ae175739f58152018e17640f6d21290 100644 (file)
@@ -3000,6 +3000,10 @@ std::vector<Option> get_global_options() {
     .set_default(3)
     .set_description(""),
 
+    Option("osd_peering_op_priority", Option::TYPE_UINT, Option::LEVEL_DEV)
+    .set_default(255)
+    .set_description(""),
+
     Option("osd_snap_trim_priority", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(5)
     .set_description(""),
index fcdaacb2ec2ab79b693ba7f46473ec1f638bd336..d3c737247999bedab5756e289af3f29d63a7a280 100644 (file)
@@ -9170,6 +9170,32 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
       epoch));
 }
 
+void OSD::enqueue_peering_evt(PG *pg, PGPeeringEventRef evt)
+{
+  dout(15) << __func__ << " " << pg->get_pgid() << " " << evt->get_desc() << dendl;
+  op_shardedwq.queue(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pg->get_pgid(), evt)),
+      10,
+      cct->_conf->osd_peering_op_priority,
+      utime_t(),
+      0,
+      evt->get_epoch_sent()));
+}
+
+void OSD::enqueue_peering_evt_front(PG *pg, PGPeeringEventRef evt)
+{
+  dout(15) << __func__ << " " << pg->get_pgid() << " " << evt->get_desc() << dendl;
+  op_shardedwq.queue_front(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pg->get_pgid(), evt)),
+      10,
+      cct->_conf->osd_peering_op_priority,
+      utime_t(),
+      0,
+      evt->get_epoch_sent()));
+}
+
 /*
  * NOTE: dequeue called in worker thread, with pg lock
  */
@@ -9289,6 +9315,39 @@ void OSD::process_peering_events(
   service.send_pg_temp();
 }
 
+void OSD::dequeue_peering_evt(
+  PG *pg,
+  PGPeeringEventRef evt,
+  ThreadPool::TPHandle& handle)
+{
+  if (pg->is_deleting()) {
+    pg->unlock();
+    return;
+  }
+  auto curmap = service.get_osdmap();
+  PG::RecoveryCtx rctx = create_context();
+  set<PGRef> split_pgs;
+  if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
+    advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs, true);
+  }
+  pg->do_peering_event(evt, &rctx);
+  auto need_up_thru = pg->get_need_up_thru();
+  auto same_interval_since = pg->get_same_interval_since();
+  if (!split_pgs.empty()) {
+    rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
+    split_pgs.clear();
+  }
+  dispatch_context_transaction(rctx, pg, &handle);
+  pg->unlock();
+
+  if (need_up_thru) {
+    queue_want_up_thru(same_interval_since);
+  }
+  dispatch_context(rctx, 0, curmap, &handle);
+
+  service.send_pg_temp();
+}
+
 // --------------------------------
 
 const char** OSD::get_tracked_conf_keys() const
index 52e27c622358224fc983a8f097226cdab85282d0..4f45b47a69f3326f2ca81918740374192f24a5f2 100644 (file)
@@ -1593,6 +1593,7 @@ private:
    * and already requeued the items.
    */
   friend class PGOpItem;
+  friend class PGPeeringItem;
   friend class PGRecovery;
 
   class ShardedOpWQ
@@ -1799,6 +1800,17 @@ private:
     PGRef pg, OpRequestRef op,
     ThreadPool::TPHandle &handle);
 
+  void enqueue_peering_evt(
+    PG *pg,
+    PGPeeringEventRef ref);
+  void enqueue_peering_evt_front(
+    PG *pg,
+    PGPeeringEventRef ref);
+  void dequeue_peering_evt(
+    PG *pg,
+    PGPeeringEventRef ref,
+    ThreadPool::TPHandle& handle);
+
   // -- peering queue --
   struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
     list<PG*> peering_queue;
index a6e5e70809ca1435d92cc6bead69f37a10064263..fc7011c13284d5e9738da34fed19aa2f35b5b989 100644 (file)
@@ -23,6 +23,14 @@ void PGOpItem::run(OSD *osd,
   pg->unlock();
 }
 
+void PGPeeringItem::run(
+  OSD *osd,
+  PGRef& pg,
+  ThreadPool::TPHandle &handle)
+{
+  osd->dequeue_peering_evt(pg.get(), evt, handle);
+}
+
 void PGSnapTrim::run(OSD *osd,
                    PGRef& pg,
                    ThreadPool::TPHandle &handle)
index 43a544da5e849ca0ba11f002e0872b280a55b9a5..b34414b1211552736afce745a6439b0e939e59bc 100644 (file)
@@ -23,7 +23,7 @@
 #include "osd/PG.h"
 #include "common/mClockCommon.h"
 #include "messages/MOSDOp.h"
-
+#include "PGPeeringEvent.h"
 
 class OSD;
 
@@ -41,6 +41,7 @@ public:
   public:
     enum class op_type_t {
       client_op,
+      peering_event,
       bg_snaptrim,
       bg_recovery,
       bg_scrub
@@ -198,6 +199,19 @@ public:
   void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
 };
 
+class PGPeeringItem : public PGOpQueueable {
+  PGPeeringEventRef evt;
+public:
+  PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+  op_type_t get_op_type() const override final {
+    return op_type_t::peering_event;
+  }
+  ostream &print(ostream &rhs) const override final {
+    return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+  }
+  void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
+
 class PGSnapTrim : public PGOpQueueable {
   epoch_t epoch_queued;
 public:
index 8d8e3caaed381b4f085b31c47149a49e4cb4daf8..7042d94bb68821fba29f4cb15a850954e90bc273 100644 (file)
@@ -5787,11 +5787,11 @@ void PG::take_waiters()
 {
   dout(10) << "take_waiters" << dendl;
   requeue_map_waiters();
-  for (list<PGPeeringEventRef>::iterator i = peering_waiters.begin();
-       i != peering_waiters.end();
-       ++i) osd->queue_for_peering(this);
-  peering_queue.splice(peering_queue.begin(), peering_waiters,
-                      peering_waiters.begin(), peering_waiters.end());
+  for (auto i = peering_waiters.rbegin();
+       i != peering_waiters.rend();
+       ++i) {
+    osd->osd->enqueue_peering_evt_front(this, *i);
+  }
 }
 
 void PG::process_peering_event(RecoveryCtx *rctx)
@@ -5799,7 +5799,11 @@ void PG::process_peering_event(RecoveryCtx *rctx)
   assert(!peering_queue.empty());
   PGPeeringEventRef evt = peering_queue.front();
   peering_queue.pop_front();
+  do_peering_event(evt, rctx);
+}
 
+void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
+{
   dout(10) << __func__ << ": " << evt->get_desc() << dendl;
   if (!have_same_or_newer_map(evt->get_epoch_sent())) {
     dout(10) << "deferring event " << evt->get_desc() << dendl;
@@ -5816,8 +5820,7 @@ void PG::queue_peering_event(PGPeeringEventRef evt)
 {
   if (old_peering_evt(evt))
     return;
-  peering_queue.push_back(evt);
-  osd->queue_for_peering(this);
+  osd->osd->enqueue_peering_evt(this, evt);
 }
 
 void PG::queue_null(epoch_t msg_epoch,
index edbdb15d1aad6d32887acc24bf8b2c23b0b82b55..874a0863839d19f0ce162d9353f7b0e4d55dba8f 100644 (file)
@@ -408,6 +408,7 @@ public:
 
   void queue_peering_event(PGPeeringEventRef evt);
   void process_peering_event(RecoveryCtx *rctx);
+  void do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx);
   void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
                   pg_shard_t from, const pg_query_t& q);
   void queue_null(epoch_t msg_epoch, epoch_t query_epoch);