common/Finisher: only wake up the waiter when the queue was empty
author    Jianpeng Ma <jianpeng.ma@intel.com>
          Thu, 20 Sep 2018 13:52:53 +0000 (21:52 +0800)
committer Jianpeng Ma <jianpeng.ma@intel.com>
          Thu, 20 Sep 2018 13:52:53 +0000 (21:52 +0800)
Signed-off-by: Jianpeng Ma <jianpeng.ma@intel.com>
src/common/Finisher.h
src/common/legacy_config_opts.h
src/common/options.cc
src/os/ObjectStore.h
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/filestore/FileStore.h
src/os/kstore/KStore.h
src/os/memstore/MemStore.h
src/osd/OSD.cc
src/osd/OSD.h

diff --git a/src/common/Finisher.h b/src/common/Finisher.h
index d463680a8c8cd7c62947b33ec24d837b605aa1b1..4b6d2d3c8a136da2896df3e5264b0ee10ca8b366 100644
@@ -184,4 +184,43 @@ public:
   }
 };
 
+class ContextQueue {
+  list<Context *> q;
+  std::mutex q_mutex;
+  Mutex& mutex;
+  Cond& cond;
+public:
+  ContextQueue(Mutex& mut, Cond& con) : mutex(mut), cond(con) {}
+
+  void queue(list<Context *>& ls) {
+    {
+      std::scoped_lock l(q_mutex);
+      if (q.empty()) {
+       q.swap(ls);
+      } else {
+       q.insert(q.end(), ls.begin(), ls.end());
+      }
+    }
+
+    mutex.Lock();
+    cond.SignalOne();
+    mutex.Unlock();
+
+    ls.clear();
+  }
+
+  void swap(list<Context *>& ls) {
+    ls.clear();
+    std::scoped_lock l(q_mutex);
+    if (!q.empty()) {
+      q.swap(ls);
+    }
+  }
+
+  bool empty() {
+    std::scoped_lock l(q_mutex);
+    return q.empty();
+  }
+};
+
 #endif
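
For orientation, a minimal sketch of the producer/consumer pattern ContextQueue is built for (not part of the commit; Mutex, Cond, and Context are Ceph's existing types, and the variable names here are illustrative):

  Mutex lock("example::lock");
  Cond cond;
  ContextQueue cq(lock, cond);

  // Producer (e.g. a store's kv thread): hand over completed oncommit
  // contexts.  queue() appends (or swaps, when its queue is empty),
  // clears the caller's list, and signals the waiter through cond.
  list<Context *> finished;
  // ... fill finished ...
  cq.queue(finished);

  // Consumer (e.g. an OSD shard thread): drain everything queued so far
  // and complete each context outside ContextQueue's internal lock.
  list<Context *> oncommits;
  cq.swap(oncommits);
  for (auto c : oncommits) {
    c->complete(0);
  }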
diff --git a/src/common/legacy_config_opts.h b/src/common/legacy_config_opts.h
index 2b3fb0d62c476dc37a58b1c00084125f4226b9b1..071c627f9852a7345b87910fa13d74d69e4ff59c 100644
@@ -1070,7 +1070,6 @@ OPTION(bluestore_debug_omit_block_device_write, OPT_BOOL)
 OPTION(bluestore_debug_fsck_abort, OPT_BOOL)
 OPTION(bluestore_debug_omit_kv_commit, OPT_BOOL)
 OPTION(bluestore_debug_permit_any_bdev_label, OPT_BOOL)
-OPTION(bluestore_shard_finishers, OPT_BOOL)
 OPTION(bluestore_debug_random_read_err, OPT_DOUBLE)
 OPTION(bluestore_debug_inject_bug21040, OPT_BOOL)
 OPTION(bluestore_debug_inject_csum_err_probability, OPT_FLOAT)
diff --git a/src/common/options.cc b/src/common/options.cc
index d1c295d3fb3f35337cb08a8fbfb3c125f4a16719..8d9bdcf5498411a480e27b310d516661621f2481 100644
@@ -4340,10 +4340,6 @@ std::vector<Option> get_global_options() {
     .set_default(false)
     .set_description(""),
 
-    Option("bluestore_shard_finishers", Option::TYPE_BOOL, Option::LEVEL_DEV)
-    .set_default(false)
-    .set_description(""),
-
     Option("bluestore_debug_random_read_err", Option::TYPE_FLOAT, Option::LEVEL_DEV)
     .set_default(0)
     .set_description(""),
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index 129b366ea8b2956766e574194217d5b112fad653..77305025f605cbab9428232213a2c052f6568cbe 100644
@@ -51,7 +51,7 @@ namespace ceph {
  */
 
 class Logger;
-
+class ContextQueue;
 
 static inline void encode(const map<string,bufferptr> *attrset, bufferlist &bl) {
   encode(*attrset, bl);
@@ -1617,6 +1617,13 @@ public:
    */
   virtual CollectionHandle create_new_collection(const coll_t &cid) = 0;
 
+  /**
+   * set ContextQueue for a collection
+   *
+   * After this is set, the oncommit contexts of a Transaction are
+   * queued into commit_queue and invoked by the OSD shard thread.
+   */
+  virtual void set_collection_commit_queue(const coll_t &cid, ContextQueue *commit_queue) = 0;
 
   /**
    * Synchronous read operations
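
The OSD side of this contract appears in the OSD.cc hunks further down; condensed, the wiring is (a sketch using names from those hunks — shards, pgid, pg->coll):

  uint32_t shard_index = pgid.hash_to_shard(shards.size());
  ceph_assert(shards[shard_index]);
  store->set_collection_commit_queue(pg->coll,
                                     &shards[shard_index]->context_queue);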
diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc
index 8ce239ad7e7a7a48f8d2822b6f89008a5e0f32e0..70233809b59f5a0e5ff9efd66283fab358d36f5b 100644
@@ -3191,7 +3191,8 @@ BlueStore::Collection::Collection(BlueStore *store_, Cache *c, coll_t cid)
     cache(c),
     lock("BlueStore::Collection::lock", true, false),
     exists(true),
-    onode_map(c)
+    onode_map(c),
+    commit_queue(nullptr)
 {
 }
 
@@ -3841,6 +3842,7 @@ BlueStore::BlueStore(CephContext *cct, const string& path)
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
     deferred_finisher(cct, "defered_finisher", "dfin"),
+    finisher(cct, "commit_finisher", "cfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
     mempool_thread(this)
@@ -3860,6 +3862,7 @@ BlueStore::BlueStore(CephContext *cct,
                       cct->_conf->bluestore_throttle_bytes +
                       cct->_conf->bluestore_throttle_deferred_bytes),
     deferred_finisher(cct, "defered_finisher", "dfin"),
+    finisher(cct, "commit_finisher", "cfin"),
     kv_sync_thread(this),
     kv_finalize_thread(this),
     min_alloc_size(_min_alloc_size),
@@ -3873,11 +3876,6 @@ BlueStore::BlueStore(CephContext *cct,
 
 BlueStore::~BlueStore()
 {
-  for (auto f : finishers) {
-    delete f;
-  }
-  finishers.clear();
-
   cct->_conf.remove_observer(this);
   _shutdown_logger();
   ceph_assert(!mounted);
@@ -4079,23 +4077,6 @@ void BlueStore::_set_blob_size()
            << std::dec << dendl;
 }
 
-void BlueStore::_set_finisher_num()
-{
-  if (cct->_conf->bluestore_shard_finishers) {
-    if (cct->_conf->osd_op_num_shards) {
-      m_finisher_num = cct->_conf->osd_op_num_shards;
-    } else {
-      ceph_assert(bdev);
-      if (bdev->is_rotational()) {
-       m_finisher_num = cct->_conf->osd_op_num_shards_hdd;
-      } else {
-       m_finisher_num = cct->_conf->osd_op_num_shards_ssd;
-      }
-    }
-  }
-  ceph_assert(m_finisher_num != 0);
-}
-
 int BlueStore::_set_cache_sizes()
 {
   ceph_assert(bdev);
@@ -6109,7 +6090,7 @@ int BlueStore::_fsck(bool deep, bool repair)
 
   mempool_thread.init();
 
-  // we need finishers and kv_{sync,finalize}_thread *just* for replay
+  // we need finisher and kv_{sync,finalize}_thread *just* for replay
   _kv_start();
   r = _deferred_replay();
   _kv_stop();
@@ -7250,6 +7231,21 @@ ObjectStore::CollectionHandle BlueStore::create_new_collection(
   return c;
 }
 
+void BlueStore::set_collection_commit_queue(
+    const coll_t& cid,
+    ContextQueue *commit_queue)
+{
+  if (commit_queue) {
+    RWLock::RLocker l(coll_lock);
+    if (coll_map.count(cid)) {
+      coll_map[cid]->commit_queue = commit_queue;
+    } else if (new_coll_map.count(cid)) {
+      new_coll_map[cid]->commit_queue = commit_queue;
+    }
+  }
+}
+
+
 bool BlueStore::exists(CollectionHandle &c_, const ghobject_t& oid)
 {
   Collection *c = static_cast<Collection *>(c_.get());
@@ -8552,8 +8548,6 @@ int BlueStore::_open_super_meta()
   _set_compression();
   _set_blob_size();
 
-  _set_finisher_num();
-
   _validate_bdev();
   return 0;
 }
@@ -8934,7 +8928,11 @@ void BlueStore::_txc_committed_kv(TransContext *txc)
   {
     std::lock_guard<std::mutex> l(txc->osr->qlock);
     txc->state = TransContext::STATE_KV_DONE;
-    finishers[txc->osr->shard]->queue(txc->oncommits);
+    if (txc->ch->commit_queue) {
+      txc->ch->commit_queue->queue(txc->oncommits);
+    } else {
+      finisher.queue(txc->oncommits);
+    }
   }
   txc->log_state_latency(logger, l_bluestore_state_kv_committing_lat);
   logger->tinc(l_bluestore_commit_lat, ceph_clock_now() - txc->start);
@@ -9053,7 +9051,6 @@ void BlueStore::_osr_attach(Collection *c)
   auto q = coll_map.find(c->cid);
   if (q != coll_map.end()) {
     c->osr = q->second->osr;
-    ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
     ldout(cct, 10) << __func__ << " " << c->cid
                   << " reusing osr " << c->osr << " from existing coll "
                   << q->second << dendl;
@@ -9062,7 +9059,6 @@ void BlueStore::_osr_attach(Collection *c)
     auto p = zombie_osr_set.find(c->cid);
     if (p == zombie_osr_set.end()) {
       c->osr = new OpSequencer(this, c->cid);
-      c->osr->shard = c->cid.hash_to_shard(m_finisher_num);
       ldout(cct, 10) << __func__ << " " << c->cid
                     << " fresh osr " << c->osr << dendl;
     } else {
@@ -9071,7 +9067,6 @@ void BlueStore::_osr_attach(Collection *c)
       ldout(cct, 10) << __func__ << " " << c->cid
                     << " resurrecting zombie osr " << c->osr << dendl;
       c->osr->zombie = false;
-      ceph_assert(c->osr->shard == c->cid.hash_to_shard(m_finisher_num));
     }
   }
 }
@@ -9199,17 +9194,8 @@ void BlueStore::_kv_start()
 {
   dout(10) << __func__ << dendl;
 
-  for (int i = 0; i < m_finisher_num; ++i) {
-    ostringstream oss;
-    oss << "finisher-" << i;
-    Finisher *f = new Finisher(cct, oss.str(), "finisher");
-    finishers.push_back(f);
-  }
-
   deferred_finisher.start();
-  for (auto f : finishers) {
-    f->start();
-  }
+  finisher.start();
   kv_sync_thread.create("bstore_kv_sync");
   kv_finalize_thread.create("bstore_kv_final");
 }
@@ -9247,10 +9233,8 @@ void BlueStore::_kv_stop()
   dout(10) << __func__ << " stopping finishers" << dendl;
   deferred_finisher.wait_for_empty();
   deferred_finisher.stop();
-  for (auto f : finishers) {
-    f->wait_for_empty();
-    f->stop();
-  }
+  finisher.wait_for_empty();
+  finisher.stop();
   dout(10) << __func__ << " stopped" << dendl;
 }
 
@@ -9873,8 +9857,12 @@ int BlueStore::queue_transactions(
   for (auto c : on_applied_sync) {
     c->complete(0);
   }
-  for (auto c : on_applied) {
-    finishers[osr->shard]->queue(c);
+  if (!on_applied.empty()) {
+    if (c->commit_queue) {
+      c->commit_queue->queue(on_applied);
+    } else {
+      finisher.queue(on_applied);
+    }
   }
 
   logger->tinc(l_bluestore_submit_lat, mono_clock::now() - start);
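
Taken together, the BlueStore changes reduce to one dispatch rule, shown here as a hypothetical helper (not in the commit): prefer the per-collection queue the OSD registered, and fall back to the single store-wide finisher for callers that never invoke set_collection_commit_queue.

  // Hypothetical helper mirroring _txc_committed_kv() and
  // queue_transactions() above.
  void queue_oncommits(BlueStore::Collection *c,
                       Finisher& finisher,
                       list<Context *>& oncommits) {
    if (c->commit_queue) {
      c->commit_queue->queue(oncommits);  // wake the owning OSD shard
    } else {
      finisher.queue(oncommits);          // store-wide fallback
    }
  }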
diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h
index bf65f6efb5dc1ab6eb383f4c3681b9712ebf664d..840bed6414492ac75ef9531203fb6e408948cc92 100644
@@ -1368,6 +1368,7 @@ public:
 
     //pool options
     pool_opts_t pool_opts;
+    ContextQueue *commit_queue;
 
     OnodeRef get_onode(const ghobject_t& oid, bool create);
 
@@ -1692,8 +1693,6 @@ public:
     BlueStore *store;
     coll_t cid;
 
-    size_t shard;
-
     uint64_t last_seq = 0;
 
     std::atomic_int txc_with_unstable_io = {0};  ///< num txcs with unstable io
@@ -1884,10 +1883,7 @@ private:
   deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
   int deferred_queue_size = 0;         ///< num txc's queued across all osrs
   atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
-  Finisher deferred_finisher;
-
-  int m_finisher_num = 1;
-  vector<Finisher*> finishers;
+  Finisher deferred_finisher, finisher;
 
   KVSyncThread kv_sync_thread;
   std::mutex kv_lock;
@@ -2410,6 +2406,8 @@ public:
 
   CollectionHandle open_collection(const coll_t &c) override;
   CollectionHandle create_new_collection(const coll_t& cid) override;
+  void set_collection_commit_queue(const coll_t& cid,
+                                  ContextQueue *commit_queue) override;
 
   bool collection_exists(const coll_t& c) override;
   int collection_empty(CollectionHandle& c, bool *empty) override;
diff --git a/src/os/filestore/FileStore.h b/src/os/filestore/FileStore.h
index eb9c57de087cbf03cfe9656a1fa3ead0b694532b..b62d5bf0ca4233537ec0b16abc2456d1ca775ec1 100644
@@ -531,6 +531,9 @@ public:
 
   CollectionHandle open_collection(const coll_t& c) override;
   CollectionHandle create_new_collection(const coll_t& c) override;
+  void set_collection_commit_queue(const coll_t& cid,
+                                  ContextQueue *commit_queue) override {
+  }
 
   int queue_transactions(CollectionHandle& ch, vector<Transaction>& tls,
                         TrackedOpRef op = TrackedOpRef(),
diff --git a/src/os/kstore/KStore.h b/src/os/kstore/KStore.h
index 100604209c2ba32e98a43b8f4885c8d8ea9c06da..d1dfca4eb584273fe052895f97f85922fb075d62 100644
@@ -443,6 +443,9 @@ public:
 
   CollectionHandle open_collection(const coll_t& c) override;
   CollectionHandle create_new_collection(const coll_t& c) override;
+  void set_collection_commit_queue(const coll_t& cid,
+                                  ContextQueue *commit_queue) override {
+  }
 
   using ObjectStore::exists;
   bool exists(CollectionHandle& c, const ghobject_t& oid) override;
diff --git a/src/os/memstore/MemStore.h b/src/os/memstore/MemStore.h
index 1d184ee8b3436b264782a552a8552eb8b90925bb..59bb92d5b4a76546562f61503e1cdfe4572aaedb 100644
@@ -321,6 +321,11 @@ public:
     return get_collection(c);
   }
   CollectionHandle create_new_collection(const coll_t& c) override;
+
+  void set_collection_commit_queue(const coll_t& cid,
+                                  ContextQueue *commit_queue) override {
+  }
+
   bool collection_exists(const coll_t& c) override;
   int collection_empty(CollectionHandle& c, bool *empty) override;
   int collection_bits(CollectionHandle& c) override;
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 57f6d9fa06312fc70a91308e096e951d701bd0a3..8b9f00dd71bf809a144d98a5f0af43995332778d 100644
@@ -4078,6 +4078,11 @@ void OSD::load_pgs()
       recursive_remove_collection(cct, store, pgid, *it);
       continue;
     }
+    {
+      uint32_t shard_index = pgid.hash_to_shard(shards.size());
+      assert(NULL != shards[shard_index]);
+      store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
+    }
 
     pg->reg_next_scrub();
 
@@ -4147,6 +4152,12 @@ PGRef OSD::handle_pg_create_info(const OSDMapRef& osdmap,
   PGRef pg = _make_pg(startmap, pgid);
   pg->ch = store->create_new_collection(pg->coll);
 
+  {
+    uint32_t shard_index = pgid.hash_to_shard(shards.size());
+    assert(NULL != shards[shard_index]);
+    store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
+  }
+
   pg->lock(true);
 
   // we are holding the shard lock
@@ -8560,6 +8571,12 @@ void OSD::split_pgs(
     out_pgs->insert(child);
     child->ch = store->create_new_collection(child->coll);
 
+    {
+      uint32_t shard_index = i->hash_to_shard(shards.size());
+      assert(NULL != shards[shard_index]);
+      store->set_collection_commit_queue(child->coll, &(shards[shard_index]->context_queue));
+    }
+
     unsigned split_bits = i->get_split_bits(pg_num);
     dout(10) << " pg_num is " << pg_num
             << ", m_seed " << i->ps()
@@ -10224,7 +10241,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
   ceph_assert(sdata);
   // peek at spg_t
   sdata->shard_lock.Lock();
-  if (sdata->pqueue->empty()) {
+  if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
     sdata->sdata_wait_lock.Lock();
     if (!sdata->stop_waiting) {
       dout(20) << __func__ << " empty q, waiting" << dendl;
@@ -10233,24 +10250,38 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       sdata->sdata_cond.Wait(sdata->sdata_wait_lock);
       sdata->sdata_wait_lock.Unlock();
       sdata->shard_lock.Lock();
-      if (sdata->pqueue->empty()) {
+      if (sdata->pqueue->empty() && sdata->context_queue.empty()) {
        sdata->shard_lock.Unlock();
        return;
       }
       osd->cct->get_heartbeat_map()->reset_timeout(hb,
          osd->cct->_conf->threadpool_default_timeout, 0);
     } else {
-      dout(0) << __func__ << " need return immediately" << dendl;
+      dout(20) << __func__ << " need return immediately" << dendl;
       sdata->sdata_wait_lock.Unlock();
       sdata->shard_lock.Unlock();
       return;
     }
   }
-  OpQueueItem item = sdata->pqueue->dequeue();
+
   if (osd->is_stopping()) {
     sdata->shard_lock.Unlock();
     return;    // OSD shutdown, discard.
   }
+
+  list<Context *> oncommits;
+  if (!sdata->context_queue.empty()) {
+    sdata->context_queue.swap(oncommits);
+  }
+
+  if (sdata->pqueue->empty()) {
+    sdata->shard_lock.Unlock();
+    handle_oncommits(oncommits);
+    return;
+  }
+
+  OpQueueItem item = sdata->pqueue->dequeue();
+
   const auto token = item.get_ordering_token();
   auto r = sdata->pg_slots.emplace(token, nullptr);
   if (r.second) {
@@ -10288,6 +10319,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       dout(20) << __func__ << " slot " << token << " no longer there" << dendl;
       pg->unlock();
       sdata->shard_lock.Unlock();
+      handle_oncommits(oncommits);
       return;
     }
     slot = q->second.get();
@@ -10299,6 +10331,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
               << " nothing queued" << dendl;
       pg->unlock();
       sdata->shard_lock.Unlock();
+      handle_oncommits(oncommits);
       return;
     }
     if (requeue_seq != slot->requeue_seq) {
@@ -10308,6 +10341,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
               << dendl;
       pg->unlock();
       sdata->shard_lock.Unlock();
+      handle_oncommits(oncommits);
       return;
     }
     if (slot->pg != pg) {
@@ -10407,10 +10441,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       if (pushes_to_free > 0) {
        sdata->shard_lock.Unlock();
        osd->service.release_reserved_pushes(pushes_to_free);
+       handle_oncommits(oncommits);
        return;
       }
     }
     sdata->shard_lock.Unlock();
+    handle_oncommits(oncommits);
     return;
   }
   if (qi.is_peering()) {
@@ -10419,6 +10455,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
       _add_slot_waiter(token, slot, std::move(qi));
       sdata->shard_lock.Unlock();
       pg->unlock();
+      handle_oncommits(oncommits);
       return;
     }
   }
@@ -10465,6 +10502,8 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
     tracepoint(osd, opwq_process_finish, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
   }
+
+  handle_oncommits(oncommits);
 }
 
 void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index cace749e139b4a0186ea1c7768556c6e510cf40b..737029eaa66241a238509ef4ee2be65c55a6dc06 100644
@@ -1166,6 +1166,8 @@ struct OSDShard {
 
   bool stop_waiting = false;
 
+  ContextQueue context_queue;
+
   void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
     unsigned priority = item.get_priority();
     unsigned cost = item.get_cost();
@@ -1223,8 +1225,8 @@ struct OSDShard {
       osdmap_lock_name(shard_name + "::osdmap_lock"),
       osdmap_lock(osdmap_lock_name.c_str(), false, false),
       shard_lock_name(shard_name + "::shard_lock"),
-      shard_lock(shard_lock_name.c_str(), false, true,
-                            false, cct) {
+      shard_lock(shard_lock_name.c_str(), false, true, false, cct),
+      context_queue(sdata_wait_lock, sdata_cond) {
     if (opqueue == io_queue::weightedpriority) {
       pqueue = std::make_unique<
        WeightedPriorityQueue<OpQueueItem,uint64_t>>(
@@ -1774,7 +1776,13 @@ protected:
       auto &&sdata = osd->shards[shard_index];
       ceph_assert(sdata);
       Mutex::Locker l(sdata->shard_lock);
-      return sdata->pqueue->empty();
+      return sdata->pqueue->empty() && sdata->context_queue.empty();
+    }
+
+    void handle_oncommits(list<Context*>& oncommits) {
+      for (auto p : oncommits) {
+       p->complete(0);
+      }
     }
   } op_shardedwq;
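
Condensed from the _process() hunks above, the consumer side each shard thread now runs looks roughly like this (a sketch, not a drop-in replacement):

  // Under sdata->shard_lock: drain pending oncommit contexts first.
  list<Context *> oncommits;
  if (!sdata->context_queue.empty()) {
    sdata->context_queue.swap(oncommits);
  }

  // No op to dequeue: complete the contexts and return; otherwise
  // process the op and call handle_oncommits() on every exit path.
  if (sdata->pqueue->empty()) {
    sdata->shard_lock.Unlock();
    handle_oncommits(oncommits);
    return;
  }

Because each shard's ContextQueue is constructed with sdata_wait_lock and sdata_cond, queue() signals the same condition variable the shard thread sleeps on, so an oncommit arriving while the shard is idle wakes it exactly as a newly enqueued op would.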