]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: fix {nid,blobid}_max
authorSage Weil <sage@redhat.com>
Fri, 21 Oct 2016 14:23:19 +0000 (10:23 -0400)
committerSage Weil <sage@redhat.com>
Tue, 1 Nov 2016 14:30:03 +0000 (10:30 -0400)
- single case to cover increases ahead of schedule or just in time
- update global max only after txn commits, eliminating race
- drop unneeded id_lock
- improve t naming

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index 18f9eab3b43e78f1b46c450930eede1501ba0404..4ce7927c082219b7d769cbfc12edcc2ef3e95231 100644 (file)
@@ -6173,6 +6173,13 @@ void BlueStore::_txc_state_proc(TransContext *txc)
          dout(20) << __func__
                   << " last_{nid,blobid} exceeds max, submit via kv thread"
                   << dendl;
+       } else if (txc->osr->kv_committing_serially) {
+         dout(20) << __func__ << " prior txc submitted via kv thread, us too"
+                  << dendl;
+         // note: this is starvation-prone.  once we have a txc in a busy
+         // sequencer that is committing serially it is possible to keep
+         // submitting new transactions fast enough that we get stuck doing
+         // so.  the alternative is to block here... fixme?
        } else {
          _txc_finalize_kv(txc, txc->t);
          txc->kv_submitted = true;
@@ -6184,6 +6191,10 @@ void BlueStore::_txc_state_proc(TransContext *txc)
        std::lock_guard<std::mutex> l(kv_lock);
        kv_queue.push_back(txc);
        kv_cond.notify_one();
+       if (!txc->kv_submitted) {
+         kv_queue_unsubmitted.push_back(txc);
+         ++txc->osr->kv_committing_serially;
+       }
       }
       return;
     case TransContext::STATE_KV_QUEUED:
@@ -6511,7 +6522,6 @@ void BlueStore::_kv_sync_thread()
   std::unique_lock<std::mutex> l(kv_lock);
   while (true) {
     assert(kv_committing.empty());
-    assert(wal_cleaning.empty());
     if (kv_queue.empty() && wal_cleanup_queue.empty()) {
       if (kv_stop)
        break;
@@ -6520,14 +6530,19 @@ void BlueStore::_kv_sync_thread()
       kv_cond.wait(l);
       dout(20) << __func__ << " wake" << dendl;
     } else {
+      deque<TransContext*> kv_submitting;
+      deque<TransContext*> wal_cleaning;
       dout(20) << __func__ << " committing " << kv_queue.size()
+              << " submitting " << kv_queue_unsubmitted.size()
               << " cleaning " << wal_cleanup_queue.size() << dendl;
       kv_committing.swap(kv_queue);
+      kv_submitting.swap(kv_queue_unsubmitted);
       wal_cleaning.swap(wal_cleanup_queue);
       utime_t start = ceph_clock_now(NULL);
       l.unlock();
 
       dout(30) << __func__ << " committing txc " << kv_committing << dendl;
+      dout(30) << __func__ << " submitting txc " << kv_submitting << dendl;
       dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
 
       alloc->commit_start();
@@ -6535,52 +6550,42 @@ void BlueStore::_kv_sync_thread()
       // flush/barrier on block device
       bdev->flush();
 
-      uint64_t high_nid = 0, high_blobid = 0;
-      bool any_to_submit = false;
-      for (auto txc : kv_committing) {
-       if (!txc->kv_submitted) {
-         _txc_finalize_kv(txc, txc->t);
-         if (txc->last_nid > high_nid) {
-           high_nid = txc->last_nid;
-         }
-         if (txc->last_blobid > high_blobid) {
-           high_blobid = txc->last_blobid;
-         }
-          txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
-         any_to_submit = true;
-       }
-      }
-      if (any_to_submit) {
-       if (high_nid || high_blobid) {
-         TransContext *first_txc = kv_committing.front();
-         std::lock_guard<std::mutex> l(id_lock);
-         if (high_nid + g_conf->bluestore_nid_prealloc/2 > nid_max) {
-           nid_max = high_nid + g_conf->bluestore_nid_prealloc;
-           bufferlist bl;
-           ::encode(nid_max, bl);
-           first_txc->t->set(PREFIX_SUPER, "nid_max", bl);
-           dout(10) << __func__ << " nid_max now " << nid_max << dendl;
-         }
-         if (high_blobid + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
-           blobid_max = high_blobid + g_conf->bluestore_blobid_prealloc;
-           bufferlist bl;
-           ::encode(blobid_max, bl);
-           first_txc->t->set(PREFIX_SUPER, "blobid_max", bl);
-           dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
-         }
-       }
-       for (auto txc : kv_committing) {
-         if (!txc->kv_submitted) {
-           txc->kv_submitted = true;
-           int r = db->submit_transaction(txc->t);
-           assert(r == 0);
-         }
-       }
+      // we will use one final transaction to force a sync
+      KeyValueDB::Transaction synct = db->get_transaction();
+
+      // increase {nid,blobid}_max?  note that this covers both the
+      // case wehre we are approaching the max and the case we passed
+      // it.  in either case, we increase the max in the earlier txn
+      // we submit.
+      uint64_t new_nid_max = 0, new_blobid_max = 0;
+      if (nid_last + g_conf->bluestore_nid_prealloc/2 > nid_max) {
+       KeyValueDB::Transaction t =
+         kv_submitting.empty() ? synct : kv_submitting.front()->t;
+       new_nid_max = nid_last + g_conf->bluestore_nid_prealloc;
+       bufferlist bl;
+       ::encode(new_nid_max, bl);
+       t->set(PREFIX_SUPER, "nid_max", bl);
+       dout(10) << __func__ << " new_nid_max " << new_nid_max << dendl;
+      }
+      if (blobid_last + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
+       KeyValueDB::Transaction t =
+         kv_submitting.empty() ? synct : kv_submitting.front()->t;
+       new_blobid_max = blobid_last + g_conf->bluestore_blobid_prealloc;
+       bufferlist bl;
+       ::encode(new_blobid_max, bl);
+       t->set(PREFIX_SUPER, "blobid_max", bl);
+       dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
+      }
+      for (auto txc : kv_submitting) {
+       assert(!txc->kv_submitted);
+       _txc_finalize_kv(txc, txc->t);
+       txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
+       int r = db->submit_transaction(txc->t);
+       assert(r == 0);
+       --txc->osr->kv_committing_serially;
+       txc->kv_submitted = true;
       }
 
-      // one final transaction to force a sync
-      KeyValueDB::Transaction t = db->get_transaction();
-
       vector<bluestore_pextent_t> bluefs_gift_extents;
       if (bluefs) {
        int r = _balance_bluefs_freespace(&bluefs_gift_extents);
@@ -6593,7 +6598,7 @@ void BlueStore::_kv_sync_thread()
          ::encode(bluefs_extents, bl);
          dout(10) << __func__ << " bluefs_extents now 0x" << std::hex
                   << bluefs_extents << std::dec << dendl;
-         t->set(PREFIX_SUPER, "bluefs_extents", bl);
+         synct->set(PREFIX_SUPER, "bluefs_extents", bl);
        }
       }
 
@@ -6603,15 +6608,26 @@ void BlueStore::_kv_sync_thread()
            ++it) {
        bluestore_wal_transaction_t& wt =*(*it)->wal_txn;
        // kv metadata updates
-       _txc_finalize_kv(*it, t);
+       _txc_finalize_kv(*it, synct);
        // cleanup the wal
        string key;
        get_wal_key(wt.seq, &key);
-       t->rm_single_key(PREFIX_WAL, key);
+       synct->rm_single_key(PREFIX_WAL, key);
       }
-      int r = db->submit_transaction_sync(t);
+
+      // submit synct synchronously (block and wait for it to commit)
+      int r = db->submit_transaction_sync(synct);
       assert(r == 0);
 
+      if (new_nid_max) {
+       nid_max = new_nid_max;
+       dout(10) << __func__ << " nid_max now " << nid_max << dendl;
+      }
+      if (new_blobid_max) {
+       blobid_max = new_blobid_max;
+       dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
+      }
+
       utime_t finish = ceph_clock_now(NULL);
       utime_t dur = finish - start;
       dout(20) << __func__ << " committed " << kv_committing.size()
@@ -6619,6 +6635,7 @@ void BlueStore::_kv_sync_thread()
               << " in " << dur << dendl;
       while (!kv_committing.empty()) {
        TransContext *txc = kv_committing.front();
+       assert(txc->kv_submitted);
        _txc_state_proc(txc);
        kv_committing.pop_front();
       }
index 3288c2debf5351c56a06bb47d99fb0451ced5574..12ad2d007a75848ef33bbc243ddaaec94edd29b2 100644 (file)
@@ -1277,6 +1277,8 @@ public:
 
     uint64_t last_seq = 0;
 
+    std::atomic_int kv_committing_serially = {0};
+
     OpSequencer()
        //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
       : parent(NULL) {
@@ -1431,7 +1433,6 @@ private:
 
   vector<Cache*> cache_shards;
 
-  std::mutex id_lock;
   std::atomic<uint64_t> nid_last = {0};
   std::atomic<uint64_t> nid_max = {0};
   std::atomic<uint64_t> blobid_last = {0};
@@ -1454,8 +1455,10 @@ private:
   std::mutex kv_lock;
   std::condition_variable kv_cond, kv_sync_cond;
   bool kv_stop;
-  deque<TransContext*> kv_queue, kv_committing;
-  deque<TransContext*> wal_cleanup_queue, wal_cleaning;
+  deque<TransContext*> kv_queue;             ///< ready, already submitted
+  deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
+  deque<TransContext*> kv_committing;        ///< currently syncing
+  deque<TransContext*> wal_cleanup_queue;    ///< wal done, ready for cleanup
 
   PerfCounters *logger;