git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: use PrioritizedQueue for OpWQ
author: Samuel Just <sam.just@inktank.com>
Mon, 1 Oct 2012 23:11:40 +0000 (16:11 -0700)
committer: Samuel Just <sam.just@inktank.com>
Tue, 13 Nov 2012 18:45:00 +0000 (10:45 -0800)
The OpWQ PriorityQueue replaces OSD::op_queue, PG::op_queue,
and PG::qlock.  The synchronization is now done as part of the
usual WorkQueue synchronization pattern.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index 66a55863a52987c272f74d55be4697ed2a00fcdd..dc64e08c8b213779dfeb75d3ad117fc43fde4034 100644 (file)
@@ -728,7 +728,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   finished_lock("OSD::finished_lock"),
   admin_ops_hook(NULL),
   historic_ops_hook(NULL),
-  op_queue_len(0),
   op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
   peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
   map_lock("OSD::map_lock"),
@@ -5609,37 +5608,84 @@ bool OSD::op_is_discardable(MOSDOp *op)
 void OSD::enqueue_op(PG *pg, OpRequestRef op)
 {
   dout(15) << "enqueue_op " << op << " " << *(op->request) << dendl;
-  pg->queue_op(op);
+  op_wq.queue(make_pair(PGRef(pg), op));
 }
 
-bool OSD::OpWQ::_enqueue(PG *pg)
+void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
 {
-  pg->get();
-  osd->op_queue.push_back(pg);
-  osd->op_queue_len++;
-  osd->logger->set(l_osd_opq, osd->op_queue_len);
-  return true;
+  pqueue.enqueue(item.second->request->get_source_inst(),
+                1, 1, item);
+  osd->logger->set(l_osd_opq, pqueue.length());
 }
 
-PG *OSD::OpWQ::_dequeue()
+void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
 {
-  if (osd->op_queue.empty())
-    return NULL;
-  PG *pg = osd->op_queue.front();
-  osd->op_queue.pop_front();
-  osd->op_queue_len--;
-  osd->logger->set(l_osd_opq, osd->op_queue_len);
+  {
+    Mutex::Locker l(qlock);
+    if (pg_for_processing.count(&*(item.first))) {
+      pg_for_processing[&*(item.first)].push_front(item.second);
+      item.second = pg_for_processing[&*(item.first)].back();
+      pg_for_processing[&*(item.first)].pop_back();
+    }
+  }
+  pqueue.enqueue_front(item.second->request->get_source_inst(),
+                      1, 1, item);
+  osd->logger->set(l_osd_opq, pqueue.length());
+}
+
+PGRef OSD::OpWQ::_dequeue()
+{
+  assert(!pqueue.empty());
+  PGRef pg;
+  {
+    Mutex::Locker l(qlock);
+    pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
+    pg = ret.first;
+    pg_for_processing[&*pg].push_back(ret.second);
+  }
+  osd->logger->set(l_osd_opq, pqueue.length());
   return pg;
 }
 
-void OSDService::queue_for_peering(PG *pg)
+void OSD::OpWQ::_process(PGRef pg)
 {
-  peering_wq.queue(pg);
+  pg->lock();
+  OpRequestRef op;
+  {
+    Mutex::Locker l(qlock);
+    assert(pg_for_processing.count(&*pg));
+    assert(pg_for_processing[&*pg].size());
+    op = pg_for_processing[&*pg].front();
+    pg_for_processing[&*pg].pop_front();
+    if (!(pg_for_processing[&*pg].size()))
+      pg_for_processing.erase(&*pg);
+  }
+  osd->dequeue_op(pg, op);
+  pg->unlock();
+}
+
+/*
+ * NOTE: dequeue called in worker thread, with pg lock
+ */
+void OSD::dequeue_op(PGRef pg, OpRequestRef op)
+{
+  dout(10) << "dequeue_op " << op << " " << *(op->request)
+          << " pg " << *pg << dendl;
+  if (pg->deleting)
+    return;
+
+  op->mark_reached_pg();
+
+  pg->do_request(op);
+
+  // finish
+  dout(10) << "dequeue_op " << op << " finish" << dendl;
 }
 
-void OSDService::queue_for_op(PG *pg)
+
+void OSDService::queue_for_peering(PG *pg)
 {
-  op_wq.queue(pg);
+  peering_wq.queue(pg);
 }
 
 void OSD::process_peering_events(const list<PG*> &pgs)
@@ -5678,44 +5724,6 @@ void OSD::process_peering_events(const list<PG*> &pgs)
   service.send_pg_temp();
 }
 
-/*
- * NOTE: dequeue called in worker thread, without osd_lock
- */
-void OSD::dequeue_op(PG *pg)
-{
-  OpRequestRef op;
-
-  pg->lock();
-  if (pg->deleting) {
-    pg->unlock();
-    pg->put();
-    return;
-  }
-
-  pg->lockq();
-  assert(!pg->op_queue.empty());
-  op = pg->op_queue.front();
-  pg->op_queue.pop_front();
-  pg->unlockq();
-    
-  dout(10) << "dequeue_op " << op << " " << *op->request << " pg " << *pg << dendl;
-
-  op->mark_reached_pg();
-
-  pg->do_request(op);
-
-  // unlock and put pg
-  pg->unlock();
-  pg->put();
-  
-  //#warning foo
-  //scrub_wq.queue(pg);
-
-  // finish
-  dout(10) << "dequeue_op " << op << " finish" << dendl;
-}
-
-
 // --------------------------------
 
 int OSD::init_op_flags(MOSDOp *op)
index 23a8729202f7930f49ff827c2cdd452cbe8eecf5..4b018d7f5d1f568231b3a62a9e1f62b24fdb6415 100644 (file)
@@ -53,6 +53,7 @@ using namespace __gnu_cxx;
 #include "common/shared_cache.hpp"
 #include "common/simple_cache.hpp"
 #include "common/sharedptr_registry.hpp"
+#include "common/PrioritizedQueue.h"
 
 #define CEPH_OSD_PROTOCOL    10 /* cluster internal */
 
@@ -172,7 +173,7 @@ public:
   Messenger *&client_messenger;
   PerfCounters *&logger;
   MonClient   *&monc;
-  ThreadPool::WorkQueue<PG> &op_wq;
+  ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
@@ -279,7 +280,6 @@ public:
   void send_pg_temp();
 
   void queue_for_peering(PG *pg);
-  void queue_for_op(PG *pg);
   bool queue_for_recovery(PG *pg);
   bool queue_for_snap_trim(PG *pg) {
     return snap_trim_wq.queue(pg);
@@ -569,44 +569,43 @@ private:
   HistoricOpsSocketHook *historic_ops_hook;
 
   // -- op queue --
-  list<PG*> op_queue;
-  int op_queue_len;
 
-  struct OpWQ : public ThreadPool::WorkQueue<PG> {
+  struct OpWQ: public ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>,
+                                              PGRef > {
+    Mutex qlock;
+    map<PG*, list<OpRequestRef> > pg_for_processing;
     OSD *osd;
+    PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
     OpWQ(OSD *o, time_t ti, ThreadPool *tp)
-      : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {}
-
-    bool _enqueue(PG *pg);
-    void _dequeue(PG *pg) {
-      for (list<PG*>::iterator i = osd->op_queue.begin();
-          i != osd->op_queue.end();
-          ) {
-       if (*i == pg) {
-         osd->op_queue.erase(i++);
-         pg->put();
-       } else {
-         ++i;
-       }
+      : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
+       "OSD::OpWQ", ti, ti*10, tp),
+       qlock("OpWQ::qlock"),
+       osd(o) {}
+
+    void _enqueue_front(pair<PGRef, OpRequestRef> item);
+    void _enqueue(pair<PGRef, OpRequestRef> item);
+    PGRef _dequeue();
+
+    struct Pred {
+      PG *pg;
+      Pred(PG *pg) : pg(pg) {}
+      bool operator()(const pair<PGRef, OpRequestRef> &op) {
+       return op.first == pg;
       }
+    };
+    void dequeue(PG *pg) {
+      lock();
+      pqueue.remove_by_filter(Pred(pg));
+      unlock();
     }
     bool _empty() {
-      return osd->op_queue.empty();
-    }
-    PG *_dequeue();
-    void _process(PG *pg) {
-      osd->dequeue_op(pg);
-    }
-    void _clear() {
-      assert(osd->op_queue.empty());
+      return pqueue.empty();
     }
+    void _process(PGRef pg);
   } op_wq;
 
   void enqueue_op(PG *pg, OpRequestRef op);
-  void dequeue_op(PG *pg);
-  static void static_dequeueop(OSD *o, PG *pg) {
-    o->dequeue_op(pg);
-  };
+  void dequeue_op(PGRef pg, OpRequestRef op);
 
   // -- peering queue --
   struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
index 87fd4621bb14aba58cb988706676b9699f34372e..a1e232fbd97e48a498082e0a446f04438aecc87a 100644 (file)
@@ -63,7 +63,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
        const hobject_t& ioid) :
   osd(o), osdmap_ref(curmap), pool(_pool),
   _lock("PG::_lock"),
-  _qlock("PG::_qlock"),
   ref(0), deleting(false), dirty_info(false), dirty_log(false),
   info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
   recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
@@ -2768,12 +2767,12 @@ void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
 void PG::requeue_ops(list<OpRequestRef> &ls)
 {
   dout(15) << " requeue_ops " << ls << dendl;
-  lockq();
-  assert(&ls != &op_queue);
-  size_t requeue_size = ls.size();
-  op_queue.splice(op_queue.begin(), ls, ls.begin(), ls.end());
-  for (size_t i = 0; i < requeue_size; ++i) osd->queue_for_op(this);
-  unlockq();
+  for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
+       i != ls.rend();
+       ++i) {
+    osd->op_wq.queue_front(make_pair(PGRef(this), *i));
+  }
+  ls.clear();
 }
 
 
@@ -4688,14 +4687,6 @@ bool PG::must_delay_request(OpRequestRef op)
   return false;
 }
 
-void PG::queue_op(OpRequestRef op)
-{
-  _qlock.Lock();
-  op_queue.push_back(op);
-  osd->queue_for_op(this);
-  _qlock.Unlock();
-}
-
 void PG::take_waiters()
 {
   dout(10) << "take_waiters" << dendl;
index 13d529f4d6ebb7b087417ee0b54333bb73d2b154..151b6705e9f38a499414124498f47ef36d692bce 100644 (file)
@@ -348,7 +348,6 @@ protected:
    * put_unlock() when done with the current pointer (_most common_).
    */  
   Mutex _lock;
-  Mutex _qlock;
   Cond _cond;
   atomic_t ref;
 
@@ -356,18 +355,12 @@ public:
   bool deleting;  // true while RemoveWQ should be chewing on us
 
   void lock(bool no_lockdep = false);
-  void lockq(bool no_lockdep = false) {
-    _qlock.Lock(no_lockdep);
-  }
   void unlock() {
     //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
     assert(!dirty_info);
     assert(!dirty_log);
     _lock.Unlock();
   }
-  void unlockq() {
-    _qlock.Unlock();
-  }
 
   /* During handle_osd_map, the osd holds a write lock to the osdmap.
    * *_with_map_lock_held assume that the map_lock is already held */
@@ -403,8 +396,6 @@ public:
   }
 
 
-  list<OpRequestRef> op_queue;  // op queue
-
   bool dirty_info, dirty_log;
 
 public:
@@ -1732,7 +1723,6 @@ public:
   bool can_discard_request(OpRequestRef op);
 
   bool must_delay_request(OpRequestRef op);
-  void queue_op(OpRequestRef op);
 
   bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
   bool old_peering_evt(CephPeeringEvtRef evt) {
@@ -1810,4 +1800,6 @@ ostream& operator<<(ostream& out, const PG& pg);
 void intrusive_ptr_add_ref(PG *pg);
 void intrusive_ptr_release(PG *pg);
 
+typedef boost::intrusive_ptr<PG> PGRef;
+
 #endif