osd: move pgid into OpQueueItem
author    Kefu Chai <kchai@redhat.com>
Wed, 12 Jul 2017 07:19:13 +0000 (15:19 +0800)
committer myoungwon oh <omwmw@sk.com>
Tue, 10 Oct 2017 14:00:45 +0000 (23:00 +0900)
This paves the way for generalizing OpWQ.

We're going to want to be able to queue things that are not ordered by
the PG lock.  To that end, this patch generalizes OSD::ShardedOpWQ to
use a type which can specify an ordering token and locking structure
other than a PG.

There is a lot of collateral damage which I didn't feel was worth
separating out into other commits.  The code in ShardedOpWQ itself got
some superficial cleanup.  Also, the item being queued no longer uses a
boost::variant.  The variant was a cute way to keep the type easily
copyable, but adding more visitors for the locking support would have
been annoying.  Instead, the variant is replaced by a unique_ptr to an
interface (see the sketch after the file list below).  This makes the
queue item type no longer copyable, which is just as well, since we
don't really want to be copying queue items anyway (a duplicate would
most likely be a bug).

Signed-off-by: Samuel Just <sjust@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
Signed-off-by: Myoungwon Oh <omwmw@sk.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpQueueItem.cc
src/osd/OpQueueItem.h
src/osd/PG.cc
src/osd/PrimaryLogPG.cc
src/osd/mClockClientQueue.cc
src/osd/mClockClientQueue.h
src/osd/mClockOpClassQueue.cc
src/osd/mClockOpClassQueue.h
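
The central design change, replacing the boost::variant/static_visitor
dispatch with a unique_ptr to a polymorphic interface, can be shown with
a minimal, self-contained C++ sketch.  This is not the actual Ceph code;
SnapTrim, RunVis, Queueable, and NewItem below are simplified stand-ins
for the real types in OpQueueItem.h:

#include <boost/variant.hpp>
#include <iostream>
#include <memory>

// Before: a copyable variant; every new behavior needs another visitor.
struct SnapTrim { unsigned epoch; };
struct Scrub    { unsigned epoch; };
using OldItem = boost::variant<SnapTrim, Scrub>;

struct RunVis : public boost::static_visitor<> {
  void operator()(const SnapTrim &o) const { std::cout << "trim e" << o.epoch << "\n"; }
  void operator()(const Scrub &o) const    { std::cout << "scrub e" << o.epoch << "\n"; }
};

// After: a move-only wrapper over an interface.  Each queueable type
// implements run() (and, in the real patch, the ordering-token and
// order-locking hooks) directly, so no visitors are needed.
class Queueable {
public:
  virtual void run() = 0;
  virtual ~Queueable() {}
};

class SnapTrimOp : public Queueable {
  unsigned epoch;
public:
  explicit SnapTrimOp(unsigned e) : epoch(e) {}
  void run() override { std::cout << "trim e" << epoch << "\n"; }
};

class NewItem {
  std::unique_ptr<Queueable> qitem;   // unique ownership: item is not copyable
public:
  explicit NewItem(std::unique_ptr<Queueable> q) : qitem(std::move(q)) {}
  NewItem(NewItem &&) = default;      // movable through the queue
  NewItem(const NewItem &) = delete;  // accidental duplicates won't compile
  void run() { qitem->run(); }
};

int main() {
  OldItem old = SnapTrim{42};
  RunVis v;
  boost::apply_visitor(v, old);                    // visitor dispatch
  NewItem item(std::make_unique<SnapTrimOp>(42));  // (real code targets pre-C++14
  item.run();                                      //  and uses ceph::make_unique)
}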

index 211cdfa0d82f27c31d4ae9ebce61fb9a7e5dcb4b..81f71d41be0a05e7c2e1aecd74e0d328ac1283a5 100644 (file)
@@ -1674,14 +1674,14 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
               << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
 }
 
-void OSDService::enqueue_back(spg_t pgid, OpQueueItem&& qi)
+void OSDService::enqueue_back(OpQueueItem&& qi)
 {
-  osd->op_shardedwq.queue(make_pair(pgid, std::move(qi)));
+  osd->op_shardedwq.queue(std::move(qi));
 }
 
-void OSDService::enqueue_front(spg_t pgid, OpQueueItem&& qi)
+void OSDService::enqueue_front(OpQueueItem&& qi)
 {
-  osd->op_shardedwq.queue_front(make_pair(pgid, std::move(qi)));
+  osd->op_shardedwq.queue_front(std::move(qi));
 }
 
 void OSDService::queue_for_peering(PG *pg)
@@ -1691,19 +1691,119 @@ void OSDService::queue_for_peering(PG *pg)
 
 void OSDService::queue_for_snap_trim(PG *pg)
 {
+  class PGSnapTrim : public PGOpQueueable {
+    epoch_t epoch_queued;
+  public:
+    PGSnapTrim(
+      spg_t pg,
+      epoch_t epoch_queued)
+      : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+    op_type_t get_op_type() const override final {
+      return op_type_t::bg_snaptrim;
+    }
+    ostream &print(ostream &rhs) const override final {
+      return rhs << "PGSnapTrim(pgid=" << get_pgid()
+                << "epoch_queued=" << epoch_queued
+                << ")";
+    }
+    void run(
+      OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+      pg->snap_trimmer(epoch_queued);
+    }
+  };
   dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
-  osd->op_shardedwq.queue(
-    make_pair(
-      pg->pg_id,
-      OpQueueItem(
-       PGSnapTrim(pg->get_osdmap()->get_epoch()),
-       cct->_conf->osd_snap_trim_cost,
-       cct->_conf->osd_snap_trim_priority,
-       ceph_clock_now(),
-       0,
-       pg->get_osdmap()->get_epoch())));
+  enqueue_back(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(
+       new PGSnapTrim(pg->pg_id, pg->get_osdmap()->get_epoch())),
+      cct->_conf->osd_snap_trim_cost,
+      cct->_conf->osd_snap_trim_priority,
+      ceph_clock_now(),
+      0,
+      pg->get_osdmap()->get_epoch()));
 }
 
+void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
+{
+  class PGScrub : public PGOpQueueable {
+    epoch_t epoch_queued;
+  public:
+    PGScrub(
+      spg_t pg,
+      epoch_t epoch_queued)
+      : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+    op_type_t get_op_type() const override final {
+      return op_type_t::bg_scrub;
+    }
+    ostream &print(ostream &rhs) const override final {
+      return rhs << "PGScrub(pgid=" << get_pgid()
+                << "epoch_queued=" << epoch_queued
+                << ")";
+    }
+    void run(
+      OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+      pg->scrub(epoch_queued, handle);
+    }
+  };
+  unsigned scrub_queue_priority = pg->scrubber.priority;
+  if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
+    scrub_queue_priority = cct->_conf->osd_client_op_priority;
+  }
+  const auto epoch = pg->get_osdmap()->get_epoch();
+  enqueue_back(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(pg->info.pgid, epoch)),
+      cct->_conf->osd_scrub_cost,
+      scrub_queue_priority,
+      ceph_clock_now(),
+      0,
+      epoch));
+}
+
+void OSDService::_queue_for_recovery(
+  std::pair<epoch_t, PGRef> p,
+  uint64_t reserved_pushes)
+{
+  class PGRecovery : public PGOpQueueable {
+    epoch_t epoch_queued;
+    uint64_t reserved_pushes;
+  public:
+    PGRecovery(
+      spg_t pg,
+      epoch_t epoch_queued,
+      uint64_t reserved_pushes)
+      : PGOpQueueable(pg),
+       epoch_queued(epoch_queued),
+       reserved_pushes(reserved_pushes) {}
+    op_type_t get_op_type() const override final {
+      return op_type_t::bg_recovery;
+    }
+    virtual ostream &print(ostream &rhs) const override final {
+      return rhs << "PGRecovery(pgid=" << get_pgid()
+                << "epoch_queued=" << epoch_queued
+                << "reserved_pushes=" << reserved_pushes
+                << ")";
+    }
+    virtual uint64_t get_reserved_pushes() const override final {
+      return reserved_pushes;
+    }
+    virtual void run(
+      OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+      osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
+    }
+  };
+  assert(recovery_lock.is_locked_by_me());
+  enqueue_back(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(
+       new PGRecovery(
+         p.second->info.pgid, p.first, reserved_pushes)),
+      cct->_conf->osd_recovery_cost,
+      cct->_conf->osd_recovery_priority,
+      ceph_clock_now(),
+      0,
+      p.first));
+}
 
 // ====================================================================
 // OSD
@@ -8909,11 +9009,16 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
   op->osd_trace.keyval("cost", op->get_req()->get_cost());
   op->mark_queued_for_pg();
   logger->tinc(l_osd_op_before_queue_op_lat, latency);
-  op_shardedwq.queue(make_pair(pg, OpQueueItem(op, epoch)));
+  op_shardedwq.queue(
+    OpQueueItem(
+      unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, op)),
+      op->get_req()->get_cost(),
+      op->get_req()->get_priority(),
+      op->get_req()->get_recv_stamp(),
+      op->get_req()->get_source().num(),
+      epoch));
 }
 
-
-
 /*
  * NOTE: dequeue called in worker thread, with pg lock
  */
@@ -9454,7 +9559,7 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
       for (auto i = p->second.to_process.rbegin();
           i != p->second.to_process.rend();
           ++i) {
-       sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
+       sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
       }
       for (auto& q : p->second.to_process) {
        pushes_to_free += q.get_reserved_pushes();
@@ -9548,9 +9653,8 @@ void OSD::ShardedOpWQ::clear_pg_slots()
 void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 {
   uint32_t shard_index = thread_index % num_shards;
-  ShardData *sdata = shard_list[shard_index];
-  assert(NULL != sdata);
-
+  auto& sdata = shard_list[shard_index];
+  assert(sdata);
   // peek at spg_t
   sdata->sdata_op_ordering_lock.Lock();
   if (sdata->pqueue->empty()) {
@@ -9569,30 +9673,31 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       return;
     }
   }
-  pair<spg_t, OpQueueItem> item = sdata->pqueue->dequeue();
+  OpQueueItem item = sdata->pqueue->dequeue();
   if (osd->is_stopping()) {
     sdata->sdata_op_ordering_lock.Unlock();
     return;    // OSD shutdown, discard.
   }
   PGRef pg;
   uint64_t requeue_seq;
+  const auto token = item.get_ordering_token();
   {
-    auto& slot = sdata->pg_slots[item.first];
-    dout(30) << __func__ << " " << item.first
+    auto& slot = sdata->pg_slots[token];
+    dout(30) << __func__ << " " << token
             << " to_process " << slot.to_process
             << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
-    slot.to_process.push_back(item.second);
+    slot.to_process.push_back(std::move(item));
     // note the requeue seq now...
     requeue_seq = slot.requeue_seq;
     if (slot.waiting_for_pg) {
       // save ourselves a bit of effort
-      dout(20) << __func__ << " " << item.first << " item " << item.second
+      dout(20) << __func__ << " " << slot.to_process.back()
               << " queued, waiting_for_pg" << dendl;
       sdata->sdata_op_ordering_lock.Unlock();
       return;
     }
     pg = slot.pg;
-    dout(20) << __func__ << " " << item.first << " item " << item.second
+    dout(20) << __func__ << " " << slot.to_process.back()
             << " queued" << dendl;
     ++slot.num_running;
   }
@@ -9602,27 +9707,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
   // [lookup +] lock pg (if we have it)
   if (!pg) {
-    pg = osd->_lookup_lock_pg(item.first);
+    pg = osd->_lookup_lock_pg(token);
   } else {
     pg->lock();
   }
 
   osd->service.maybe_inject_dispatch_delay();
 
-  boost::optional<OpQueueItem> qi;
-
   // we don't use a Mutex::Locker here because of the
   // osd->service.release_reserved_pushes() call below
   sdata->sdata_op_ordering_lock.Lock();
 
-  auto q = sdata->pg_slots.find(item.first);
+  auto q = sdata->pg_slots.find(token);
   assert(q != sdata->pg_slots.end());
   auto& slot = q->second;
   --slot.num_running;
 
   if (slot.to_process.empty()) {
     // raced with wake_pg_waiters or prune_pg_waiters
-    dout(20) << __func__ << " " << item.first << " nothing queued" << dendl;
+    dout(20) << __func__ << " " << token
+            << " nothing queued" << dendl;
     if (pg) {
       pg->unlock();
     }
@@ -9630,7 +9734,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     return;
   }
   if (requeue_seq != slot.requeue_seq) {
-    dout(20) << __func__ << " " << item.first
+    dout(20) << __func__ << " " << token
             << " requeue_seq " << slot.requeue_seq << " > our "
             << requeue_seq << ", we raced with wake_pg_waiters"
             << dendl;
@@ -9644,12 +9748,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl;
     slot.pg = pg;
   }
-  dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process
+  dout(30) << __func__ << " " << token
+          << " to_process " << slot.to_process
           << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
 
   // make sure we're not already waiting for this pg
   if (slot.waiting_for_pg) {
-    dout(20) << __func__ << " " << item.first << " item " << item.second
+    dout(20) << __func__ << " " << token
             << " slot is waiting_for_pg" << dendl;
     if (pg) {
       pg->unlock();
@@ -9659,30 +9764,32 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   }
 
   // take next item
-  qi = slot.to_process.front();
+  auto qi = std::move(slot.to_process.front());
   slot.to_process.pop_front();
-  dout(20) << __func__ << " " << item.first << " item " << *qi
-          << " pg " << pg << dendl;
+  dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
 
   if (!pg) {
     // should this pg shard exist on this osd in this (or a later) epoch?
     OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
-    if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) {
-      dout(20) << __func__ << " " << item.first
-              << " no pg, should exist, will wait on " << *qi << dendl;
-      slot.to_process.push_front(*qi);
+    if (osdmap->is_up_acting_osd_shard(token,
+                                      osd->whoami)) {
+      dout(20) << __func__ << " " << token
+              << " no pg, should exist, will wait" << " on " << qi << dendl;
+      slot.to_process.push_front(std::move(qi));
       slot.waiting_for_pg = true;
-    } else if (qi->get_map_epoch() > osdmap->get_epoch()) {
-      dout(20) << __func__ << " " << item.first << " no pg, item epoch is "
-              << qi->get_map_epoch() << " > " << osdmap->get_epoch()
-              << ", will wait on " << *qi << dendl;
-      slot.to_process.push_front(*qi);
+    } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
+      dout(20) << __func__ << " " << token
+              << " no pg, item epoch is "
+              << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+              << ", will wait on " << qi << dendl;
+      slot.to_process.push_front(std::move(qi));
       slot.waiting_for_pg = true;
     } else {
-      dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist,"
-              << " dropping " << *qi << dendl;
+      dout(20) << __func__ << " " << token
+              << " no pg, shouldn't exist,"
+              << " dropping " << qi << dendl;
       // share map with client?
-      if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+      if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
        Session *session = static_cast<Session *>(
          (*_op)->get_req()->get_connection()->get_priv());
        if (session) {
@@ -9690,7 +9797,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
          session->put();
        }
       }
-      unsigned pushes_to_free = qi->get_reserved_pushes();
+      unsigned pushes_to_free = qi.get_reserved_pushes();
       if (pushes_to_free > 0) {
        sdata->sdata_op_ordering_lock.Unlock();
        osd->service.release_reserved_pushes(pushes_to_free);
@@ -9708,7 +9815,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   {
 #ifdef WITH_LTTNG
     osd_reqid_t reqid;
-    if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+    if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
       reqid = (*_op)->get_reqid();
     }
 #endif
@@ -9727,12 +9834,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
                                 suicide_interval);
-  qi->run(osd, pg, tp_handle);
+  qi.run(osd, pg, tp_handle);
 
   {
 #ifdef WITH_LTTNG
     osd_reqid_t reqid;
-    if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+    if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
       reqid = (*_op)->get_reqid();
     }
 #endif
@@ -9743,24 +9850,23 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   pg->unlock();
 }
 
-void OSD::ShardedOpWQ::_enqueue(pair<spg_t, OpQueueItem>&& item) {
+void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
   uint32_t shard_index =
-    item.first.hash_to_shard(shard_list.size());
+    item.get_ordering_token().hash_to_shard(shard_list.size());
 
   ShardData* sdata = shard_list[shard_index];
   assert (NULL != sdata);
-  unsigned priority = item.second.get_priority();
-  unsigned cost = item.second.get_cost();
+  unsigned priority = item.get_priority();
+  unsigned cost = item.get_cost();
   sdata->sdata_op_ordering_lock.Lock();
 
-  dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+  dout(20) << __func__ << " " << item << dendl;
   if (priority >= osd->op_prio_cutoff)
     sdata->pqueue->enqueue_strict(
-      item.second.get_owner(), priority, std::move(item));
+      item.get_owner(), priority, std::move(item));
   else
     sdata->pqueue->enqueue(
-      item.second.get_owner(),
-      priority, cost, std::move(item));
+      item.get_owner(), priority, cost, std::move(item));
   sdata->sdata_op_ordering_lock.Unlock();
 
   sdata->sdata_lock.Lock();
@@ -9769,26 +9875,26 @@ void OSD::ShardedOpWQ::_enqueue(pair<spg_t, OpQueueItem>&& item) {
 
 }
 
-void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, OpQueueItem>&& item)
+void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
 {
-  uint32_t shard_index = item.first.hash_to_shard(shard_list.size());
-  ShardData* sdata = shard_list[shard_index];
-  assert (NULL != sdata);
+  auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size());
+  auto& sdata = shard_list[shard_index];
+  assert(sdata);
   sdata->sdata_op_ordering_lock.Lock();
-  auto p = sdata->pg_slots.find(item.first);
+  auto p = sdata->pg_slots.find(item.get_ordering_token());
   if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
     // we may be racing with _process, which has dequeued a new item
     // from pqueue, put it on to_process, and is now busy taking the
     // pg lock.  ensure this old requeued item is ordered before any
     // such newer item in to_process.
-    p->second.to_process.push_front(item.second);
-    item.second = p->second.to_process.back();
+    p->second.to_process.push_front(std::move(item));
+    item = std::move(p->second.to_process.back());
     p->second.to_process.pop_back();
-    dout(20) << __func__ << " " << item.first
+    dout(20) << __func__
             << " " << p->second.to_process.front()
-            << " shuffled w/ " << item.second << dendl;
+            << " shuffled w/ " << item << dendl;
   } else {
-    dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+    dout(20) << __func__ << " " << item << dendl;
   }
   sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
   sdata->sdata_op_ordering_lock.Unlock();
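
The _enqueue()/_enqueue_front() changes above capture the new routing
rule: the target shard is derived from the item's own ordering token
rather than from the spg_t that used to ride along in the pair.  A rough
sketch of that flow, with hash_to_shard() reduced to a plain modulo (the
real spg_t::hash_to_shard() is more involved) and Token/Item as
hypothetical stand-ins:

#include <cstdint>
#include <deque>
#include <vector>

struct Token {
  uint32_t ps;
  uint32_t hash_to_shard(size_t n) const { return ps % n; }
};

struct Item {
  Token token;
  const Token& get_ordering_token() const { return token; }
};

struct Shard { std::deque<Item> q; };

// All items carrying the same ordering token hash to the same shard, so
// per-token ordering is preserved without any cross-shard coordination.
void enqueue(std::vector<Shard> &shards, Item &&item) {
  uint32_t idx = item.get_ordering_token().hash_to_shard(shards.size());
  shards[idx].q.push_back(std::move(item));
}
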
index dfb4d0b11f5ee5cef3ab698c3d27fb33e0f3130e..2df4ad6e87efda0165ce4cb78bfd1539c3130f9c 100644 (file)
@@ -367,8 +367,8 @@ public:
   GenContextWQ recovery_gen_wq;
   ClassHandler  *&class_handler;
 
-  void enqueue_back(spg_t pgid, OpQueueItem&& qi);
-  void enqueue_front(spg_t pgid, OpQueueItem&& qi);
+  void enqueue_back(OpQueueItem&& qi);
+  void enqueue_front(OpQueueItem&& qi);
 
   void maybe_inject_dispatch_delay() {
     if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
@@ -859,22 +859,7 @@ public:
 
   AsyncReserver<spg_t> snap_reserver;
   void queue_for_snap_trim(PG *pg);
-
-  void queue_for_scrub(PG *pg, bool with_high_priority) {
-    unsigned scrub_queue_priority = pg->scrubber.priority;
-    if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
-      scrub_queue_priority = cct->_conf->osd_client_op_priority;
-    }
-    enqueue_back(
-      pg->get_pgid(),
-      OpQueueItem(
-       PGScrub(pg->get_osdmap()->get_epoch()),
-       cct->_conf->osd_scrub_cost,
-       scrub_queue_priority,
-       ceph_clock_now(),
-       0,
-       pg->get_osdmap()->get_epoch()));
-  }
+  void queue_for_scrub(PG *pg, bool with_high_priority);
 
 private:
   // -- pg recovery and associated throttling --
@@ -891,18 +876,7 @@ private:
   bool _recover_now(uint64_t *available_pushes);
   void _maybe_queue_recovery();
   void _queue_for_recovery(
-    pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
-    assert(recovery_lock.is_locked_by_me());
-    enqueue_back(
-      p.second->get_pgid(),
-      OpQueueItem(
-       PGRecovery(p.first, reserved_pushes),
-       cct->_conf->osd_recovery_cost,
-       cct->_conf->osd_recovery_priority,
-       ceph_clock_now(),
-       0,
-       p.first));
-  }
+    pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
 public:
   void start_recovery_op(PG *pg, const hobject_t& soid);
   void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
@@ -1618,10 +1592,10 @@ private:
    * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
    * and already requeued the items.
    */
-  friend class OpQueueItem;
+  friend class PGOpItem;
 
   class ShardedOpWQ
-    : public ShardedThreadPool::ShardedWQ<pair<spg_t,OpQueueItem>>
+    : public ShardedThreadPool::ShardedWQ<OpQueueItem>
   {
     struct ShardData {
       Mutex sdata_lock;
@@ -1650,18 +1624,18 @@ private:
       unordered_map<spg_t,pg_slot> pg_slots;
 
       /// priority queue
-      std::unique_ptr<OpQueue< pair<spg_t, OpQueueItem>, uint64_t>> pqueue;
+      std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
 
-      void _enqueue_front(pair<spg_t, OpQueueItem>&& item, unsigned cutoff) {
-       unsigned priority = item.second.get_priority();
-       unsigned cost = item.second.get_cost();
+      void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
+       unsigned priority = item.get_priority();
+       unsigned cost = item.get_cost();
        if (priority >= cutoff)
          pqueue->enqueue_strict_front(
-           item.second.get_owner(),
+           item.get_owner(),
            priority, std::move(item));
        else
          pqueue->enqueue_front(
-           item.second.get_owner(),
+           item.get_owner(),
            priority, cost, std::move(item));
       }
 
@@ -1673,15 +1647,13 @@ private:
          sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
                                 false, cct) {
        if (opqueue == io_queue::weightedpriority) {
-         pqueue = std::unique_ptr
-           <WeightedPriorityQueue<pair<spg_t,OpQueueItem>,uint64_t>>(
-             new WeightedPriorityQueue<pair<spg_t,OpQueueItem>,uint64_t>(
-               max_tok_per_prio, min_cost));
+         pqueue = ceph::make_unique<
+           WeightedPriorityQueue<OpQueueItem,uint64_t>>(
+               max_tok_per_prio, min_cost);
        } else if (opqueue == io_queue::prioritized) {
-         pqueue = std::unique_ptr
-           <PrioritizedQueue<pair<spg_t,OpQueueItem>,uint64_t>>(
-             new PrioritizedQueue<pair<spg_t,OpQueueItem>,uint64_t>(
-               max_tok_per_prio, min_cost));
+         pqueue = ceph::make_unique<
+           PrioritizedQueue<OpQueueItem,uint64_t>>(
+               max_tok_per_prio, min_cost);
        } else if (opqueue == io_queue::mclock_opclass) {
          pqueue = ceph::make_unique<ceph::mClockOpClassQueue>(cct);
        } else if (opqueue == io_queue::mclock_client) {
@@ -1700,7 +1672,7 @@ private:
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
-      : ShardedThreadPool::ShardedWQ<pair<spg_t,OpQueueItem>>(ti, si, tp),
+      : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
         osd(o),
         num_shards(pnum_shards) {
       for (uint32_t i = 0; i < num_shards; i++) {
@@ -1739,10 +1711,10 @@ private:
     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
 
     /// enqueue a new item
-    void _enqueue(pair <spg_t, OpQueueItem>&& item) override;
+    void _enqueue(OpQueueItem&& item) override;
 
     /// requeue an old item (at the front of the line)
-    void _enqueue_front(pair <spg_t, OpQueueItem>&& item) override;
+    void _enqueue_front(OpQueueItem&& item) override;
       
     void return_waiting_threads() override {
       for(uint32_t i = 0; i < num_shards; i++) {
@@ -1756,12 +1728,14 @@ private:
 
     void dump(Formatter *f) {
       for(uint32_t i = 0; i < num_shards; i++) {
-       ShardData* sdata = shard_list[i];
-       char lock_name[32] = {0};
-       snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
-       assert (NULL != sdata);
+       auto &&sdata = shard_list[i];
+
+       char queue_name[32] = {0};
+       snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i);
+       assert(NULL != sdata);
+
        sdata->sdata_op_ordering_lock.Lock();
-       f->open_object_section(lock_name);
+       f->open_object_section(queue_name);
        sdata->pqueue->dump(f);
        f->close_section();
        sdata->sdata_op_ordering_lock.Unlock();
@@ -1783,9 +1757,9 @@ private:
            out_ops->push_front(*mop);
        }
       }
-      bool operator()(const pair<spg_t, OpQueueItem> &op) {
-       if (op.first == pgid) {
-         accumulate(op.second);
+      bool operator()(const OpQueueItem &op) {
+       if (op.get_ordering_token() == pgid) {
+         accumulate(op);
          return true;
        } else {
          return false;
@@ -1798,8 +1772,8 @@ private:
 
     bool is_shard_empty(uint32_t thread_index) override {
       uint32_t shard_index = thread_index % num_shards; 
-      ShardData* sdata = shard_list[shard_index];
-      assert(NULL != sdata);
+      auto &&sdata = shard_list[shard_index];
+      assert(sdata);
       Mutex::Locker l(sdata->sdata_op_ordering_lock);
       return sdata->pqueue->empty();
     }
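
ShardData::_enqueue_front() above encodes the queue's two-lane policy:
items at or above the priority cutoff go to the strict (priority-only)
queue, everything else to the queue that also weighs cost.  A toy model
of that decision, with the two lanes reduced to plain deques (assumed
shapes, not the real OpQueue API):

#include <deque>
#include <utility>

struct Item { unsigned priority; unsigned cost; };

struct TwoLaneQueue {
  std::deque<Item> strict;    // priority-only lane (enqueue_strict_front)
  std::deque<Item> weighted;  // priority+cost lane (enqueue_front)
  void enqueue_front(Item &&item, unsigned cutoff) {
    if (item.priority >= cutoff)
      strict.push_front(std::move(item));
    else
      weighted.push_front(std::move(item));  // real queue also uses item.cost
  }
};
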
index bf9441618663ae4d6df272ba4d3accae9364578e..02bd16727718e0615c4254ec0d52c8369e20b503 100644 (file)
  *
  */
 
-
-#include "PG.h"
 #include "OpQueueItem.h"
 #include "OSD.h"
 
-
-void OpQueueItem::RunVis::operator()(const OpRequestRef &op) {
+void PGOpItem::run(OSD *osd,
+                   PGRef& pg,
+                   ThreadPool::TPHandle &handle)
+{
   osd->dequeue_op(pg, op, handle);
 }
-
-void OpQueueItem::RunVis::operator()(const PGSnapTrim &op) {
-  pg->snap_trimmer(op.epoch_queued);
-}
-
-void OpQueueItem::RunVis::operator()(const PGScrub &op) {
-  pg->scrub(op.epoch_queued, handle);
-}
-
-void OpQueueItem::RunVis::operator()(const PGRecovery &op) {
-  osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
-}
index 97c4c5ae22f3f15fe27b0e7997ac928d5ea5889c..212be71c96af635a9aff8b06fd864ee72535ea19 100644 (file)
@@ -53,96 +53,163 @@ struct PGRecovery {
   }
 };
 
-
 class OpQueueItem {
-  typedef boost::variant<
-    OpRequestRef,
-    PGSnapTrim,
-    PGScrub,
-    PGRecovery
-    > QVariant;
-  QVariant qvariant;
-  int cost;
-  unsigned priority;
-  utime_t start_time;
-  uint64_t owner;  ///< global id (e.g., client.XXX)
-  epoch_t map_epoch;    ///< an epoch we expect the PG to exist in
-
-  struct RunVis : public boost::static_visitor<> {
-    OSD *osd;
-    PGRef &pg;
-    ThreadPool::TPHandle &handle;
-    RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
-      : osd(osd), pg(pg), handle(handle) {}
-    void operator()(const OpRequestRef &op);
-    void operator()(const PGSnapTrim &op);
-    void operator()(const PGScrub &op);
-    void operator()(const PGRecovery &op);
-  }; // struct RunVis
-
-  struct StringifyVis : public boost::static_visitor<std::string> {
-    std::string operator()(const OpRequestRef &op) {
-      return stringify(op);
-    }
-    std::string operator()(const PGSnapTrim &op) {
-      return "PGSnapTrim";
+public:
+  class OrderLocker {
+  public:
+    using Ref = unique_ptr<OrderLocker>;
+    virtual void lock() = 0;
+    virtual void unlock() = 0;
+    virtual ~OrderLocker() {}
+  };
+  // Abstraction for operations queueable in the op queue
+  class OpQueueable {
+  public:
+    enum class op_type_t {
+      client_op,
+      osd_subop,
+      bg_snaptrim,
+      bg_recovery,
+      bg_scrub
+    };
+    using Ref = std::unique_ptr<OpQueueable>;
+
+    /// Items with the same queue token will end up in the same shard
+    virtual uint32_t get_queue_token() const = 0;
+
+    /* Items will be dequeued and locked atomically w.r.t. other items with the
+       * same ordering token */
+    virtual const spg_t& get_ordering_token() const = 0;
+    virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+    virtual op_type_t get_op_type() const = 0;
+    virtual boost::optional<OpRequestRef> maybe_get_op() const {
+      return boost::none;
     }
-    std::string operator()(const PGScrub &op) {
-      return "PGScrub";
+
+    virtual uint64_t get_reserved_pushes() const {
+      return 0;
     }
-    std::string operator()(const PGRecovery &op) {
-      return "PGRecovery";
+
+    virtual ostream &print(ostream &rhs) const = 0;
+
+    virtual void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+    virtual ~OpQueueable() {}
+    friend ostream& operator<<(ostream& out, const OpQueueable& q) {
+      return q.print(out);
     }
   };
 
-  friend ostream& operator<<(ostream& out, const OpQueueItem& q) {
-    StringifyVis v;
-    return out << "OpQueueItem(" << boost::apply_visitor(v, q.qvariant)
-              << " prio " << q.priority << " cost " << q.cost
-              << " e" << q.map_epoch << ")";
-  }
+private:
+  OpQueueable::Ref qitem;
+  int cost;
+  unsigned priority;
+  utime_t start_time;
+  uint64_t owner;  ///< global id (e.g., client.XXX)
+  epoch_t map_epoch;    ///< an epoch we expect the PG to exist in
 
 public:
-
-  OpQueueItem(OpRequestRef op, epoch_t e)
-    : qvariant(op), cost(op->get_req()->get_cost()),
-      priority(op->get_req()->get_priority()),
-      start_time(op->get_req()->get_recv_stamp()),
-      owner(op->get_req()->get_source().num()),
-      map_epoch(e)
-    {}
   OpQueueItem(
-    const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
-    uint64_t owner, epoch_t e)
-    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner), map_epoch(e) {}
-  OpQueueItem(
-    const PGScrub &op, int cost, unsigned priority, utime_t start_time,
-    uint64_t owner, epoch_t e)
-    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner), map_epoch(e) {}
-  OpQueueItem(
-    const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
-    uint64_t owner, epoch_t e)
-    : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner), map_epoch(e) {}
-
-  const boost::optional<OpRequestRef> maybe_get_op() const {
-    const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
-    return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
+    OpQueueable::Ref &&item,
+    int cost,
+    unsigned priority,
+    utime_t start_time,
+    uint64_t owner,
+    epoch_t e)
+    : qitem(std::move(item)),
+      cost(cost),
+      priority(priority),
+      start_time(start_time),
+      owner(owner),
+      map_epoch(e)
+  {}
+  OpQueueItem(OpQueueItem &&) = default;
+  OpQueueItem(const OpQueueItem &) = delete;
+  OpQueueItem &operator=(OpQueueItem &&) = default;
+  OpQueueItem &operator=(const OpQueueItem &) = delete;
+
+  OrderLocker::Ref get_order_locker(PGRef pg) {
+    return qitem->get_order_locker(pg);
+  }
+  uint32_t get_queue_token() const {
+    return qitem->get_queue_token();
+  }
+  const spg_t& get_ordering_token() const {
+    return qitem->get_ordering_token();
+  }
+  using op_type_t = OpQueueable::op_type_t;
+  OpQueueable::op_type_t get_op_type() const {
+    return qitem->get_op_type();
+  }
+  boost::optional<OpRequestRef> maybe_get_op() const {
+    return qitem->maybe_get_op();
   }
   uint64_t get_reserved_pushes() const {
-    const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
-    return op ? op->reserved_pushes : 0;
+    return qitem->get_reserved_pushes();
   }
-  void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
-    RunVis v(osd, pg, handle);
-    boost::apply_visitor(v, qvariant);
+  void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) {
+    qitem->run(osd, pg, handle);
   }
   unsigned get_priority() const { return priority; }
   int get_cost() const { return cost; }
   utime_t get_start_time() const { return start_time; }
   uint64_t get_owner() const { return owner; }
   epoch_t get_map_epoch() const { return map_epoch; }
-  const QVariant& get_variant() const { return qvariant; }
-}; // struct OpQueueItem
+
+  friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
+    return out << "OpQueueItem("
+              << item.get_ordering_token() << " " << *item.qitem
+              << " prio " << item.get_priority()
+              << " cost " << item.get_cost()
+              << " e" << item.get_map_epoch() << ")";
+  }
+};
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpQueueItem::OpQueueable {
+  spg_t pgid;
+protected:
+  const spg_t& get_pgid() const {
+    return pgid;
+  }
+public:
+  PGOpQueueable(spg_t pg) : pgid(pg) {}
+  uint32_t get_queue_token() const override final {
+    return get_pgid().ps();
+  }
+
+  const spg_t& get_ordering_token() const override final {
+    return get_pgid();
+  }
+
+  OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final {
+    class Locker : public OpQueueItem::OrderLocker {
+      PGRef pg;
+    public:
+      Locker(PGRef pg) : pg(pg) {}
+      void lock() override final {
+       pg->lock();
+      }
+      void unlock() override final {
+       pg->unlock();
+      }
+    };
+    return OpQueueItem::OrderLocker::Ref(
+      new Locker(pg));
+  }
+};
+
+class PGOpItem : public PGOpQueueable {
+  OpRequestRef op;
+public:
+  PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(op) {}
+  op_type_t get_op_type() const override final {
+    return op_type_t::client_op;
+  }
+  ostream &print(ostream &rhs) const override final {
+    return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+  }
+  boost::optional<OpRequestRef> maybe_get_op() const override final {
+    return op;
+  }
+  void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
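
With the OpQueueable interface in place, adding a new kind of queued
work means writing one small subclass, as PGOpItem does above.  A hedged
sketch of what a hypothetical new background item could look like
(PGFlush is invented for illustration and is not part of this patch):

class PGFlush : public PGOpQueueable {
  epoch_t epoch_queued;
public:
  PGFlush(spg_t pg, epoch_t e) : PGOpQueueable(pg), epoch_queued(e) {}
  op_type_t get_op_type() const override final {
    // a real patch would add a bg_flush value to op_type_t
    return op_type_t::bg_snaptrim;
  }
  ostream &print(ostream &rhs) const override final {
    return rhs << "PGFlush(pgid=" << get_pgid()
               << " epoch_queued=" << epoch_queued << ")";
  }
  void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
    // dispatch to whichever PG method implements the actual work
  }
};
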
index 509537d99537a16b269f386eff6b1768b11fbc62..e2c4028f88cc7163d9c1a538dae72fe226b39902 100644 (file)
@@ -3457,7 +3457,14 @@ void PG::requeue_op(OpRequestRef op)
     p->second.push_front(op);
   } else {
     dout(20) << __func__ << " " << op << dendl;
-    osd->enqueue_front(info.pgid, OpQueueItem(op, get_osdmap()->get_epoch()));
+    osd->enqueue_front(
+      OpQueueItem(
+        unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+       op->get_req()->get_cost(),
+       op->get_req()->get_priority(),
+       op->get_req()->get_recv_stamp(),
+       op->get_req()->get_source().num(),
+       get_osdmap()->get_epoch()));
   }
 }
 
@@ -3466,15 +3473,7 @@ void PG::requeue_ops(list<OpRequestRef> &ls)
   for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
        i != ls.rend();
        ++i) {
-    auto p = waiting_for_map.find((*i)->get_source());
-    if (p != waiting_for_map.end()) {
-      dout(20) << __func__ << " " << *i << " (waiting_for_map " << p->first
-              << ")" << dendl;
-      p->second.push_front(*i);
-    } else {
-      dout(20) << __func__ << " " << *i << dendl;
-      osd->enqueue_front(info.pgid, OpQueueItem(*i, get_osdmap()->get_epoch()));
-    }
+    requeue_op(*i);
   }
   ls.clear();
 }
@@ -3492,7 +3491,14 @@ void PG::requeue_map_waiters()
     } else {
       dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
       for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
-       osd->enqueue_front(info.pgid, OpQueueItem(*q, epoch));
+       auto req = *q;
+       osd->enqueue_front(OpQueueItem(
+          unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, req)),
+         req->get_req()->get_cost(),
+         req->get_req()->get_priority(),
+         req->get_req()->get_recv_stamp(),
+         req->get_req()->get_source().num(),
+         epoch));
       }
       p = waiting_for_map.erase(p);
     }
index 15231f2c101f20a923200880c9f4d7e8fe7158c8..6abb40fe4435735ef80189e882c103b61076e3bf 100644 (file)
@@ -9060,9 +9060,15 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
     if (scrubber.active_rep_scrub) {
       if (last_update_applied >= static_cast<const MOSDRepScrub*>(
            scrubber.active_rep_scrub->get_req())->scrub_to) {
+       auto& op = scrubber.active_rep_scrub;
        osd->enqueue_back(
-         info.pgid,
-         OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
+          OpQueueItem(
+           unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+           op->get_req()->get_cost(),
+           op->get_req()->get_priority(),
+           op->get_req()->get_recv_stamp(),
+           op->get_req()->get_source().num(),
+           get_osdmap()->get_epoch()));
        scrubber.active_rep_scrub = OpRequestRef();
       }
     }
@@ -10297,10 +10303,16 @@ void PrimaryLogPG::_applied_recovered_object_replica()
   if (!deleting && active_pushes == 0 &&
       scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
        scrubber.active_rep_scrub->get_req())->chunky) {
+    auto& op = scrubber.active_rep_scrub;
     osd->enqueue_back(
-      info.pgid,
-      OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
-    scrubber.active_rep_scrub = OpRequestRef();
+      OpQueueItem(
+        unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+       op->get_req()->get_cost(),
+       op->get_req()->get_priority(),
+       op->get_req()->get_recv_stamp(),
+       op->get_req()->get_source().num(),
+       get_osdmap()->get_epoch()));
+    scrubber.active_rep_scrub.reset();
   }
   unlock();
 }
index 5393792a4aaaddd27da417fdb552c216982076a3..31ce375e35496eedcd6d9e7808ffac2148e37ba3 100644 (file)
@@ -101,22 +101,21 @@ namespace ceph {
 
   mClockClientQueue::osd_op_type_t
   mClockClientQueue::get_osd_op_type(const Request& request) {
-    osd_op_type_t type =
-      boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
-
+    switch (request.get_op_type()) {
     // if we got client_op back then we need to distinguish between
     // a client op and an osd subop.
-
-    if (osd_op_type_t::client_op != type) {
-      return type;
-      /* fixme: this should match REPOP and probably others
-    } else if (MSG_OSD_SUBOP ==
-              boost::get<OpRequestRef>(
-                request.second.get_variant())->get_req()->get_header().type) {
-      return osd_op_type_t::osd_subop;
-      */
-    } else {
+    case OpQueueItem::op_type_t::client_op:
       return osd_op_type_t::client_op;
+    case OpQueueItem::op_type_t::osd_subop:
+      return osd_op_type_t::osd_subop;
+    case OpQueueItem::op_type_t::bg_snaptrim:
+      return osd_op_type_t::bg_snaptrim;
+    case OpQueueItem::op_type_t::bg_recovery:
+      return osd_op_type_t::bg_recovery;
+    case OpQueueItem::op_type_t::bg_scrub:
+      return osd_op_type_t::bg_scrub;
+    default:
+      assert(0);
     }
   }
 
index a08ee0e0ec6d68ab972e6ec2187e570412f39856..42bb175f7a06c76a2e8f457339adcb703398e75b 100644 (file)
@@ -28,7 +28,7 @@
 
 namespace ceph {
 
-  using Request = std::pair<spg_t, OpQueueItem>;
+  using Request = OpQueueItem;
   using Client = uint64_t;
 
   // This class exists to bridge the ceph code, which treats the class
@@ -74,7 +74,7 @@ namespace ceph {
                                std::list<Request> *out) override final {
       queue.remove_by_filter(
        [&cl, out] (Request&& r) -> bool {
-         if (cl == r.second.get_owner()) {
+         if (cl == r.get_owner()) {
            out->push_front(std::move(r));
            return true;
          } else {
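
The mClock queues' remove-by-client filter above now works directly on
move-only OpQueueItems, stealing matches out of the queue with
std::move.  A reduced model of that pattern, with remove_by_filter()
replaced by a plain erase loop (assumed shapes, not the dmclock API):

#include <cstdint>
#include <deque>
#include <list>
#include <utility>

struct Request {
  uint64_t owner;
  explicit Request(uint64_t o) : owner(o) {}
  Request(Request &&) = default;                // move-only, like OpQueueItem
  Request &operator=(Request &&) = default;
  Request(const Request &) = delete;
  uint64_t get_owner() const { return owner; }
};

// Drain every request owned by client cl into *out, using push_front()
// as the real filter does.
void remove_by_client(std::deque<Request> &q, uint64_t cl,
                      std::list<Request> *out) {
  for (auto i = q.begin(); i != q.end(); ) {
    if (i->get_owner() == cl) {
      out->push_front(std::move(*i));
      i = q.erase(i);
    } else {
      ++i;
    }
  }
}
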
index f825511f67da770a6764b906d25cec8d72e2a4f1..743c7e367257dffe943b045283262832d485238e 100644 (file)
@@ -98,22 +98,21 @@ namespace ceph {
 
   mClockOpClassQueue::osd_op_type_t
   mClockOpClassQueue::get_osd_op_type(const Request& request) {
-    osd_op_type_t type =
-      boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
-
+    switch (request.get_op_type()) {
     // if we got client_op back then we need to distinguish between
     // a client op and an osd subop.
-
-    if (osd_op_type_t::client_op != type) {
-      return type;
-      /* fixme: this should match REPOP and probably others
-    } else if (MSG_OSD_SUBOP ==
-              boost::get<OpRequestRef>(
-                request.second.get_variant())->get_req()->get_header().type) {
-      return osd_op_type_t::osd_subop;
-      */
-    } else {
+    case OpQueueItem::op_type_t::client_op:
       return osd_op_type_t::client_op;
+    case OpQueueItem::op_type_t::osd_subop:
+      return osd_op_type_t::osd_subop;
+    case OpQueueItem::op_type_t::bg_snaptrim:
+      return osd_op_type_t::bg_snaptrim;
+    case OpQueueItem::op_type_t::bg_recovery:
+      return osd_op_type_t::bg_recovery;
+    case OpQueueItem::op_type_t::bg_scrub:
+      return osd_op_type_t::bg_scrub;
+    default:
+      assert(0);
     }
   }
 
index 957b7232a9d9e9c9cf9ada231607ab92183a3045..de9757f6354cbfcc59f227a2d43c0566d5816176 100644 (file)
@@ -28,7 +28,7 @@
 
 namespace ceph {
 
-  using Request = std::pair<spg_t, OpQueueItem>;
+  using Request = OpQueueItem;
   using Client = uint64_t;
 
   // This class exists to bridge the ceph code, which treats the class
@@ -72,7 +72,7 @@ namespace ceph {
                                std::list<Request> *out) override final {
       queue.remove_by_filter(
        [&cl, out] (Request&& r) -> bool {
-         if (cl == r.second.get_owner()) {
+         if (cl == r.get_owner()) {
            out->push_front(std::move(r));
            return true;
          } else {