git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: move recovery into opwq
author Samuel Just <sjust@redhat.com>
Fri, 12 Dec 2014 22:06:27 +0000 (14:06 -0800)
committer Samuel Just <sjust@redhat.com>
Mon, 16 May 2016 23:40:55 +0000 (16:40 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 67d729ce3370620c0692bc7378b1ac00a6ffb336..2da949a2edd8437e8aa57d0d009bcc58622312f9 100644 (file)
@@ -865,6 +865,10 @@ OPTION(osd_scrub_priority, OPT_U32, 5)
 // set default cost equal to 50MB io
 OPTION(osd_scrub_cost, OPT_U32, 50<<20) 
 
+OPTION(osd_recovery_priority, OPT_U32, 5)
+// set default cost equal to 20MB io
+OPTION(osd_recovery_cost, OPT_U32, 20<<20)
+
 /**
  * osd_recovery_op_warn_multiple scales the normal warning threshhold,
  * osd_op_complaint_time, so that slow recovery ops won't cause noise
index 973f3eb2cd9013e8b67ca456c3ef9bb16f57c56e..c57a776e8daa69b25a2d8e15648e40de0a456642 100644 (file)
@@ -171,6 +171,11 @@ void PGQueueable::RunVis::operator()(const PGScrub &op) {
   return pg->scrub(op.epoch_queued, handle);
 }
 
+void PGQueueable::RunVis::operator()(const PGRecovery &op) {
+  /// TODO: need to handle paused recovery
+  return osd->do_recovery(pg.get(), op.epoch_queued, handle);
+}
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -215,9 +220,8 @@ OSDService::OSDService(OSD *osd) :
   monc(osd->monc),
   op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
-  recovery_wq(osd->recovery_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
-                 &osd->recovery_tp),
+                 &osd->disk_tp),
   op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
   class_handler(osd->class_handler),
   pg_epoch_lock("OSDService::pg_epoch_lock"),
@@ -1356,17 +1360,6 @@ OSDMapRef OSDService::try_get_map(epoch_t epoch)
   return _add_map(map);
 }
 
-bool OSDService::queue_for_recovery(PG *pg)
-{
-  bool b = recovery_wq.queue(pg);
-  if (b)
-    dout(10) << "queue_for_recovery queued " << *pg << dendl;
-  else
-    dout(10) << "queue_for_recovery already queued " << *pg << dendl;
-  return b;
-}
-
-
 // ops
 
 
@@ -1659,7 +1652,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   osd_tp(cct, "OSD::osd_tp", "tp_osd", cct->_conf->osd_op_threads, "osd_op_threads"),
   osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
     cct->_conf->osd_op_num_threads_per_shard * cct->_conf->osd_op_num_shards),
-  recovery_tp(cct, "OSD::recovery_tp", "tp_osd_recov", cct->_conf->osd_recovery_threads, "osd_recovery_threads"),
   disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", "tp_osd_cmd",  1),
   paused_recovery(false),
@@ -1705,12 +1697,8 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_command_thread_timeout,
     cct->_conf->osd_command_thread_suicide_timeout,
     &command_tp),
+  recovery_lock("OSD::recovery_lock"),
   recovery_ops_active(0),
-  recovery_wq(
-    this,
-    cct->_conf->osd_recovery_thread_timeout,
-    cct->_conf->osd_recovery_thread_suicide_timeout,
-    &recovery_tp),
   replay_queue_lock("OSD::replay_queue_lock"),
   remove_wq(
     store,
@@ -2194,7 +2182,6 @@ int OSD::init()
 
   osd_tp.start();
   osd_op_tp.start();
-  recovery_tp.start();
   disk_tp.start();
   command_tp.start();
 
@@ -2684,10 +2671,6 @@ int OSD::shutdown()
   heartbeat_lock.Unlock();
   heartbeat_thread.join();
 
-  recovery_tp.drain();
-  recovery_tp.stop();
-  dout(10) << "recovery tp stopped" << dendl;
-
   osd_tp.drain();
   peering_wq.clear();
   osd_tp.stop();
@@ -4340,8 +4323,9 @@ void OSD::tick()
   }
 
   if (is_active()) {
-    // periodically kick recovery work queue
-    recovery_tp.wake();
+    if (!scrub_random_backoff()) {
+      sched_scrub();
+    }
 
     check_replay_queue();
 
@@ -5768,7 +5752,7 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
        << "to " << cct->_conf->osd_recovery_delay_start;
     defer_recovery_until = ceph_clock_now(cct);
     defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-    recovery_wq.wake();
+    /// TODO
   }
 
   else if (prefix == "cpu_profiler") {
@@ -7374,13 +7358,13 @@ void OSD::activate_map()
     if (!paused_recovery) {
       dout(1) << "pausing recovery (NORECOVER flag set)" << dendl;
       paused_recovery = true;
-      recovery_tp.pause_new();
+      /// TODO
     }
   } else {
     if (paused_recovery) {
       dout(1) << "resuming recovery (NORECOVER flag cleared)" << dendl;
+      /// TODO
       paused_recovery = false;
-      recovery_tp.unpause();
     }
   }
 
@@ -8375,7 +8359,7 @@ bool OSD::_recover_now()
   return true;
 }
 
-void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
+void OSD::do_recovery(PG *pg, epoch_t queued, ThreadPool::TPHandle &handle)
 {
   if (g_conf->osd_recovery_sleep > 0) {
     handle.suspend_tp_timeout();
@@ -8387,7 +8371,7 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
   }
 
   // see how many we should try to start.  note that this is a bit racy.
-  recovery_wq.lock();
+  recovery_lock.Lock();
   int max = MIN(cct->_conf->osd_recovery_max_active - recovery_ops_active,
       cct->_conf->osd_recovery_max_single_start);
   if (max > 0) {
@@ -8398,19 +8382,23 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
     dout(10) << "do_recovery can start 0 (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active
             << " rops)" << dendl;
   }
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 
   if (max <= 0) {
     dout(10) << "do_recovery raced and failed to start anything; requeuing " << *pg << dendl;
-    recovery_wq.queue(pg);
+    service.queue_for_recovery(pg, true);
     return;
   } else {
     pg->lock_suspend_timeout(handle);
-    if (pg->deleting || !(pg->is_peered() && pg->is_primary())) {
+    if (pg->pg_has_reset_since(queued) ||
+       pg->deleting || !(pg->is_peered() && pg->is_primary())) {
       pg->unlock();
       goto out;
     }
 
+    assert(pg->recovery_queued);
+    pg->recovery_queued = false;
+    
     dout(10) << "do_recovery starting " << max << " " << *pg << dendl;
 #ifdef DEBUG_RECOVERY_OIDS
     dout(20) << "  active was " << recovery_oids[pg->info.pgid] << dendl;
@@ -8421,7 +8409,6 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
     dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
     // If no recovery op is started, don't bother to manipulate the RecoveryCtx
     if (!started && (more || !pg->have_unfound())) {
-      pg->unlock();
       goto out;
     }
 
@@ -8438,9 +8425,9 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
       pg->discover_all_missing(*rctx.query_map);
       if (rctx.query_map->empty()) {
        dout(10) << "do_recovery  no luck, giving up on this pg for now" << dendl;
-       recovery_wq.lock();
-       recovery_wq._dequeue(pg);
-       recovery_wq.unlock();
+      } else {
+       dout(10) << "do_recovery  no luck, giving up on this pg for now" << dendl;
+       pg->queue_recovery();
       }
     }
 
@@ -8451,18 +8438,17 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
   }
 
  out:
-  recovery_wq.lock();
+  recovery_lock.Lock();
   if (max > 0) {
     assert(recovery_ops_active >= max);
     recovery_ops_active -= max;
   }
-  recovery_wq._wake();
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
 {
-  recovery_wq.lock();
+  recovery_lock.Lock();
   dout(10) << "start_recovery_op " << *pg << " " << soid
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
           << dendl;
@@ -8475,12 +8461,12 @@ void OSD::start_recovery_op(PG *pg, const hobject_t& soid)
   recovery_oids[pg->info.pgid].insert(soid);
 #endif
 
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 {
-  recovery_wq.lock();
+  recovery_lock.Lock();
   dout(10) << "finish_recovery_op " << *pg << " " << soid
           << " dequeue=" << dequeue
           << " (" << recovery_ops_active << "/" << cct->_conf->osd_recovery_max_active << " rops)"
@@ -8496,14 +8482,7 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
   recovery_oids[pg->info.pgid].erase(soid);
 #endif
 
-  if (dequeue)
-    recovery_wq._dequeue(pg);
-  else {
-    recovery_wq._queue_front(pg);
-  }
-
-  recovery_wq._wake();
-  recovery_wq.unlock();
+  recovery_lock.Unlock();
 }
 
 // =========================================================
@@ -9338,20 +9317,6 @@ int OSD::init_op_flags(OpRequestRef& op)
   return 0;
 }
 
-bool OSD::RecoveryWQ::_enqueue(PG *pg) {
-  if (!pg->recovery_item.is_on_list()) {
-    pg->get("RecoveryWQ");
-    osd->recovery_queue.push_back(&pg->recovery_item);
-
-    if (osd->cct->_conf->osd_recovery_delay_start > 0) {
-      osd->defer_recovery_until = ceph_clock_now(osd->cct);
-      osd->defer_recovery_until += osd->cct->_conf->osd_recovery_delay_start;
-    }
-    return true;
-  }
-  return false;
-}
-
 void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
   set<PG*> got;
   for (list<PG*>::iterator i = peering_queue.begin();
index fd40d3b1867b2a3fbe8f3f9d713316b27cbb36aa..87dc5522ea6a8db828c084435d53ff57ab4533a1 100644 (file)
@@ -345,11 +345,21 @@ struct PGSnapTrim {
   }
 };
 
+struct PGRecovery {
+  epoch_t epoch_queued;
+  PGRecovery(epoch_t e) : epoch_queued(e) {}
+  ostream &operator<<(ostream &rhs) {
+    return rhs << "PGRecovery";
+  }
+};
+
+
 class PGQueueable {
   typedef boost::variant<
     OpRequestRef,
     PGSnapTrim,
-    PGScrub
+    PGScrub,
+    PGRecovery
     > QVariant;
   QVariant qvariant;
   int cost; 
@@ -365,6 +375,7 @@ class PGQueueable {
     void operator()(const OpRequestRef &op);
     void operator()(const PGSnapTrim &op);
     void operator()(const PGScrub &op);
+    void operator()(const PGRecovery &op);
   };
 public:
   // cppcheck-suppress noExplicitConstructor
@@ -384,9 +395,14 @@ public:
     const entity_inst_t &owner)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
       owner(owner) {}
-  boost::optional<OpRequestRef> maybe_get_op() {
-    OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
-    return op ? *op : boost::optional<OpRequestRef>();
+  PGQueueable(
+    const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
+    const entity_inst_t &owner)
+    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+      owner(owner) {}
+  const boost::optional<OpRequestRef> maybe_get_op() const {
+    const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+    return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
   }
   void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
     RunVis v(osd, pg, handle);
@@ -419,7 +435,6 @@ public:
   MonClient   *&monc;
   ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
-  ThreadPool::WorkQueue<PG> &recovery_wq;
   GenContextWQ recovery_gen_wq;
   GenContextWQ op_gen_wq;
   ClassHandler  *&class_handler;
@@ -868,7 +883,21 @@ public:
   void send_pg_temp();
 
   void queue_for_peering(PG *pg);
-  bool queue_for_recovery(PG *pg);
+  void queue_for_recovery(PG *pg, bool front = false) {
+    pair<PGRef, PGQueueable> to_queue = make_pair(
+      pg,
+      PGQueueable(
+       PGRecovery(pg->get_osdmap()->get_epoch()),
+       cct->_conf->osd_recovery_cost,
+       cct->_conf->osd_recovery_priority,
+       ceph_clock_now(cct),
+       entity_inst_t()));
+    if (front) {
+      op_wq.queue_front(to_queue);
+    } else {
+      op_wq.queue(to_queue);
+    }
+  }
   void queue_for_snap_trim(PG *pg) {
     op_wq.queue(
       make_pair(
@@ -1278,7 +1307,6 @@ private:
 
   ThreadPool osd_tp;
   ShardedThreadPool osd_op_tp;
-  ThreadPool recovery_tp;
   ThreadPool disk_tp;
   ThreadPool command_tp;
 
@@ -2219,59 +2247,16 @@ protected:
   void do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data);
 
   // -- pg recovery --
-  xlist<PG*> recovery_queue;
+  Mutex recovery_lock;
   utime_t defer_recovery_until;
   int recovery_ops_active;
 #ifdef DEBUG_RECOVERY_OIDS
   map<spg_t, set<hobject_t, hobject_t::BitwiseComparator> > recovery_oids;
 #endif
 
-  struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
-    OSD *osd;
-    RecoveryWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() {
-      return osd->recovery_queue.empty();
-    }
-    bool _enqueue(PG *pg);
-    void _dequeue(PG *pg) {
-      if (pg->recovery_item.remove_myself())
-       pg->put("RecoveryWQ");
-    }
-    PG *_dequeue() {
-      if (osd->recovery_queue.empty())
-       return NULL;
-      
-      if (!osd->_recover_now())
-       return NULL;
-
-      PG *pg = osd->recovery_queue.front();
-      osd->recovery_queue.pop_front();
-      return pg;
-    }
-    void _queue_front(PG *pg) {
-      if (!pg->recovery_item.is_on_list()) {
-       pg->get("RecoveryWQ");
-       osd->recovery_queue.push_front(&pg->recovery_item);
-      }
-    }
-    void _process(PG *pg, ThreadPool::TPHandle &handle) override {
-      osd->do_recovery(pg, handle);
-      pg->put("RecoveryWQ");
-    }
-    void _clear() {
-      while (!osd->recovery_queue.empty()) {
-       PG *pg = osd->recovery_queue.front();
-       osd->recovery_queue.pop_front();
-       pg->put("RecoveryWQ");
-      }
-    }
-  } recovery_wq;
-
   void start_recovery_op(PG *pg, const hobject_t& soid);
   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
-  void do_recovery(PG *pg, ThreadPool::TPHandle &handle);
+  void do_recovery(PG *pg, epoch_t epoch_queued, ThreadPool::TPHandle &handle);
   bool _recover_now();
 
   // replay / delayed pg activation
index fa6b8375bba6a482bfa45606657438d4caaa993c..a8c78164d6c2f02121763b1462f7be3183b5ac27 100644 (file)
@@ -210,9 +210,10 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   coll(p), pg_log(cct),
   pgmeta_oid(p.make_pgmeta_oid()),
   missing_loc(this),
-  recovery_item(this), stat_queue_item(this),
+  stat_queue_item(this),
   snap_trim_queued(false),
   scrub_queued(false),
+  recovery_queued(false),
   recovery_ops_active(0),
   role(-1),
   state(0),
@@ -949,8 +950,6 @@ void PG::clear_primary_state()
   scrubber.reserved_peers.clear();
   scrub_after_recovery = false;
 
-  osd->recovery_wq.dequeue(this);
-
   agent_clear();
 }
 
@@ -1847,8 +1846,6 @@ void PG::activate(ObjectStore::Transaction& t,
       build_might_have_unfound();
 
       state_set(PG_STATE_DEGRADED);
-      dout(10) << "activate - starting recovery" << dendl;
-      osd->queue_for_recovery(this);
       if (have_unfound())
        discover_all_missing(query_map);
     }
@@ -2069,6 +2066,20 @@ bool PG::requeue_scrub()
   }
 }
 
+void PG::queue_recovery(bool front)
+{
+  if (!is_primary() || !is_peered()) {
+    dout(10) << "queue_recovery -- not primary or not peered " << dendl;
+    assert(!recovery_queued);
+  } else if (recovery_queued) {
+    dout(10) << "queue_recovery -- already queued" << dendl;
+  } else {
+    dout(10) << "queue_recovery -- queuing" << dendl;
+    recovery_queued = true;
+    osd->queue_for_recovery(this, front);
+  }
+}
+
 bool PG::queue_scrub()
 {
   assert(_lock.is_locked());
@@ -2229,6 +2240,9 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
 #endif
   // TODOSAM: osd->osd-> not good
   osd->osd->finish_recovery_op(this, soid, dequeue);
+
+  if (!dequeue) {
+  }
 }
 
 static void split_replay_queue(
@@ -6034,7 +6048,7 @@ PG::RecoveryState::Backfilling::Backfilling(my_context ctx)
   context< RecoveryMachine >().log_enter(state_name);
   PG *pg = context< RecoveryMachine >().pg;
   pg->backfill_reserved = true;
-  pg->osd->queue_for_recovery(pg);
+  pg->queue_recovery();
   pg->state_clear(PG_STATE_BACKFILL_TOOFULL);
   pg->state_clear(PG_STATE_BACKFILL_WAIT);
   pg->state_set(PG_STATE_BACKFILL);
@@ -6063,8 +6077,6 @@ PG::RecoveryState::Backfilling::react(const RemoteReservationRejected &)
     }
   }
 
-  pg->osd->recovery_wq.dequeue(pg);
-
   pg->waiting_on_backfill.clear();
   pg->finish_recovery_op(hobject_t::get_max());
 
@@ -6466,7 +6478,7 @@ PG::RecoveryState::Recovering::Recovering(my_context ctx)
   PG *pg = context< RecoveryMachine >().pg;
   pg->state_clear(PG_STATE_RECOVERY_WAIT);
   pg->state_set(PG_STATE_RECOVERING);
-  pg->osd->queue_for_recovery(pg);
+  pg->queue_recovery();
 }
 
 void PG::RecoveryState::Recovering::release_reservations()
@@ -6734,7 +6746,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const ActMap&)
   if (!pg->is_clean() &&
       !pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
       (!pg->get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) || pg->is_degraded())) {
-    pg->osd->queue_for_recovery(pg);
+    pg->queue_recovery();
   }
   return forward_event();
 }
@@ -6800,7 +6812,7 @@ boost::statechart::result PG::RecoveryState::Active::react(const MLogRec& logevt
     logevt.from,
     context< RecoveryMachine >().get_recovery_ctx());
   if (got_missing)
-    pg->osd->queue_for_recovery(pg);
+    pg->queue_recovery();
   return discard_event();
 }
 
index efcd99d63348d8096879edace68b14581467c207..f86bcfb5aede9ed9951b9420eccbe329954bc0df 100644 (file)
@@ -488,9 +488,10 @@ public:
 
   /* You should not use these items without taking their respective queue locks
    * (if they have one) */
-  xlist<PG*>::item recovery_item, stat_queue_item;
+  xlist<PG*>::item stat_queue_item;
   bool snap_trim_queued;
   bool scrub_queued;
+  bool recovery_queued;
 
   int recovery_ops_active;
   set<pg_shard_t> waiting_on_backfill;
@@ -2258,6 +2259,7 @@ public:
 
   void queue_snap_trim();
   bool requeue_scrub();
+  void queue_recovery(bool front = false);
   bool queue_scrub();
   unsigned get_scrub_priority();
 
index 43ef40ffb9982136ba29c96b39d146266c26f990..5a1f037b2e0dcda696ae91b05bee91dc9bc26468 100644 (file)
@@ -9885,7 +9885,7 @@ void ReplicatedPG::mark_all_unfound_lost(
     void operator()() {
       pg->requeue_ops(pg->waiting_for_all_missing);
       pg->waiting_for_all_missing.clear();
-      pg->osd->queue_for_recovery(pg);
+      pg->queue_recovery();
     }
   };
   submit_log_entries(
@@ -9895,7 +9895,7 @@ void ReplicatedPG::mark_all_unfound_lost(
       [=]() {
        requeue_ops(waiting_for_all_missing);
        waiting_for_all_missing.clear();
-       osd->queue_for_recovery(this);
+       queue_recovery();
 
        stringstream ss;
        ss << "pg has " << num_unfound
@@ -10026,7 +10026,6 @@ void ReplicatedPG::on_shutdown()
   dout(10) << "on_shutdown" << dendl;
 
   // remove from queues
-  osd->recovery_wq.dequeue(this);
   osd->pg_stat_queue_dequeue(this);
   osd->dequeue_pg(this, 0);
   osd->peering_wq.dequeue(this);
index fc5e5d466731923f23c6873781f6cfd064ce2ff8..d0cb85535eebc680783b9c52a3db021399e9c6a4 100644 (file)
@@ -842,7 +842,7 @@ protected:
       &requeue_recovery,
       &requeue_snaptrim);
     if (requeue_recovery)
-      osd->recovery_wq.queue(this);
+      queue_recovery();
     if (requeue_snaptrim)
       queue_snap_trim();