]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: add PGQueueable
authorSamuel Just <sjust@redhat.com>
Fri, 29 May 2015 18:42:52 +0000 (11:42 -0700)
committerSamuel Just <sjust@redhat.com>
Fri, 29 May 2015 18:42:52 +0000 (11:42 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h

index f6e9419019341eca1de7f9bf23a4179b1568f380..246c4f71ae16d6b2e1a339cbcc1ce6f993b1f75b 100644 (file)
@@ -152,6 +152,10 @@ static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
 }
 
+void PGQueueable::RunVis::operator()(OpRequestRef &op) {
+  return osd->dequeue_op(pg, op, handle);
+}
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -1272,7 +1276,10 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
 
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
-  osd->op_shardedwq.dequeue(pg, dequeued);
+  if (dequeued)
+    osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
+  else
+    osd->op_shardedwq.dequeue(pg);
 }
 
 void OSDService::queue_for_peering(PG *pg)
@@ -8194,7 +8201,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
       return;
     }
   }
-  pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+  pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, 
@@ -8202,7 +8209,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
 
   (item.first)->lock_suspend_timeout(tp_handle);
 
-  OpRequestRef op;
+  boost::optional<PGQueueable> op;
   {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     if (!sdata->pg_for_processing.count(&*(item.first))) {
@@ -8220,7 +8227,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   // and will begin to be handled by a worker thread.
   {
 #ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
 #endif
     tracepoint(osd, opwq_process_start, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
@@ -8235,11 +8245,14 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   delete f;
   *_dout << dendl;
 
-  osd->dequeue_op(item.first, op, tp_handle);
+  op->run(osd, item.first, tp_handle);
 
   {
 #ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
 #endif
     tracepoint(osd, opwq_process_finish, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
@@ -8248,21 +8261,22 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   (item.first)->unlock();
 }
 
-void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
   ShardData* sdata = shard_list[shard_index];
   assert (NULL != sdata);
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
+  unsigned priority = item.second.get_priority();
+  unsigned cost = item.second.get_cost();
   sdata->sdata_op_ordering_lock.Lock();
  
   if (priority >= CEPH_MSG_PRIO_LOW)
     sdata->pqueue.enqueue_strict(
-      item.second->get_req()->get_source_inst(), priority, item);
+      item.second.get_owner(), priority, item);
   else
-    sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+    sdata->pqueue.enqueue(
+      item.second.get_owner(),
       priority, cost, item);
   sdata->sdata_op_ordering_lock.Unlock();
 
@@ -8272,7 +8286,7 @@ void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
 
 }
 
-void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
@@ -8284,13 +8298,15 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
     item.second = sdata->pg_for_processing[&*(item.first)].back();
     sdata->pg_for_processing[&*(item.first)].pop_back();
   }
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
+  unsigned priority = item.second.get_priority();
+  unsigned cost = item.second.get_cost();
   if (priority >= CEPH_MSG_PRIO_LOW)
     sdata->pqueue.enqueue_strict_front(
-      item.second->get_req()->get_source_inst(),priority, item);
+      item.second.get_owner(),
+      priority, item);
   else
-    sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+    sdata->pqueue.enqueue_front(
+      item.second.get_owner(),
       priority, cost, item);
 
   sdata->sdata_op_ordering_lock.Unlock();
index 33c8e94617dc30c4a82a12eea6000ba45293686e..78d33959d8493745a72cc4366577825bed55d92f 100644 (file)
@@ -316,6 +316,44 @@ public:
 typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
 
 class OSD;
+class PGQueueable {
+  typedef boost::variant<
+    OpRequestRef
+    > QVariant;
+  QVariant qvariant;
+  int cost; 
+  unsigned priority;
+  utime_t start_time;
+  entity_inst_t owner;
+  struct RunVis : public boost::static_visitor<> {
+    OSD *osd;
+    PGRef &pg;
+    ThreadPool::TPHandle &handle;
+    RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
+      : osd(osd), pg(pg), handle(handle) {}
+    void operator()(OpRequestRef &op);
+  };
+public:
+  PGQueueable(OpRequestRef op)
+    : qvariant(op), cost(op->get_req()->get_cost()),
+      priority(op->get_req()->get_priority()),
+      start_time(op->get_req()->get_recv_stamp()),
+      owner(op->get_req()->get_source_inst())
+    {}
+  boost::optional<OpRequestRef> maybe_get_op() {
+    OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+    return op ? *op : boost::optional<OpRequestRef>();
+  }
+  void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
+    RunVis v(osd, pg, handle);
+    boost::apply_visitor(v, qvariant);
+  }
+  unsigned get_priority() const { return priority; }
+  int get_cost() const { return cost; }
+  utime_t get_start_time() const { return start_time; }
+  entity_inst_t get_owner() const { return owner; }
+};
+
 class OSDService {
 public:
   OSD *osd;
@@ -334,7 +372,7 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > &op_wq;
+  ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
@@ -1456,15 +1494,15 @@ private:
 
   // -- op queue --
 
-  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > {
+  friend class PGQueueable;
+  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
 
     struct ShardData {
       Mutex sdata_lock;
       Cond sdata_cond;
       Mutex sdata_op_ordering_lock;
-      map<PG*, list<OpRequestRef> > pg_for_processing;
-      PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+      map<PG*, list<PGQueueable> > pg_for_processing;
+      PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
       ShardData(
        string lock_name, string ordering_lock,
        uint64_t max_tok_per_prio, uint64_t min_cost)
@@ -1479,7 +1517,7 @@ private:
 
   public:
     ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
-      ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
+      ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
       osd(o), num_shards(pnum_shards) {
       for(uint32_t i = 0; i < num_shards; i++) {
        char lock_name[32] = {0};
@@ -1504,8 +1542,8 @@ private:
     }
 
     void _process(uint32_t thread_index, heartbeat_handle_d *hb);
-    void _enqueue(pair <PGRef, OpRequestRef> item);
-    void _enqueue_front(pair <PGRef, OpRequestRef> item);
+    void _enqueue(pair <PGRef, PGQueueable> item);
+    void _enqueue_front(pair <PGRef, PGQueueable> item);
       
     void return_waiting_threads() {
       for(uint32_t i = 0; i < num_shards; i++) {
@@ -1534,38 +1572,52 @@ private:
     struct Pred {
       PG *pg;
       Pred(PG *pg) : pg(pg) {}
-      bool operator()(const pair<PGRef, OpRequestRef> &op) {
+      bool operator()(const pair<PGRef, PGQueueable> &op) {
        return op.first == pg;
       }
     };
 
-    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+    void dequeue(PG *pg) {
       ShardData* sdata = NULL;
       assert(pg != NULL);
       uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
       sdata = shard_list[shard_index];
       assert(sdata != NULL);
-      if (!dequeued) {
-       sdata->sdata_op_ordering_lock.Lock();
-       sdata->pqueue.remove_by_filter(Pred(pg));
-       sdata->pg_for_processing.erase(pg);
-       sdata->sdata_op_ordering_lock.Unlock();
-      } else {
-       list<pair<PGRef, OpRequestRef> > _dequeued;
-       sdata->sdata_op_ordering_lock.Lock();
-       sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
-       for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
-            i != _dequeued.end(); ++i) {
-         dequeued->push_back(i->second);
-       }
-       if (sdata->pg_for_processing.count(pg)) {
-         dequeued->splice(
-           dequeued->begin(),
-           sdata->pg_for_processing[pg]);
-         sdata->pg_for_processing.erase(pg);
+      sdata->sdata_op_ordering_lock.Lock();
+      sdata->pqueue.remove_by_filter(Pred(pg));
+      sdata->pg_for_processing.erase(pg);
+      sdata->sdata_op_ordering_lock.Unlock();
+    }
+
+    void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
+      ShardData* sdata = NULL;
+      assert(pg != NULL);
+      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+      sdata = shard_list[shard_index];
+      assert(sdata != NULL);
+      assert(dequeued);
+      list<pair<PGRef, PGQueueable> > _dequeued;
+      sdata->sdata_op_ordering_lock.Lock();
+      sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+      for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
+          i != _dequeued.end(); ++i) {
+       boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
+       if (mop)
+         dequeued->push_back(*mop);
+      }
+      map<PG *, list<PGQueueable> >::iterator iter =
+       sdata->pg_for_processing.find(pg);
+      if (iter != sdata->pg_for_processing.end()) {
+       for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
+            i != iter->second.rend();
+            ++i) {
+         boost::optional<OpRequestRef> mop = i->maybe_get_op();
+         if (mop)
+           dequeued->push_front(*mop);
        }
-       sdata->sdata_op_ordering_lock.Unlock();          
+       sdata->pg_for_processing.erase(iter);
       }
+      sdata->sdata_op_ordering_lock.Unlock();
     }
  
     bool is_shard_empty(uint32_t thread_index) {