]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: fast dispatch, standard op path for MOSDRepScrub
authorSamuel Just <sjust@redhat.com>
Thu, 18 Dec 2014 20:01:57 +0000 (12:01 -0800)
committerSamuel Just <sjust@redhat.com>
Wed, 13 May 2015 21:01:09 +0000 (14:01 -0700)
Previously, it had its own queue, but better to simply use
the OpWQ along with everything else.  It also allows MOSDRepScrub
to be fast dispatched.  We use NORMAL priority since we want the
replica scrub message processed before recovery or client IO
messages -- the primary is blocking writes on the objects until
the replica responds.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 311d0d364810bf5c4cd72b6f97fab80e231336ac..f6e9419019341eca1de7f9bf23a4179b1568f380 100644 (file)
@@ -198,7 +198,6 @@ OSDService::OSDService(OSD *osd) :
   recovery_wq(osd->recovery_wq),
   snap_trim_wq(osd->snap_trim_wq),
   scrub_wq(osd->scrub_wq),
-  rep_scrub_wq(osd->rep_scrub_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
                  &osd->recovery_tp),
   op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
@@ -1545,11 +1544,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
     cct->_conf->osd_scrub_thread_timeout,
     cct->_conf->osd_scrub_thread_suicide_timeout,
     &disk_tp),
-  rep_scrub_wq(
-    this,
-    cct->_conf->osd_scrub_thread_timeout,
-    cct->_conf->osd_scrub_thread_suicide_timeout,
-    &disk_tp),
   remove_wq(
     store,
     cct->_conf->osd_remove_thread_timeout,
@@ -5595,7 +5589,6 @@ void OSD::dispatch_op(OpRequestRef op)
   case MSG_OSD_PG_CREATE:
     handle_pg_create(op);
     break;
-
   case MSG_OSD_PG_NOTIFY:
     handle_pg_notify(op);
     break;
@@ -5696,6 +5689,8 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
   case MSG_OSD_EC_READ_REPLY:
     handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
     break;
+  case MSG_OSD_REP_SCRUB:
+    handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
   default:
     assert(0);
   }
@@ -5740,10 +5735,6 @@ void OSD::_dispatch(Message *m)
     handle_scrub(static_cast<MOSDScrub*>(m));
     break;
 
-  case MSG_OSD_REP_SCRUB:
-    handle_rep_scrub(static_cast<MOSDRepScrub*>(m));
-    break;
-
     // -- need OSDMap --
 
   default:
@@ -5766,26 +5757,6 @@ void OSD::_dispatch(Message *m)
 
 }
 
-void OSD::handle_rep_scrub(MOSDRepScrub *m)
-{
-  dout(10) << __func__ << " " << *m << dendl;
-  if (!require_self_aliveness(m, m->map_epoch)) {
-    m->put();
-    return;
-  }
-  if (!require_osd_peer(m)) {
-    m->put();
-    return;
-  }
-  if (osdmap->get_epoch() >= m->map_epoch &&
-      !require_same_peer_instance(m, osdmap, true)) {
-    m->put();
-    return;
-  }
-
-  rep_scrub_wq.queue(m);
-}
-
 void OSD::handle_scrub(MOSDScrub *m)
 {
   dout(10) << "handle_scrub " << *m << dendl;
index 56ccb4ffc136d51d9a74e57c35a049676b9fe4c4..16cde08bac33593a9b4455d163f221ec245dc345 100644 (file)
@@ -339,7 +339,6 @@ public:
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
   ThreadPool::WorkQueue<PG> &scrub_wq;
-  ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
   GenContextWQ recovery_gen_wq;
   GenContextWQ op_gen_wq;
   ClassHandler  *&class_handler;
@@ -2168,60 +2167,6 @@ protected:
     }
   } scrub_wq;
 
-  struct RepScrubWQ : public ThreadPool::WorkQueue<MOSDRepScrub> {
-  private: 
-    OSD *osd;
-    list<MOSDRepScrub*> rep_scrub_queue;
-
-  public:
-    RepScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
-      : ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, si, tp), osd(o) {}
-
-    bool _empty() {
-      return rep_scrub_queue.empty();
-    }
-    bool _enqueue(MOSDRepScrub *msg) {
-      rep_scrub_queue.push_back(msg);
-      return true;
-    }
-    void _dequeue(MOSDRepScrub *msg) {
-      assert(0); // Not applicable for this wq
-      return;
-    }
-    MOSDRepScrub *_dequeue() {
-      if (rep_scrub_queue.empty())
-       return NULL;
-      MOSDRepScrub *msg = rep_scrub_queue.front();
-      rep_scrub_queue.pop_front();
-      return msg;
-    }
-    void _process(
-      MOSDRepScrub *msg,
-      ThreadPool::TPHandle &handle) {
-      PG *pg = NULL;
-      {
-       Mutex::Locker lock(osd->osd_lock);
-       if (osd->is_stopping() ||
-           !osd->_have_pg(msg->pgid)) {
-         msg->put();
-         return;
-       }
-       pg = osd->_lookup_lock_pg(msg->pgid);
-      }
-      assert(pg);
-      pg->replica_scrub(msg, handle);
-      msg->put();
-      pg->unlock();
-    }
-    void _clear() {
-      while (!rep_scrub_queue.empty()) {
-       MOSDRepScrub *msg = rep_scrub_queue.front();
-       rep_scrub_queue.pop_front();
-       msg->put();
-      }
-    }
-  } rep_scrub_wq;
-
   // -- removing --
   struct RemoveWQ :
     public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
@@ -2274,6 +2219,7 @@ protected:
     case MSG_OSD_EC_WRITE_REPLY:
     case MSG_OSD_EC_READ:
     case MSG_OSD_EC_READ_REPLY:
+    case MSG_OSD_REP_SCRUB:
       return true;
     default:
       return false;
@@ -2327,7 +2273,6 @@ private:
   static int write_meta(ObjectStore *store,
                        uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
 
-  void handle_rep_scrub(MOSDRepScrub *m);
   void handle_scrub(struct MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
   void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
index 7d338bc640a4c91516e8bfff17befaa68bb7d494..7bbf7626ccbf7984b60e3fd5f5f2771660ca142e 100644 (file)
@@ -3377,6 +3377,8 @@ void PG::_request_scrub_map(
     spg_t(info.pgid.pgid, replica.shard), version,
     get_osdmap()->get_epoch(),
     start, end, deep, seed);
+  // default priority, we want the rep scrub processed prior to any recovery
+  // or client io messages (we are holding a lock!)
   osd->send_message_osd_cluster(
     replica.osd, repscrubop, get_osdmap()->get_epoch());
 }
@@ -3715,9 +3717,10 @@ void PG::repair_object(
  * scrubmap of objects that are in the range [msg->start, msg->end).
  */
 void PG::replica_scrub(
-  MOSDRepScrub *msg,
+  OpRequestRef op,
   ThreadPool::TPHandle &handle)
 {
+  MOSDRepScrub *msg = static_cast<MOSDRepScrub *>(op->get_req());
   assert(!scrubber.active_rep_scrub);
   dout(7) << "replica_scrub" << dendl;
 
@@ -3733,15 +3736,14 @@ void PG::replica_scrub(
   assert(msg->chunky);
   if (last_update_applied < msg->scrub_to) {
     dout(10) << "waiting for last_update_applied to catch up" << dendl;
-    scrubber.active_rep_scrub = msg;
+    scrubber.active_rep_scrub = op;
     msg->get();
     return;
   }
 
   if (active_pushes > 0) {
     dout(10) << "waiting for active pushes to finish" << dendl;
-    scrubber.active_rep_scrub = msg;
-    msg->get();
+    scrubber.active_rep_scrub = op;
     return;
   }
 
index d51c2c96c79c3114a87b01b028b3f82318e9589b..2f4dc35b110d58d8285864bc56961c51abae4f76 100644 (file)
@@ -1035,7 +1035,6 @@ public:
       epoch_start(0),
       active(false), queue_snap_trim(false),
       waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0),
-      active_rep_scrub(0),
       must_scrub(false), must_deep_scrub(false), must_repair(false),
       num_digest_updates_pending(0),
       state(INACTIVE),
@@ -1059,7 +1058,7 @@ public:
     int fixed;
     ScrubMap primary_scrubmap;
     map<pg_shard_t, ScrubMap> received_maps;
-    MOSDRepScrub *active_rep_scrub;
+    OpRequestRef active_rep_scrub;
     utime_t scrub_reg_stamp;  // stamp we registered for
 
     // flags to indicate explicitly requested scrubs (by admin)
@@ -1145,8 +1144,7 @@ public:
       waiting_on = 0;
       waiting_on_whom.clear();
       if (active_rep_scrub) {
-        active_rep_scrub->put();
-        active_rep_scrub = NULL;
+        active_rep_scrub = OpRequestRef();
       }
       received_maps.clear();
 
@@ -1230,7 +1228,7 @@ public:
   void unreg_next_scrub();
 
   void replica_scrub(
-    struct MOSDRepScrub *op,
+    OpRequestRef op,
     ThreadPool::TPHandle &handle);
   void sub_op_scrub_map(OpRequestRef op);
   void sub_op_scrub_reserve(OpRequestRef op);
index 4c549a5fd3d39bbaca0b7adc821744054b473115..f2225b7eef22d7a8b07040957f320272e900e0d6 100644 (file)
@@ -1315,6 +1315,10 @@ void ReplicatedPG::do_request(
     do_backfill(op);
     break;
 
+  case MSG_OSD_REP_SCRUB:
+    replica_scrub(op, handle);
+    break;
+
   default:
     assert(0 == "bad message type in do_request");
   }
@@ -7272,9 +7276,13 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version)
     }
   } else {
     if (scrubber.active_rep_scrub) {
-      if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
-       osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
-       scrubber.active_rep_scrub = 0;
+      if (last_update_applied == static_cast<MOSDRepScrub*>(
+           scrubber.active_rep_scrub->get_req())->scrub_to) {
+       osd->op_wq.queue(
+         make_pair(
+           this,
+           scrubber.active_rep_scrub));
+       scrubber.active_rep_scrub = OpRequestRef();
       }
     }
   }
@@ -8374,9 +8382,13 @@ void ReplicatedPG::_applied_recovered_object_replica()
 
   // requeue an active chunky scrub waiting on recovery ops
   if (!deleting && active_pushes == 0 &&
-      scrubber.active_rep_scrub && scrubber.active_rep_scrub->chunky) {
-    osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
-    scrubber.active_rep_scrub = 0;
+      scrubber.active_rep_scrub && static_cast<MOSDRepScrub*>(
+       scrubber.active_rep_scrub->get_req())->chunky) {
+    osd->op_wq.queue(
+      make_pair(
+       this,
+       scrubber.active_rep_scrub));
+    scrubber.active_rep_scrub = OpRequestRef();
   }
 
   unlock();