]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move ShardedOpWQ::ShardData -> OSDShard
authorSage Weil <sage@redhat.com>
Tue, 6 Feb 2018 00:22:40 +0000 (18:22 -0600)
committerSage Weil <sage@redhat.com>
Wed, 4 Apr 2018 13:26:52 +0000 (08:26 -0500)
Soon we will destroy pg_map!

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h

index 7a3aa8ca36da3690933e9330ca9d6cbbb8d7bbc2..e8a4b301df1e5b10b4227c28241f9a13c2769231 100644 (file)
@@ -2036,7 +2036,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   op_queue(get_io_queue()),
   op_prio_cutoff(get_io_prio_cut()),
   op_shardedwq(
-    get_num_op_shards(),
     this,
     cct->_conf->osd_op_thread_timeout,
     cct->_conf->osd_op_thread_suicide_timeout,
@@ -2067,10 +2066,28 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   ss << "osd." << whoami;
   trace_endpoint.copy_name(ss.str());
 #endif
+
+  // initialize shards
+  num_shards = get_num_op_shards();
+  for (uint32_t i = 0; i < num_shards; i++) {
+    char lock_name[128] = {0};
+    snprintf(lock_name, sizeof(lock_name), "OSDShard.%d::sdata_lock", i);
+    char order_lock[128] = {0};
+    snprintf(order_lock, sizeof(order_lock), "OSDShard.%d::sdata_op_ordering_lock", i);
+    OSDShard *one_shard = new OSDShard(
+      lock_name, order_lock,
+      cct->_conf->osd_op_pq_max_tokens_per_priority,
+      cct->_conf->osd_op_pq_min_cost, cct, op_queue);
+    shards.push_back(one_shard);
+  }
 }
 
 OSD::~OSD()
 {
+  while (!shards.empty()) {
+    delete shards.back();
+    shards.pop_back();
+  }
   delete authorize_handler_cluster_registry;
   delete authorize_handler_service_registry;
   delete class_handler;
@@ -3807,23 +3824,16 @@ void OSD::recursive_remove_collection(CephContext* cct,
 
 PGRef OSD::_open_pg(
   OSDMapRef createmap,
-  OSDMapRef servicemap,
   spg_t pgid)
 {
-  PG* pg = _make_pg(createmap, pgid);
+  PGRef pg = _make_pg(createmap, pgid);
   {
     RWLock::WLocker l(pg_map_lock);
     assert(pg_map.count(pgid) == 0);
-    pg_map[pgid] = pg;
+    pg_map[pgid] = pg.get();
     pg_map_size = pg_map.size();
     pg->get("PGMap");  // because it's in pg_map
     service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
-
-    // make sure we register any splits that happened between when the pg
-    // was created and our latest map.
-    set<spg_t> new_children;
-    service.init_splits_between(pgid, createmap, servicemap, &new_children);
-    op_shardedwq.prime_splits(new_children);
   }
   return pg;
 }
@@ -3957,7 +3967,7 @@ void OSD::load_pgs()
       continue;
     }
 
-    PG *pg = NULL;
+    PGRef pg;
     if (map_epoch > 0) {
       OSDMapRef pgosdmap = service.try_get_map(map_epoch);
       if (!pgosdmap) {
@@ -3975,9 +3985,9 @@ void OSD::load_pgs()
          assert(0 == "Missing map in load_pgs");
        }
       }
-      pg = _open_pg(pgosdmap, osdmap, pgid);
+      pg = _open_pg(pgosdmap, pgid);
     } else {
-      pg = _open_pg(osdmap, osdmap, pgid);
+      pg = _open_pg(osdmap, pgid);
     }
     // there can be no waiters here, so we don't call wake_pg_waiters
 
@@ -4052,7 +4062,19 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
     role = -1;
   }
 
-  PGRef pg = _open_pg(createmap, osdmap, pgid);
+  PGRef pg = _open_pg(createmap, pgid);
+
+  // We need to avoid racing with consume_map().  This should get
+  // redone as a structured waterfall of incoming osdmaps from osd ->
+  // shard, and something here that gets the to-be-split pg slots
+  // primed, even when they land on other shards.  (I think it will be
+  // iterative to handle the case where it races with newer incoming
+  // maps.)  For now, just cross our fingers. FIXME
+  {
+    set<spg_t> new_children;
+    service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children);
+    op_shardedwq.prime_splits(new_children);
+  }
 
   pg->lock(true);
 
@@ -4076,7 +4098,7 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
 
   dispatch_context(rctx, pg.get(), osdmap, nullptr);
 
-  dout(10) << *pg << " is new" << dendl;
+  dout(10) << __func__ << " new pg " << *pg << dendl;
   return pg;
 }
 
@@ -9453,8 +9475,8 @@ int OSD::init_op_flags(OpRequestRef& op)
 
 void OSD::ShardedOpWQ::_wake_pg_slot(
   spg_t pgid,
-  ShardData *sdata,
-  ShardData::pg_slot& slot)
+  OSDShard *sdata,
+  OSDShard::pg_slot& slot)
 {
   dout(20) << __func__ << " " << pgid
           << " to_process " << slot.to_process
@@ -9488,8 +9510,8 @@ void OSD::ShardedOpWQ::_wake_pg_slot(
 
 void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
 {
-  uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
-  auto sdata = shard_list[shard_index];
+  uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
+  auto sdata = osd->shards[shard_index];
   bool queued = false;
   {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
@@ -9510,10 +9532,10 @@ void OSD::ShardedOpWQ::prime_splits(const set<spg_t>& pgs)
 {
   dout(20) << __func__ << " " << pgs << dendl;
   for (auto pgid : pgs) {
-    unsigned shard_index = pgid.hash_to_shard(shard_list.size());
-    ShardData* sdata = shard_list[shard_index];
+    unsigned shard_index = pgid.hash_to_shard(osd->shards.size());
+    OSDShard* sdata = osd->shards[shard_index];
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
-    ShardData::pg_slot& slot = sdata->pg_slots[pgid];
+    OSDShard::pg_slot& slot = sdata->pg_slots[pgid];
     slot.waiting_for_split = true;
   }
 }
@@ -9522,12 +9544,12 @@ void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
 {
   unsigned pushes_to_free = 0;
   bool queued = false;
-  for (auto sdata : shard_list) {
+  for (auto sdata : osd->shards) {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     sdata->waiting_for_pg_osdmap = osdmap;
     auto p = sdata->pg_slots.begin();
     while (p != sdata->pg_slots.end()) {
-      ShardData::pg_slot& slot = p->second;
+      OSDShard::pg_slot& slot = p->second;
       if (slot.waiting_for_split) {
        dout(20) << __func__ << "  " << p->first
                 << " waiting for split" << dendl;
@@ -9588,8 +9610,8 @@ void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami)
 void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg)
 {
   spg_t pgid = pg->get_pgid();
-  uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
-  auto sdata = shard_list[shard_index];
+  uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
+  auto sdata = osd->shards[shard_index];
   Mutex::Locker l(sdata->sdata_op_ordering_lock);
   auto p = sdata->pg_slots.find(pgid);
   if (p != sdata->pg_slots.end()) {
@@ -9602,7 +9624,7 @@ void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg)
 
 void OSD::ShardedOpWQ::clear_pg_slots()
 {
-  for (auto sdata : shard_list) {
+  for (auto sdata : osd->shards) {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     sdata->pg_slots.clear();
     sdata->waiting_for_pg_osdmap.reset();
@@ -9612,7 +9634,7 @@ void OSD::ShardedOpWQ::clear_pg_slots()
 
 void OSD::ShardedOpWQ::_add_slot_waiter(
   spg_t pgid,
-  OSD::ShardedOpWQ::ShardData::pg_slot& slot,
+  OSDShard::pg_slot& slot,
   OpQueueItem&& qi)
 {
   if (qi.is_peering()) {
@@ -9635,8 +9657,8 @@ void OSD::ShardedOpWQ::_add_slot_waiter(
 
 void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 {
-  uint32_t shard_index = thread_index % num_shards;
-  auto& sdata = shard_list[shard_index];
+  uint32_t shard_index = thread_index % osd->num_shards;
+  auto& sdata = osd->shards[shard_index];
   assert(sdata);
   // peek at spg_t
   sdata->sdata_op_ordering_lock.Lock();
@@ -9806,6 +9828,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
            pg = osd->handle_pg_create_info(osdmap, create_info);
            if (pg) {
              // we created the pg! drop out and continue "normally"!
+             slot.pg = pg;           // install in shard slot
              _wake_pg_slot(token, sdata, slot);
              break;
            }
@@ -9905,9 +9928,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
 
 void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
   uint32_t shard_index =
-    item.get_ordering_token().hash_to_shard(shard_list.size());
+    item.get_ordering_token().hash_to_shard(osd->shards.size());
 
-  ShardData* sdata = shard_list[shard_index];
+  OSDShard* sdata = osd->shards[shard_index];
   assert (NULL != sdata);
   unsigned priority = item.get_priority();
   unsigned cost = item.get_cost();
@@ -9930,8 +9953,8 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
 
 void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
 {
-  auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size());
-  auto& sdata = shard_list[shard_index];
+  auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
+  auto& sdata = osd->shards[shard_index];
   assert(sdata);
   sdata->sdata_op_ordering_lock.Lock();
   auto p = sdata->pg_slots.find(item.get_ordering_token());
@@ -9984,18 +10007,18 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,
 }} // namespace ceph::osd_cmds
 
 
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
+std::ostream& operator<<(std::ostream& out, const io_queue& q) {
   switch(q) {
-  case OSD::io_queue::prioritized:
+  case io_queue::prioritized:
     out << "prioritized";
     break;
-  case OSD::io_queue::weightedpriority:
+  case io_queue::weightedpriority:
     out << "weightedpriority";
     break;
-  case OSD::io_queue::mclock_opclass:
+  case io_queue::mclock_opclass:
     out << "mclock_opclass";
     break;
-  case OSD::io_queue::mclock_client:
+  case io_queue::mclock_client:
     out << "mclock_client";
     break;
   }
index 30ab8eec00f270944af18808e1eed9b4736a5187..6568edaba61dae17332c5b57e9abbc95f1b064fd 100644 (file)
@@ -1106,6 +1106,86 @@ public:
   ~OSDService();
 };
 
+
+enum class io_queue {
+  prioritized,
+  weightedpriority,
+  mclock_opclass,
+  mclock_client,
+};
+
+struct OSDShard {
+  Mutex sdata_lock;
+  Cond sdata_cond;
+
+  Mutex sdata_op_ordering_lock;   ///< protects all members below
+
+  OSDMapRef waiting_for_pg_osdmap;
+
+  struct pg_slot {
+    PGRef pg;                     ///< cached pg reference [optional]
+    deque<OpQueueItem> to_process; ///< order items for this slot
+    int num_running = 0;          ///< _process threads doing pg lookup/lock
+
+    deque<OpQueueItem> waiting;   ///< waiting for pg (or map + pg)
+
+    /// waiting for map (peering evt)
+    map<epoch_t,deque<OpQueueItem>> waiting_peering;
+
+    /// incremented by wake_pg_waiters; indicates racing _process threads
+    /// should bail out (their op has been requeued)
+    uint64_t requeue_seq = 0;
+
+    /// waiting for split child to materialize
+    bool waiting_for_split = false;
+  };
+
+  /// map of slots for each spg_t.  maintains ordering of items dequeued
+  /// from pqueue while _process thread drops shard lock to acquire the
+  /// pg lock.  slots are removed only by prune_or_wake_pg_waiters.
+  unordered_map<spg_t,pg_slot> pg_slots;
+
+  /// priority queue
+  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+
+  bool stop_waiting = false;
+
+  void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
+    unsigned priority = item.get_priority();
+    unsigned cost = item.get_cost();
+    if (priority >= cutoff)
+      pqueue->enqueue_strict_front(
+       item.get_owner(),
+       priority, std::move(item));
+    else
+      pqueue->enqueue_front(
+       item.get_owner(),
+       priority, cost, std::move(item));
+  }
+
+  OSDShard(
+    string lock_name, string ordering_lock,
+    uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+    io_queue opqueue)
+    : sdata_lock(lock_name.c_str(), false, true, false, cct),
+      sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+                            false, cct) {
+    if (opqueue == io_queue::weightedpriority) {
+      pqueue = std::make_unique<
+       WeightedPriorityQueue<OpQueueItem,uint64_t>>(
+         max_tok_per_prio, min_cost);
+    } else if (opqueue == io_queue::prioritized) {
+      pqueue = std::make_unique<
+       PrioritizedQueue<OpQueueItem,uint64_t>>(
+         max_tok_per_prio, min_cost);
+    } else if (opqueue == io_queue::mclock_opclass) {
+      pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
+    } else if (opqueue == io_queue::mclock_client) {
+      pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
+    }
+  }
+};
+
 class OSD : public Dispatcher,
            public md_config_obs_t {
   /** OSD **/
@@ -1527,13 +1607,7 @@ private:
   friend struct C_OpenPGs;
 
   // -- op queue --
-  enum class io_queue {
-    prioritized,
-    weightedpriority,
-    mclock_opclass,
-    mclock_client,
-  };
-  friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+  friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
 
   const io_queue op_queue;
   const unsigned int op_prio_cutoff;
@@ -1566,119 +1640,26 @@ private:
   class ShardedOpWQ
     : public ShardedThreadPool::ShardedWQ<OpQueueItem>
   {
-    struct ShardData {
-      Mutex sdata_lock;
-      Cond sdata_cond;
-
-      Mutex sdata_op_ordering_lock;   ///< protects all members below
-
-      OSDMapRef waiting_for_pg_osdmap;
-      struct pg_slot {
-       PGRef pg;                     ///< cached pg reference [optional]
-       deque<OpQueueItem> to_process; ///< order items for this slot
-       int num_running = 0;          ///< _process threads doing pg lookup/lock
-
-       deque<OpQueueItem> waiting;   ///< waiting for pg (or map + pg)
-
-       /// waiting for map (peering evt)
-       map<epoch_t,deque<OpQueueItem>> waiting_peering;
-
-       /// incremented by wake_pg_waiters; indicates racing _process threads
-       /// should bail out (their op has been requeued)
-       uint64_t requeue_seq = 0;
-
-       /// waiting for split child to materialize
-       bool waiting_for_split = false;
-      };
-
-      /// map of slots for each spg_t.  maintains ordering of items dequeued
-      /// from pqueue while _process thread drops shard lock to acquire the
-      /// pg lock.  slots are removed only by prune_or_wake_pg_waiters.
-      unordered_map<spg_t,pg_slot> pg_slots;
-
-      /// priority queue
-      std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
-
-      bool stop_waiting = false;
-
-      void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
-       unsigned priority = item.get_priority();
-       unsigned cost = item.get_cost();
-       if (priority >= cutoff)
-         pqueue->enqueue_strict_front(
-           item.get_owner(),
-           priority, std::move(item));
-       else
-         pqueue->enqueue_front(
-           item.get_owner(),
-           priority, cost, std::move(item));
-      }
-
-      ShardData(
-       string lock_name, string ordering_lock,
-       uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
-       io_queue opqueue)
-       : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
-                                false, cct) {
-       if (opqueue == io_queue::weightedpriority) {
-         pqueue = std::make_unique<
-           WeightedPriorityQueue<OpQueueItem,uint64_t>>(
-               max_tok_per_prio, min_cost);
-       } else if (opqueue == io_queue::prioritized) {
-         pqueue = std::make_unique<
-           PrioritizedQueue<OpQueueItem,uint64_t>>(
-               max_tok_per_prio, min_cost);
-       } else if (opqueue == io_queue::mclock_opclass) {
-         pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
-       } else if (opqueue == io_queue::mclock_client) {
-         pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
-       }
-      }
-    }; // struct ShardData
-
-    vector<ShardData*> shard_list;
     OSD *osd;
-    uint32_t num_shards;
 
   public:
-    ShardedOpWQ(uint32_t pnum_shards,
-               OSD *o,
+    ShardedOpWQ(OSD *o,
                time_t ti,
                time_t si,
                ShardedThreadPool* tp)
       : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
-        osd(o),
-        num_shards(pnum_shards) {
-      for (uint32_t i = 0; i < num_shards; i++) {
-       char lock_name[32] = {0};
-       snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
-       char order_lock[32] = {0};
-       snprintf(order_lock, sizeof(order_lock), "%s.%d",
-                "OSD:ShardedOpWQ:order:", i);
-       ShardData* one_shard = new ShardData(
-         lock_name, order_lock,
-         osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
-         osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
-       shard_list.push_back(one_shard);
-      }
-    }
-    ~ShardedOpWQ() override {
-      while (!shard_list.empty()) {
-       delete shard_list.back();
-       shard_list.pop_back();
-      }
+        osd(o) {
     }
 
     void _add_slot_waiter(
       spg_t token,
-      ShardData::pg_slot& slot,
+      OSDShard::pg_slot& slot,
       OpQueueItem&& qi);
 
     /// wake any pg waiters after a PG is split
     void wake_pg_split_waiters(spg_t pgid);
 
-    void _wake_pg_slot(spg_t pgid, ShardData *sdata, ShardData::pg_slot& slot);
+    void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot);
 
     /// prime slots for splitting pgs
     void prime_splits(const set<spg_t>& pgs);
@@ -1702,8 +1683,8 @@ private:
     void _enqueue_front(OpQueueItem&& item) override;
       
     void return_waiting_threads() override {
-      for(uint32_t i = 0; i < num_shards; i++) {
-       ShardData* sdata = shard_list[i];
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       OSDShard* sdata = osd->shards[i];
        assert (NULL != sdata); 
        sdata->sdata_lock.Lock();
        sdata->stop_waiting = true;
@@ -1713,8 +1694,8 @@ private:
     }
 
     void stop_return_waiting_threads() override {
-      for(uint32_t i = 0; i < num_shards; i++) {
-       ShardData* sdata = shard_list[i];
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       OSDShard* sdata = osd->shards[i];
        assert (NULL != sdata);
        sdata->sdata_lock.Lock();
        sdata->stop_waiting = false;
@@ -1723,8 +1704,8 @@ private:
     }
 
     void dump(Formatter *f) {
-      for(uint32_t i = 0; i < num_shards; i++) {
-       auto &&sdata = shard_list[i];
+      for(uint32_t i = 0; i < osd->num_shards; i++) {
+       auto &&sdata = osd->shards[i];
 
        char queue_name[32] = {0};
        snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i);
@@ -1767,8 +1748,8 @@ private:
     };
 
     bool is_shard_empty(uint32_t thread_index) override {
-      uint32_t shard_index = thread_index % num_shards; 
-      auto &&sdata = shard_list[shard_index];
+      uint32_t shard_index = thread_index % osd->num_shards;
+      auto &&sdata = osd->shards[shard_index];
       assert(sdata);
       Mutex::Locker l(sdata->sdata_op_ordering_lock);
       return sdata->pqueue->empty();
@@ -1851,6 +1832,11 @@ private:
     return service.add_map_inc_bl(e, bl);
   }
 
+public:
+  // -- shards --
+  vector<OSDShard*> shards;
+  uint32_t num_shards = 0;
+
 protected:
   // -- placement groups --
   RWLock pg_map_lock; // this lock orders *above* individual PG _locks
@@ -1880,7 +1866,6 @@ public:
 protected:
   PGRef _open_pg(
     OSDMapRef createmap,   ///< map pg is created in
-    OSDMapRef servicemap,  ///< latest service map
     spg_t pg);
 
   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
@@ -2265,7 +2250,7 @@ public:
 };
 
 
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+std::ostream& operator<<(std::ostream& out, const io_queue& q);
 
 
 //compatibility of the executable