]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: convert scrub to use the OpWQ 4717/head
authorSamuel Just <sjust@redhat.com>
Fri, 15 May 2015 17:34:29 +0000 (10:34 -0700)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Jun 2015 18:11:37 +0000 (11:11 -0700)
Fixes: 8635
Signed-off-by: Samuel Just <sjust@redhat.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 98f15ff550d6e3844fee4f39aed0fb6c4cb55290..998e68c61b5cb0bc8612908a5a850ee25a88abe6 100644 (file)
@@ -588,9 +588,6 @@ OPTION(osd_recovery_thread_timeout, OPT_INT, 30)
 OPTION(osd_recovery_thread_suicide_timeout, OPT_INT, 300)
 OPTION(osd_recovery_sleep, OPT_FLOAT, 0)         // seconds to sleep between recovery ops
 OPTION(osd_snap_trim_sleep, OPT_FLOAT, 0)
-OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
-OPTION(osd_scrub_thread_suicide_timeout, OPT_INT, 60)
-OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
 OPTION(osd_scrub_invalid_stats, OPT_BOOL, true)
 OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
 OPTION(osd_remove_thread_suicide_timeout, OPT_INT, 10*60*60)
@@ -753,6 +750,10 @@ OPTION(osd_recovery_op_priority, OPT_U32, 10)
 OPTION(osd_snap_trim_priority, OPT_U32, 5)
 OPTION(osd_snap_trim_cost, OPT_U32, 1<<20) // set default cost equal to 1MB io
 
+OPTION(osd_scrub_priority, OPT_U32, 5)
+// set default cost equal to 50MB io
+OPTION(osd_scrub_cost, OPT_U32, 50<<20) 
+
 /**
  * osd_recovery_op_warn_multiple scales the normal warning threshhold,
  * osd_op_complaint_time, so that slow recovery ops won't cause noise
index 8c810f0fe4f3700803f8d3fed559037077cb0495..b0d9d279423cb97844785c2f12925a8778aa76b8 100644 (file)
@@ -160,6 +160,10 @@ void PGQueueable::RunVis::operator()(PGSnapTrim &op) {
   return pg->snap_trimmer(op.epoch_queued);
 }
 
+void PGQueueable::RunVis::operator()(PGScrub &op) {
+  return pg->scrub(op.epoch_queued, handle);
+}
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -204,7 +208,6 @@ OSDService::OSDService(OSD *osd) :
   op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
   recovery_wq(osd->recovery_wq),
-  scrub_wq(osd->scrub_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
                  &osd->recovery_tp),
   op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
@@ -1544,11 +1547,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_recovery_thread_suicide_timeout,
     &recovery_tp),
   replay_queue_lock("OSD::replay_queue_lock"),
-  scrub_wq(
-    this,
-    cct->_conf->osd_scrub_thread_timeout,
-    cct->_conf->osd_scrub_thread_suicide_timeout,
-    &disk_tp),
   remove_wq(
     store,
     cct->_conf->osd_remove_thread_timeout,
@@ -5581,6 +5579,8 @@ epoch_t op_required_epoch(OpRequestRef op)
     return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
   case MSG_OSD_EC_READ_REPLY:
     return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+  case MSG_OSD_REP_SCRUB:
+    return replica_op_required_epoch<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
   default:
     assert(0);
     return 0;
@@ -5696,6 +5696,7 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
     break;
   case MSG_OSD_REP_SCRUB:
     handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
+    break;
   default:
     assert(0);
   }
index d01d8f3ea936e93e8dc4179532feedfbb584ec5a..df968594edf641526bbe29bc9a891661f75d6b06 100644 (file)
@@ -317,18 +317,27 @@ typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
 
 class OSD;
 
+struct PGScrub {
+  epoch_t epoch_queued;
+  PGScrub(epoch_t e) : epoch_queued(e) {}
+  ostream &operator<<(ostream &rhs) {
+    return rhs << "PGScrub";
+  }
+};
+
 struct PGSnapTrim {
   epoch_t epoch_queued;
   PGSnapTrim(epoch_t e) : epoch_queued(e) {}
   ostream &operator<<(ostream &rhs) {
-    return rhs << "SnapTrim";
+    return rhs << "PGSnapTrim";
   }
 };
 
 class PGQueueable {
   typedef boost::variant<
     OpRequestRef,
-    PGSnapTrim
+    PGSnapTrim,
+    PGScrub
     > QVariant;
   QVariant qvariant;
   int cost; 
@@ -343,6 +352,7 @@ class PGQueueable {
       : osd(osd), pg(pg), handle(handle) {}
     void operator()(OpRequestRef &op);
     void operator()(PGSnapTrim &op);
+    void operator()(PGScrub &op);
   };
 public:
   PGQueueable(OpRequestRef op)
@@ -356,6 +366,11 @@ public:
     const entity_inst_t &owner)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
       owner(owner) {}
+  PGQueueable(
+    const PGScrub &op, int cost, unsigned priority, utime_t start_time,
+    const entity_inst_t &owner)
+    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
+      owner(owner) {}
   boost::optional<OpRequestRef> maybe_get_op() {
     OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
     return op ? *op : boost::optional<OpRequestRef>();
@@ -391,7 +406,6 @@ public:
   ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
-  ThreadPool::WorkQueue<PG> &scrub_wq;
   GenContextWQ recovery_gen_wq;
   GenContextWQ op_gen_wq;
   ClassHandler  *&class_handler;
@@ -769,8 +783,16 @@ public:
          ceph_clock_now(cct),
          entity_inst_t())));
   }
-  bool queue_for_scrub(PG *pg) {
-    return scrub_wq.queue(pg);
+  void queue_for_scrub(PG *pg) {
+    op_wq.queue(
+      make_pair(
+       pg,
+       PGQueueable(
+         PGScrub(pg->get_osdmap()->get_epoch()),
+         cct->_conf->osd_scrub_cost,
+         cct->_conf->osd_scrub_priority,
+         ceph_clock_now(cct),
+         entity_inst_t())));
   }
 
   // osd map cache (past osd maps)
@@ -2156,51 +2178,6 @@ protected:
   bool scrub_should_schedule();
   bool scrub_time_permit(utime_t now);
 
-  xlist<PG*> scrub_queue;
-
-  struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
-    OSD *osd;
-    ScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() {
-      return osd->scrub_queue.empty();
-    }
-    bool _enqueue(PG *pg) {
-      if (pg->scrub_item.is_on_list()) {
-       return false;
-      }
-      pg->get("ScrubWQ");
-      osd->scrub_queue.push_back(&pg->scrub_item);
-      return true;
-    }
-    void _dequeue(PG *pg) {
-      if (pg->scrub_item.remove_myself()) {
-       pg->put("ScrubWQ");
-      }
-    }
-    PG *_dequeue() {
-      if (osd->scrub_queue.empty())
-       return NULL;
-      PG *pg = osd->scrub_queue.front();
-      osd->scrub_queue.pop_front();
-      return pg;
-    }
-    void _process(
-      PG *pg,
-      ThreadPool::TPHandle &handle) {
-      pg->scrub(handle);
-      pg->put("ScrubWQ");
-    }
-    void _clear() {
-      while (!osd->scrub_queue.empty()) {
-       PG *pg = osd->scrub_queue.front();
-       osd->scrub_queue.pop_front();
-       pg->put("ScrubWQ");
-      }
-    }
-  } scrub_wq;
-
   // -- removing --
   struct RemoveWQ :
     public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
index 22df0afc48288fd30ede161d8ab30aaa1eaf1305..c90b67ba9297421afb9675899fbc962292be7fcf 100644 (file)
@@ -195,8 +195,9 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   coll(p), pg_log(cct),
   pgmeta_oid(p.make_pgmeta_oid()),
   missing_loc(this),
-  recovery_item(this), scrub_item(this), stat_queue_item(this),
+  recovery_item(this), stat_queue_item(this),
   snap_trim_queued(false),
+  scrub_queued(false),
   recovery_ops_active(0),
   role(0),
   state(0),
@@ -1937,6 +1938,20 @@ void PG::queue_snap_trim()
   }
 }
 
+bool PG::requeue_scrub()
+{
+  assert(is_locked());
+  if (scrub_queued) {
+    dout(10) << __func__ << ": already queued" << dendl;
+    return false;
+  } else {
+    dout(10) << __func__ << ": queueing" << dendl;
+    scrub_queued = true;
+    osd->queue_for_scrub(this);
+    return true;
+  }
+}
+
 bool PG::queue_scrub()
 {
   assert(_lock.is_locked());
@@ -1953,7 +1968,7 @@ bool PG::queue_scrub()
     state_set(PG_STATE_REPAIR);
     scrubber.must_repair = false;
   }
-  osd->queue_for_scrub(this);
+  requeue_scrub();
   return true;
 }
 
@@ -3365,7 +3380,7 @@ void PG::sub_op_scrub_map(OpRequestRef op)
   scrubber.waiting_on_whom.erase(m->from);
 
   if (scrubber.waiting_on == 0) {
-    osd->scrub_wq.queue(this);
+    requeue_scrub();
   }
 }
 
@@ -3491,7 +3506,6 @@ void PG::schedule_backfill_full_retry()
 
 void PG::clear_scrub_reserved()
 {
-  osd->scrub_wq.dequeue(this);
   scrubber.reserved_peers.clear();
   scrubber.reserve_failed = false;
 
@@ -3782,13 +3796,8 @@ void PG::replica_scrub(
  * scrub will be chunky if all OSDs in PG support chunky scrub
  * scrub will fail if OSDs are too old.
  */
-void PG::scrub(ThreadPool::TPHandle &handle)
+void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
 {
-  lock();
-  if (deleting) {
-    unlock();
-    return;
-  }
   if (g_conf->osd_scrub_sleep > 0 &&
       (scrubber.state == PG::Scrubber::NEW_CHUNK ||
        scrubber.state == PG::Scrubber::INACTIVE)) {
@@ -3800,6 +3809,11 @@ void PG::scrub(ThreadPool::TPHandle &handle)
     lock();
     dout(20) << __func__ << " slept for " << t << dendl;
   }
+  if (deleting || pg_has_reset_since(queued)) {
+    return;
+  }
+  assert(scrub_queued);
+  scrub_queued = false;
 
   if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
     dout(10) << "scrub -- not primary or active or not clean" << dendl;
@@ -3807,7 +3821,6 @@ void PG::scrub(ThreadPool::TPHandle &handle)
     state_clear(PG_STATE_REPAIR);
     state_clear(PG_STATE_DEEP_SCRUB);
     publish_stats_to_osd();
-    unlock();
     return;
   }
 
@@ -3838,8 +3851,6 @@ void PG::scrub(ThreadPool::TPHandle &handle)
   }
 
   chunky_scrub(handle);
-
-  unlock();
 }
 
 /*
@@ -4135,7 +4146,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
 
        if (scrubber.end < hobject_t::get_max()) {
           scrubber.state = PG::Scrubber::NEW_CHUNK;
-          osd->scrub_wq.queue(this);
+         requeue_scrub();
           done = true;
         } else {
           scrubber.state = PG::Scrubber::FINISH;
@@ -4816,6 +4827,7 @@ void PG::start_peering_interval(
   peer_purged.clear();
   actingbackfill.clear();
   snap_trim_queued = false;
+  scrub_queued = false;
 
   // reset primary state?
   if (was_old_primary || is_primary()) {
@@ -5132,6 +5144,8 @@ bool PG::can_discard_request(OpRequestRef& op)
     return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
   case MSG_OSD_EC_READ_REPLY:
     return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+  case MSG_OSD_REP_SCRUB:
+    return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
 
   case MSG_OSD_PG_SCAN:
     return can_discard_scan(op);
@@ -5213,6 +5227,11 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
     return !have_same_or_newer_map(
       cur_epoch,
       static_cast<MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
+
+  case MSG_OSD_REP_SCRUB:
+    return !have_same_or_newer_map(
+      cur_epoch,
+      static_cast<MOSDRepScrub*>(op->get_req())->map_epoch);
   }
   assert(0);
   return false;
index ae87319b40f7c2e760696144181bff9ef1a3401a..b6808eba4eefc0095f419cdf28b315a3826ea773 100644 (file)
@@ -434,8 +434,9 @@ public:
 
   /* You should not use these items without taking their respective queue locks
    * (if they have one) */
-  xlist<PG*>::item recovery_item, scrub_item, stat_queue_item;
+  xlist<PG*>::item recovery_item, stat_queue_item;
   bool snap_trim_queued;
+  bool scrub_queued;
 
   int recovery_ops_active;
   set<pg_shard_t> waiting_on_backfill;
@@ -1182,7 +1183,7 @@ public:
     const hobject_t& soid, list<pair<ScrubMap::object, pg_shard_t> > *ok_peers,
     pg_shard_t bad_peer);
 
-  void scrub(ThreadPool::TPHandle &handle);
+  void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
   void chunky_scrub(ThreadPool::TPHandle &handle);
   void scrub_compare_maps();
   void scrub_process_inconsistent();
@@ -2156,6 +2157,7 @@ public:
   void log_weirdness();
 
   void queue_snap_trim();
+  bool requeue_scrub();
   bool queue_scrub();
 
   /// share pg info after a pg is active
index 3d776a5696c45ef38937ff781d155d0d8fdd653c..737305648d97fa45fc9d4d95fab1cddf35a9f4f5 100644 (file)
@@ -7266,7 +7266,7 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version)
   if (is_primary()) {
     if (scrubber.active) {
       if (last_update_applied == scrubber.subset_last_update) {
-        osd->scrub_wq.queue(this);
+        requeue_scrub();
       }
     } else {
       assert(scrubber.start == scrubber.end);
@@ -8130,7 +8130,7 @@ void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
   waiting_for_blocked_object.erase(p);
 
   if (obc->requeue_scrub_on_unblock)
-    osd->queue_for_scrub(this);
+    requeue_scrub();
 }
 
 SnapSetContext *ReplicatedPG::create_snapset_context(const hobject_t& oid)
@@ -8363,7 +8363,7 @@ void ReplicatedPG::_applied_recovered_object(ObjectContextRef obc)
   // requeue an active chunky scrub waiting on recovery ops
   if (!deleting && active_pushes == 0
       && scrubber.is_chunky_scrub_active()) {
-    osd->scrub_wq.queue(this);
+    requeue_scrub();
   }
 
   unlock();
@@ -8747,7 +8747,6 @@ void ReplicatedPG::on_shutdown()
 
   // remove from queues
   osd->recovery_wq.dequeue(this);
-  osd->scrub_wq.dequeue(this);
   osd->pg_stat_queue_dequeue(this);
   osd->dequeue_pg(this, 0);
   osd->peering_wq.dequeue(this);
@@ -11196,7 +11195,7 @@ void ReplicatedPG::_scrub_digest_updated()
 {
   dout(20) << __func__ << dendl;
   if (--scrubber.num_digest_updates_pending == 0) {
-    osd->scrub_wq.queue(this);
+    requeue_scrub();
   }
 }