]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: change pg_slots unordered_map to use unique_ptr<>
authorSage Weil <sage@redhat.com>
Fri, 9 Feb 2018 20:39:34 +0000 (14:39 -0600)
committerSage Weil <sage@redhat.com>
Wed, 4 Apr 2018 13:26:53 +0000 (08:26 -0500)
This avoids moving slots around in memory in the unordered_map... they can
be big!

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h

index 675997842f0c7f70f979d5c008a58b82c1228109..1a7d8a549ed00308f63cf2ddc383fd1aff0a8ceb 100644 (file)
@@ -3694,11 +3694,11 @@ void OSD::_get_pgs(vector<PGRef> *v, bool clear_too)
   for (auto& s : shards) {
     Mutex::Locker l(s->sdata_op_ordering_lock);
     for (auto& j : s->pg_slots) {
-      if (j.second.pg &&
-         !j.second.pg->is_deleted()) {
-       v->push_back(j.second.pg);
+      if (j.second->pg &&
+         !j.second->pg->is_deleted()) {
+       v->push_back(j.second->pg);
        if (clear_too) {
-         s->_detach_pg(j.second);
+         s->_detach_pg(j.second.get());
        }
       }
     }
@@ -3711,8 +3711,8 @@ void OSD::_get_pgids(vector<spg_t> *v)
   for (auto& s : shards) {
     Mutex::Locker l(s->sdata_op_ordering_lock);
     for (auto& j : s->pg_slots) {
-      if (j.second.pg &&
-         !j.second.pg->is_deleted()) {
+      if (j.second->pg &&
+         !j.second->pg->is_deleted()) {
        v->push_back(j.first);
       }
     }
@@ -3725,8 +3725,9 @@ void OSD::register_pg(PGRef pg)
   uint32_t shard_index = pgid.hash_to_shard(num_shards);
   auto sdata = shards[shard_index];
   Mutex::Locker l(sdata->sdata_op_ordering_lock);
-  auto& slot = sdata->pg_slots[pgid];
-  assert(!slot.pg);
+  auto r = sdata->pg_slots.emplace(pgid, make_unique<OSDShard::pg_slot>());
+  assert(r.second);
+  auto *slot = r.first->second.get();
   dout(20) << __func__ << " " << pgid << " " << pg << dendl;
   sdata->_attach_pg(slot, pg.get());
 }
@@ -3738,9 +3739,9 @@ void OSD::unregister_pg(PG *pg)
   Mutex::Locker l(sdata->sdata_op_ordering_lock);
   auto p = sdata->pg_slots.find(pg->pg_id);
   if (p != sdata->pg_slots.end() &&
-      p->second.pg) {
+      p->second->pg) {
     dout(20) << __func__ << " " << pg->pg_id << " " << pg << dendl;
-    sdata->_detach_pg(p->second);
+    sdata->_detach_pg(p->second.get());
   } else {
     dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl;
   }
@@ -3755,7 +3756,7 @@ PGRef OSD::_lookup_pg(spg_t pgid)
   if (p == sdata->pg_slots.end()) {
     return nullptr;
   }
-  return p->second.pg;
+  return p->second->pg;
 }
 
 PG *OSD::_lookup_lock_pg(spg_t pgid)
@@ -9244,19 +9245,19 @@ int OSD::init_op_flags(OpRequestRef& op)
 #undef dout_prefix
 #define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " "
 
-void OSDShard::_attach_pg(pg_slotslot, PG *pg)
+void OSDShard::_attach_pg(pg_slot *slot, PG *pg)
 {
   dout(10) << pg->pg_id << " " << pg << dendl;
-  slot.pg = pg;
+  slot->pg = pg;
   pg->osd_shard = this;
   ++osd->num_pgs;
 }
 
-void OSDShard::_detach_pg(pg_slotslot)
+void OSDShard::_detach_pg(pg_slot *slot)
 {
-  dout(10) << slot.pg->pg_id << " " << slot.pg << dendl;
-  slot.pg->osd_shard = nullptr;
-  slot.pg = nullptr;
+  dout(10) << slot->pg->pg_id << " " << slot->pg << dendl;
+  slot->pg->osd_shard = nullptr;
+  slot->pg = nullptr;
   --osd->num_pgs;
 }
 
@@ -9276,7 +9277,7 @@ void OSDShard::consume_map(
   // check slots
   auto p = pg_slots.begin();
   while (p != pg_slots.end()) {
-    OSDShard::pg_slot& slot = p->second;
+    OSDShard::pg_slot *slot = p->second.get();
     const spg_t& pgid = p->first;
     dout(20) << __func__ << " " << pgid << dendl;
     if (old_osdmap &&
@@ -9287,14 +9288,14 @@ void OSDShard::consume_map(
       osd->service.identify_split_children(old_osdmap, new_osdmap, pgid,
                                           new_children);
     }
-    if (slot.waiting_for_split) {
+    if (slot->waiting_for_split) {
       dout(20) << __func__ << "  " << pgid
               << " waiting for split" << dendl;
       ++p;
       continue;
     }
-    if (!slot.waiting_peering.empty()) {
-      epoch_t first = slot.waiting_peering.begin()->first;
+    if (!slot->waiting_peering.empty()) {
+      epoch_t first = slot->waiting_peering.begin()->first;
       if (first <= osdmap->get_epoch()) {
        dout(20) << __func__ << "  " << pgid
                 << " pending_peering first epoch " << first
@@ -9305,27 +9306,27 @@ void OSDShard::consume_map(
       ++p;
       continue;
     }
-    if (!slot.waiting.empty()) {
+    if (!slot->waiting.empty()) {
       if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) {
        dout(20) << __func__ << "  " << pgid << " maps to us, keeping"
                 << dendl;
        ++p;
        continue;
       }
-      while (!slot.waiting.empty() &&
-            slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
-       auto& qi = slot.waiting.front();
+      while (!slot->waiting.empty() &&
+            slot->waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
+       auto& qi = slot->waiting.front();
        dout(20) << __func__ << "  " << pgid
                 << " waiting item " << qi
                 << " epoch " << qi.get_map_epoch()
                 << " <= " << osdmap->get_epoch()
                 << ", stale, dropping" << dendl;
        *pushes_to_free += qi.get_reserved_pushes();
-       slot.waiting.pop_front();
+       slot->waiting.pop_front();
       }
-      if (slot.waiting.empty() &&
-         slot.num_running == 0 &&
-         !slot.pg) {
+      if (slot->waiting.empty() &&
+         slot->num_running == 0 &&
+         !slot->pg) {
        dout(20) << __func__ << "  " << pgid << " empty, pruning" << dendl;
        p = pg_slots.erase(p);
        continue;
@@ -9343,26 +9344,26 @@ void OSDShard::consume_map(
 
 void OSDShard::_wake_pg_slot(
   spg_t pgid,
-  OSDShard::pg_slotslot)
+  OSDShard::pg_slot *slot)
 {
   dout(20) << __func__ << " " << pgid
-          << " to_process " << slot.to_process
-          << " waiting " << slot.waiting
-          << " waiting_peering " << slot.waiting_peering << dendl;
-  for (auto i = slot.to_process.rbegin();
-       i != slot.to_process.rend();
+          << " to_process " << slot->to_process
+          << " waiting " << slot->waiting
+          << " waiting_peering " << slot->waiting_peering << dendl;
+  for (auto i = slot->to_process.rbegin();
+       i != slot->to_process.rend();
        ++i) {
     _enqueue_front(std::move(*i), osd->op_prio_cutoff);
   }
-  slot.to_process.clear();
-  for (auto i = slot.waiting.rbegin();
-       i != slot.waiting.rend();
+  slot->to_process.clear();
+  for (auto i = slot->waiting.rbegin();
+       i != slot->waiting.rend();
        ++i) {
     _enqueue_front(std::move(*i), osd->op_prio_cutoff);
   }
-  slot.waiting.clear();
-  for (auto i = slot.waiting_peering.rbegin();
-       i != slot.waiting_peering.rend();
+  slot->waiting.clear();
+  for (auto i = slot->waiting_peering.rbegin();
+       i != slot->waiting_peering.rend();
        ++i) {
     // this is overkill; we requeue everything, even if some of these items are
     // waiting for maps we don't have yet.  FIXME.
@@ -9370,9 +9371,9 @@ void OSDShard::_wake_pg_slot(
       _enqueue_front(std::move(*j), osd->op_prio_cutoff);
     }
   }
-  slot.waiting_peering.clear();
-  slot.waiting_for_split = false;
-  ++slot.requeue_seq;
+  slot->waiting_peering.clear();
+  slot->waiting_for_split = false;
+  ++slot->requeue_seq;
 }
 
 void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids)
@@ -9408,11 +9409,10 @@ void OSDShard::_prime_splits(set<spg_t> *pgids)
   while (p != pgids->end()) {
     unsigned shard_index = p->hash_to_shard(osd->num_shards);
     if (shard_index == shard_id) {
-      auto i = pg_slots.find(*p);
-      if (i == pg_slots.end()) {
+      auto r = pg_slots.emplace(*p, make_unique<OSDShard::pg_slot>());
+      if (r.second) {
        dout(10) << "priming slot " << *p << dendl;
-       OSDShard::pg_slot& slot = pg_slots[*p];
-       slot.waiting_for_split = true;
+       r.first->second->waiting_for_split = true;
       } else {
        auto q = pg_slots.find(*p);
        assert(q != pg_slots.end());
@@ -9437,9 +9437,9 @@ void OSDShard::register_and_wake_split_child(PG *pg)
     dout(10) << pg->pg_id << " " << pg << dendl;
     auto p = pg_slots.find(pg->pg_id);
     assert(p != pg_slots.end());
-    auto& slot = p->second;
-    assert(!slot.pg);
-    assert(slot.waiting_for_split);
+    auto *slot = p->second.get();
+    assert(!slot->pg);
+    assert(slot->waiting_for_split);
     _attach_pg(slot, pg);
     _wake_pg_slot(pg->pg_id, slot);
   }
@@ -9474,7 +9474,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num)
 
 void OSD::ShardedOpWQ::_add_slot_waiter(
   spg_t pgid,
-  OSDShard::pg_slotslot,
+  OSDShard::pg_slot *slot,
   OpQueueItem&& qi)
 {
   if (qi.is_peering()) {
@@ -9482,13 +9482,13 @@ void OSD::ShardedOpWQ::_add_slot_waiter(
             << " peering, item epoch is "
             << qi.get_map_epoch()
             << ", will wait on " << qi << dendl;
-    slot.waiting_peering[qi.get_map_epoch()].push_back(std::move(qi));
+    slot->waiting_peering[qi.get_map_epoch()].push_back(std::move(qi));
   } else {
     dout(20) << __func__ << " " << pgid
             << " item epoch is "
             << qi.get_map_epoch()
             << ", will wait on " << qi << dendl;
-    slot.waiting.push_back(std::move(qi));
+    slot->waiting.push_back(std::move(qi));
   }
 }
 
@@ -9533,15 +9533,17 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   uint64_t requeue_seq;
   const auto token = item.get_ordering_token();
   {
-    auto& slot = sdata->pg_slots[token];
+    auto r = sdata->pg_slots.emplace(token, make_unique<OSDShard::pg_slot>());
+    auto *slot = r.first->second.get();
     dout(30) << __func__ << " " << token
-            << " to_process " << slot.to_process
-            << " waiting " << slot.waiting
-            << " waiting_peering " << slot.waiting_peering
+            << (r.second ? " (new)" : "")
+            << " to_process " << slot->to_process
+            << " waiting " << slot->waiting
+            << " waiting_peering " << slot->waiting_peering
             << dendl;
-    if (slot.waiting_for_split
-       || (item.is_peering() && !slot.waiting_peering.empty())
-       || (!item.is_peering() && !slot.waiting.empty())) {
+    if (slot->waiting_for_split
+       || (item.is_peering() && !slot->waiting_peering.empty())
+       || (!item.is_peering() && !slot->waiting.empty())) {
       dout(20) << __func__ << " " << token << " already waiting, adding " << item
               << dendl;
       _add_slot_waiter(token, slot, std::move(item));
@@ -9549,12 +9551,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       return;
     }
     // note the requeue seq now...
-    requeue_seq = slot.requeue_seq;
-    pg = slot.pg;
-    slot.to_process.push_back(std::move(item));
-    dout(20) << __func__ << " " << slot.to_process.back()
+    requeue_seq = slot->requeue_seq;
+    pg = slot->pg;
+    slot->to_process.push_back(std::move(item));
+    dout(20) << __func__ << " " << slot->to_process.back()
             << " queued" << dendl;
-    ++slot.num_running;
+    ++slot->num_running;
   }
   sdata->sdata_op_ordering_lock.Unlock();
 
@@ -9573,10 +9575,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
   auto q = sdata->pg_slots.find(token);
   assert(q != sdata->pg_slots.end());
-  auto& slot = q->second;
-  --slot.num_running;
+  auto *slot = q->second.get();
+  --slot->num_running;
 
-  if (slot.to_process.empty()) {
+  if (slot->to_process.empty()) {
     // raced with wake_pg_waiters or consume_map
     dout(20) << __func__ << " " << token
             << " nothing queued" << dendl;
@@ -9586,9 +9588,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     sdata->sdata_op_ordering_lock.Unlock();
     return;
   }
-  if (requeue_seq != slot.requeue_seq) {
+  if (requeue_seq != slot->requeue_seq) {
     dout(20) << __func__ << " " << token
-            << " requeue_seq " << slot.requeue_seq << " > our "
+            << " requeue_seq " << slot->requeue_seq << " > our "
             << requeue_seq << ", we raced with wake_pg_waiters"
             << dendl;
     if (pg) {
@@ -9598,16 +9600,16 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     return;
   }
   dout(30) << __func__ << " " << token
-          << " to_process " << slot.to_process
-          << " waiting " << slot.waiting
-          << " waiting_peering " << slot.waiting_peering << dendl;
+          << " to_process " << slot->to_process
+          << " waiting " << slot->waiting
+          << " waiting_peering " << slot->waiting_peering << dendl;
 
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
                                 suicide_interval);
 
   // take next item
-  auto qi = std::move(slot.to_process.front());
-  slot.to_process.pop_front();
+  auto qi = std::move(slot->to_process.front());
+  slot->to_process.pop_front();
   dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
   unsigned pushes_to_free = 0;
   set<spg_t> new_children;
@@ -9617,7 +9619,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     // should this pg shard exist on this osd in this (or a later) epoch?
     osdmap = sdata->osdmap;
     const PGCreateInfo *create_info = qi.creates_pg();
-    if (slot.waiting_for_split) {
+    if (slot->waiting_for_split) {
       dout(20) << __func__ << " " << token
               << " splitting" << dendl;
       _add_slot_waiter(token, slot, std::move(qi));
@@ -9786,16 +9788,17 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
   assert(sdata);
   sdata->sdata_op_ordering_lock.Lock();
   auto p = sdata->pg_slots.find(item.get_ordering_token());
-  if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
+  if (p != sdata->pg_slots.end() &&
+      !p->second->to_process.empty()) {
     // we may be racing with _process, which has dequeued a new item
     // from pqueue, put it on to_process, and is now busy taking the
     // pg lock.  ensure this old requeued item is ordered before any
     // such newer item in to_process.
-    p->second.to_process.push_front(std::move(item));
-    item = std::move(p->second.to_process.back());
-    p->second.to_process.pop_back();
+    p->second->to_process.push_front(std::move(item));
+    item = std::move(p->second->to_process.back());
+    p->second->to_process.pop_back();
     dout(20) << __func__
-            << " " << p->second.to_process.front()
+            << " " << p->second->to_process.front()
             << " shuffled w/ " << item << dendl;
   } else {
     dout(20) << __func__ << " " << item << dendl;
index c0ab3e8e576d8505e18ccae2823fbaea6a241a76..8a4bf386cc168916c0bb6756288048dadb550cbc 100644 (file)
@@ -1120,7 +1120,7 @@ struct OSDShard {
   /// map of slots for each spg_t.  maintains ordering of items dequeued
   /// from pqueue while _process thread drops shard lock to acquire the
   /// pg lock.  slots are removed by consume_map.
-  unordered_map<spg_t,pg_slot> pg_slots;
+  unordered_map<spg_t,unique_ptr<pg_slot>> pg_slots;
 
   /// priority queue
   std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
@@ -1140,8 +1140,8 @@ struct OSDShard {
        priority, cost, std::move(item));
   }
 
-  void _attach_pg(pg_slotslot, PG *pg);
-  void _detach_pg(pg_slotslot);
+  void _attach_pg(pg_slot *slot, PG *pg);
+  void _detach_pg(pg_slot *slot);
 
   /// push osdmap into shard
   void consume_map(
@@ -1149,7 +1149,7 @@ struct OSDShard {
     unsigned *pushes_to_free,
     set<spg_t> *new_children);
 
-  void _wake_pg_slot(spg_t pgid, OSDShard::pg_slotslot);
+  void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot *slot);
 
   void _prime_splits(set<spg_t> *pgids);
   void prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids);
@@ -1654,7 +1654,7 @@ protected:
 
     void _add_slot_waiter(
       spg_t token,
-      OSDShard::pg_slotslot,
+      OSDShard::pg_slot *slot,
       OpQueueItem&& qi);
 
     /// try to do some work