osd: remove old peering work queue
author     Sage Weil <sage@redhat.com>
           Sun, 5 Nov 2017 17:08:40 +0000 (11:08 -0600)
committer  Sage Weil <sage@redhat.com>
           Mon, 4 Dec 2017 18:45:16 +0000 (12:45 -0600)
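
Peering events are now delivered through the sharded op work queue
(op_shardedwq) and handled one at a time by OSD::dequeue_peering_evt(),
so the dedicated peering machinery is unused.  Remove the peering_tp
thread pool, the OSD::PeeringWQ batch work queue and its
OSD::process_peering_events() batch handler, PG::process_peering_event()
and the per-PG peering_queue, and the now-dead osd_peering_wq_threads
and osd_peering_wq_batch_size options.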
Signed-off-by: Sage Weil <sage@redhat.com>
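
For reference, a minimal self-contained sketch of the batch-dequeue
pattern the removed PeeringWQ implemented (illustrative standalone C++,
not Ceph code; the BatchQueue/Item names are hypothetical): workers pull
up to a batch-size cap of items that no other worker currently holds,
process them, and release them when done.

    #include <cassert>
    #include <iostream>
    #include <list>
    #include <set>

    struct Item { int id; };

    // Analogue of ThreadPool::BatchWorkQueue<PG>: a pending list plus an
    // in_use set so two workers never process the same item concurrently.
    class BatchQueue {
      std::list<Item*> queue;     // like PeeringWQ::peering_queue
      std::set<Item*> in_use;     // like PeeringWQ::in_use
      const size_t batch_size;    // like osd_peering_wq_batch_size
    public:
      explicit BatchQueue(size_t bs) : batch_size(bs) {}

      void enqueue(Item *it) { queue.push_back(it); }  // like _enqueue()

      // Like PeeringWQ::_dequeue(list<PG*>*): take up to batch_size
      // items, skipping any that another worker already holds.
      void dequeue_batch(std::list<Item*> *out) {
        for (auto i = queue.begin();
             i != queue.end() && out->size() < batch_size; ) {
          if (in_use.count(*i)) {
            ++i;
          } else {
            out->push_back(*i);
            i = queue.erase(i);
          }
        }
        in_use.insert(out->begin(), out->end());
      }

      // Like PeeringWQ::_process_finish(): release the batch afterwards.
      void finish_batch(const std::list<Item*> &batch) {
        for (Item *it : batch)
          in_use.erase(it);
      }
    };

    int main() {
      Item a{1}, b{2}, c{3};
      BatchQueue q(2);                 // batch size of 2
      q.enqueue(&a); q.enqueue(&b); q.enqueue(&c);

      std::list<Item*> batch;
      q.dequeue_batch(&batch);
      assert(batch.size() == 2);       // the cap limits each batch
      for (Item *it : batch)
        std::cout << "processing item " << it->id << "\n";
      q.finish_batch(batch);           // items may be re-queued later
      return 0;
    }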
src/common/legacy_config_opts.h
src/common/options.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PrimaryLogPG.cc

diff --git a/src/common/legacy_config_opts.h b/src/common/legacy_config_opts.h
index 6a89f82a290e1ce3c831befd0fb6509153bee3e4..22d62ab4705a6b5d5a232410d148fd1296e6084c 100644 (file)
@@ -631,8 +631,6 @@ OPTION(osd_inject_failure_on_pg_removal, OPT_BOOL)
 OPTION(osd_max_markdown_period , OPT_INT)
 OPTION(osd_max_markdown_count, OPT_INT)
 
-OPTION(osd_peering_wq_threads, OPT_INT)
-OPTION(osd_peering_wq_batch_size, OPT_U64)
 OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64)
 OPTION(osd_op_pq_min_cost, OPT_U64)
 OPTION(osd_disk_threads, OPT_INT)
diff --git a/src/common/options.cc b/src/common/options.cc
index 1db1763e3ae175739f58152018e17640f6d21290..bc8ca9c42f743101f1c0808233e3da60ab01146b 100644 (file)
@@ -1931,14 +1931,6 @@ std::vector<Option> get_global_options() {
     .set_default(5)
     .set_description(""),
 
-    Option("osd_peering_wq_threads", Option::TYPE_INT, Option::LEVEL_ADVANCED)
-    .set_default(2)
-    .set_description(""),
-
-    Option("osd_peering_wq_batch_size", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
-    .set_default(20)
-    .set_description(""),
-
     Option("osd_op_pq_max_tokens_per_priority", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
     .set_default(4194304)
     .set_description(""),
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 8cc099cc7ecbf088d4fc6afda508d4146ca1afec..edd8edfd958cb5b81b4ad2d73294d19dd34f7869 100644 (file)
@@ -213,7 +213,6 @@ OSDService::OSDService(OSD *osd) :
   logger(osd->logger),
   recoverystate_perf(osd->recoverystate_perf),
   monc(osd->monc),
-  peering_wq(osd->peering_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
                  &osd->disk_tp),
   class_handler(osd->class_handler),
@@ -1683,11 +1682,6 @@ void OSDService::enqueue_front(OpQueueItem&& qi)
   osd->op_shardedwq.queue_front(std::move(qi));
 }
 
-void OSDService::queue_for_peering(PG *pg)
-{
-  peering_wq.queue(pg);
-}
-
 void OSDService::queue_for_snap_trim(PG *pg)
 {
   dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
@@ -1953,9 +1947,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   trace_endpoint("0.0.0.0", 0, "osd"),
   asok_hook(NULL),
   osd_compat(get_osd_compat_set()),
-  peering_tp(cct, "OSD::peering_tp", "tp_peering",
-            cct->_conf->osd_peering_wq_threads,
-            "osd_peering_tp_threads"),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
            get_num_op_threads()),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
@@ -1983,11 +1974,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_op_thread_timeout,
     cct->_conf->osd_op_thread_suicide_timeout,
     &osd_op_tp),
-  peering_wq(
-    this,
-    cct->_conf->osd_op_thread_timeout,
-    cct->_conf->osd_op_thread_suicide_timeout,
-    &peering_tp),
   map_lock("OSD::map_lock"),
   pg_map_lock("OSD::pg_map_lock"),
   last_pg_create_epoch(0),
@@ -2602,7 +2588,6 @@ int OSD::init()
   monc->set_log_client(&log_client);
   update_log_config();
 
-  peering_tp.start();
   osd_op_tp.start();
   disk_tp.start();
   command_tp.start();
@@ -2668,7 +2653,6 @@ int OSD::init()
 
   dout(10) << "ensuring pgs have consumed prior maps" << dendl;
   consume_map();
-  peering_wq.drain();
 
   dout(0) << "done with init, starting boot process" << dendl;
 
@@ -3331,11 +3315,6 @@ int OSD::shutdown()
   heartbeat_lock.Unlock();
   heartbeat_thread.join();
 
-  peering_tp.drain();
-  peering_wq.clear();
-  peering_tp.stop();
-  dout(10) << "osd tp stopped" << dendl;
-
   osd_op_tp.drain();
   osd_op_tp.stop();
   dout(10) << "op sharded tp stopped" << dendl;
@@ -3437,8 +3416,6 @@ int OSD::shutdown()
   hb_front_server_messenger->shutdown();
   hb_back_server_messenger->shutdown();
 
-  peering_wq.clear();
-
   return r;
 }
 
@@ -9270,51 +9247,6 @@ struct C_CompleteSplits : public Context {
   }
 };
 
-void OSD::process_peering_events(
-  const list<PG*> &pgs,
-  ThreadPool::TPHandle &handle
-  )
-{
-  bool need_up_thru = false;
-  epoch_t same_interval_since = 0;
-  OSDMapRef curmap;
-  PG::RecoveryCtx rctx = create_context();
-  rctx.handle = &handle;
-  for (list<PG*>::const_iterator i = pgs.begin();
-       i != pgs.end();
-       ++i) {
-    set<PGRef> split_pgs;
-    PG *pg = *i;
-    pg->lock_suspend_timeout(handle);
-    curmap = service.get_osdmap();
-    if (pg->is_deleting()) {
-      pg->unlock();
-      continue;
-    }
-    if (!advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs, false)) {
-      // we need to requeue the PG explicitly since we didn't actually
-      // handle an event
-      peering_wq.queue(pg);
-    } else {
-      pg->process_peering_event(&rctx);
-    }
-    need_up_thru = pg->get_need_up_thru() || need_up_thru;
-    same_interval_since = MAX(pg->get_same_interval_since(),
-                             same_interval_since);
-    if (!split_pgs.empty()) {
-      rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
-      split_pgs.clear();
-    }
-    dispatch_context_transaction(rctx, pg, &handle);
-    pg->unlock();
-  }
-  if (need_up_thru)
-    queue_want_up_thru(same_interval_since);
-  dispatch_context(rctx, 0, curmap, &handle);
-
-  service.send_pg_temp();
-}
-
 void OSD::dequeue_peering_evt(
   PG *pg,
   PGPeeringEventRef evt,
@@ -9730,21 +9662,6 @@ int OSD::init_op_flags(OpRequestRef& op)
   return 0;
 }
 
-void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
-  for (list<PG*>::iterator i = peering_queue.begin();
-      i != peering_queue.end() &&
-      out->size() < osd->cct->_conf->osd_peering_wq_batch_size;
-      ) {
-        if (in_use.count(*i)) {
-          ++i;
-        } else {
-          out->push_back(*i);
-          peering_queue.erase(i++);
-        }
-  }
-  in_use.insert(out->begin(), out->end());
-}
-
 
 // =============================================================
 
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 4f45b47a69f3326f2ca81918740374192f24a5f2..d0230ca4bb5c95bd8335872d20079fd25f47a755 100644 (file)
@@ -364,7 +364,6 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ThreadPool::BatchWorkQueue<PG> &peering_wq;
   GenContextWQ recovery_gen_wq;
   ClassHandler  *&class_handler;
 
@@ -850,8 +849,6 @@ public:
 
   void send_pg_created(pg_t pgid);
 
-  void queue_for_peering(PG *pg);
-
   Mutex snap_sleep_lock;
   SafeTimer snap_sleep_timer;
 
@@ -1344,7 +1341,6 @@ public:
 
 private:
 
-  ThreadPool peering_tp;
   ShardedThreadPool osd_op_tp;
   ThreadPool disk_tp;
   ThreadPool command_tp;
@@ -1811,63 +1807,6 @@ private:
     PGPeeringEventRef ref,
     ThreadPool::TPHandle& handle);
 
-  // -- peering queue --
-  struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
-    list<PG*> peering_queue;
-    OSD *osd;
-    set<PG*> in_use;
-    PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::BatchWorkQueue<PG>(
-       "OSD::PeeringWQ", ti, si, tp), osd(o) {}
-
-    void _dequeue(PG *pg) override {
-      for (list<PG*>::iterator i = peering_queue.begin();
-          i != peering_queue.end();
-          ) {
-       if (*i == pg) {
-         peering_queue.erase(i++);
-         pg->put("PeeringWQ");
-       } else {
-         ++i;
-       }
-      }
-    }
-    bool _enqueue(PG *pg) override {
-      pg->get("PeeringWQ");
-      peering_queue.push_back(pg);
-      return true;
-    }
-    bool _empty() override {
-      return peering_queue.empty();
-    }
-    void _dequeue(list<PG*> *out) override;
-    void _process(
-      const list<PG *> &pgs,
-      ThreadPool::TPHandle &handle) override {
-      assert(!pgs.empty());
-      osd->process_peering_events(pgs, handle);
-      for (list<PG *>::const_iterator i = pgs.begin();
-          i != pgs.end();
-          ++i) {
-       (*i)->put("PeeringWQ");
-      }
-    }
-    void _process_finish(const list<PG *> &pgs) override {
-      for (list<PG*>::const_iterator i = pgs.begin();
-          i != pgs.end();
-          ++i) {
-       in_use.erase(*i);
-      }
-    }
-    void _clear() override {
-      assert(peering_queue.empty());
-    }
-  } peering_wq;
-
-  void process_peering_events(
-    const list<PG*> &pg,
-    ThreadPool::TPHandle &handle);
-
   friend class PG;
   friend class PrimaryLogPG;
 
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 7042d94bb68821fba29f4cb15a850954e90bc273..f979944fa0d1ec265b773e56f115b0aed66eca58 100644 (file)
@@ -5794,14 +5794,6 @@ void PG::take_waiters()
   }
 }
 
-void PG::process_peering_event(RecoveryCtx *rctx)
-{
-  assert(!peering_queue.empty());
-  PGPeeringEventRef evt = peering_queue.front();
-  peering_queue.pop_front();
-  do_peering_event(evt, rctx);
-}
-
 void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
 {
   dout(10) << __func__ << ": " << evt->get_desc() << dendl;
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 874a0863839d19f0ce162d9353f7b0e4d55dba8f..e44537250bec95ecfab3d862cebc347fbdc86a2e 100644 (file)
@@ -407,7 +407,6 @@ public:
   bool set_force_backfill(bool b);
 
   void queue_peering_event(PGPeeringEventRef evt);
-  void process_peering_event(RecoveryCtx *rctx);
   void do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rcx);
   void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
                   pg_shard_t from, const pg_query_t& q);
@@ -1671,7 +1670,6 @@ protected:
   };
 
 
-  list<PGPeeringEventRef> peering_queue;  // op queue
   list<PGPeeringEventRef> peering_waiters;
 
   struct QueryState : boost::statechart::event< QueryState > {
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 60247fdc0b7202f659d93fbd68c87ce304bb9de8..bbd7535cb9d3660f2b543ab37ea89aab3614039e 100644 (file)
@@ -10751,9 +10751,6 @@ void PrimaryLogPG::on_shutdown()
 {
   dout(10) << __func__ << dendl;
 
-  // remove from queues
-  osd->peering_wq.dequeue(this);
-
   // handles queue races
   deleting = true;