]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: restructure deferred writes
authorSage Weil <sage@redhat.com>
Tue, 11 Apr 2017 21:54:57 +0000 (17:54 -0400)
committerSage Weil <sage@redhat.com>
Wed, 26 Apr 2017 20:03:35 +0000 (16:03 -0400)
Explicitly aggregate deferred writes into a batch.  When we
submit, take the opportunity to coalesce contiguous writes.
Handle aio completion independently from the original txcs.

Note that this paves the way for a few additional steps:

1- we could make deallocations cancel deferred writes.
2- we could drop the txc deferred states entirely and rely on
the explicit deferred write batch machinery instead... if we
build an alternative way to complete the SharedBlob writes
and ensure the lifecycle issues are dealt with.  (I'm not sure
it would be worth it, but it might be.)

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index b08a7bcab8ed3a8da966ff273a806fad8ac0076d..bf9a00b26a646b571b9a780e3f5e8a734d4af6e6 100644 (file)
@@ -2887,8 +2887,110 @@ bool BlueStore::WriteContext::has_conflict(
   return false;
 }
  
- // =======================================================
+// =======================================================
+
+// DeferredBatch
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.DeferredBatch(" << this << ") "
+
+void BlueStore::DeferredBatch::prepare_write(
+  CephContext *cct,
+  uint64_t seq, uint64_t offset, uint64_t length,
+  bufferlist::const_iterator& blp)
+{
+  _discard(cct, offset, length);
+  auto i = iomap.insert(make_pair(offset, deferred_io()));
+  assert(i.second);  // this should be a new insertion
+  i.first->second.seq = seq;
+  blp.copy(length, i.first->second.bl);
+  dout(20) << __func__ << " seq " << seq
+          << " 0x" << std::hex << offset << "~" << length
+          << " crc " << i.first->second.bl.crc32c(-1)
+          << std::dec << dendl;
+  seq_bytes[seq] += length;
+#ifdef DEBUG_DEFERRED
+  _audit(cct);
+#endif
+}
+
+void BlueStore::DeferredBatch::_discard(
+  CephContext *cct, uint64_t offset, uint64_t length)
+{
+  generic_dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+                  << std::dec << dendl;
+  auto p = iomap.lower_bound(offset);
+  if (p != iomap.begin()) {
+    --p;
+    auto end = p->first + p->second.bl.length();
+    if (end > offset) {
+      bufferlist head;
+      head.substr_of(p->second.bl, 0, offset - p->first);
+      dout(20) << __func__ << "  keep head " << p->second.seq
+              << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+              << " -> 0x" << head.length() << std::dec << dendl;
+      auto i = seq_bytes.find(p->second.seq);
+      if (end > offset + length) {
+       bufferlist tail;
+       tail.substr_of(p->second.bl, offset + length - p->first,
+                      end - (offset + length));
+       dout(20) << __func__ << "  keep tail " << p->second.seq
+                << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+                << " -> 0x" << tail.length() << std::dec << dendl;
+       auto &n = iomap[offset + length];
+       n.bl.swap(tail);
+       n.seq = p->second.seq;
+       i->second -= length;
+      } else {
+       i->second -= end - offset;
+      }
+      p->second.bl.swap(head);
+    }
+    ++p;
+  }
+  while (p != iomap.end()) {
+    if (p->first >= offset + length) {
+      break;
+    }
+    auto i = seq_bytes.find(p->second.seq);
+    auto end = p->first + p->second.bl.length();
+    if (end > offset + length) {
+      unsigned drop_front = offset + length - p->first;
+      unsigned keep_tail = end - (offset + length);
+      dout(20) << __func__ << "  truncate front " << p->second.seq
+              << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+              << " drop_front 0x" << drop_front << " keep_tail 0x" << keep_tail
+              << " to 0x" << (offset + length) << "~" << keep_tail
+              << std::dec << dendl;
+      auto &s = iomap[offset + length];
+      s.seq = p->second.seq;
+      s.bl.substr_of(p->second.bl, drop_front, keep_tail);
+      i->second -= drop_front;
+    } else {
+      dout(20) << __func__ << "  drop " << p->second.seq
+              << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+              << std::dec << dendl;
+      i->second -= p->second.bl.length();
+    }
+    p = iomap.erase(p);
+  }
+}
+
+void BlueStore::DeferredBatch::_audit(CephContext *cct)
+{
+  map<uint64_t,int> sb;
+  for (auto p : seq_bytes) {
+    sb[p.first] = 0;  // make sure we have the same set of keys
+  }
+  uint64_t pos = 0;
+  for (auto& p : iomap) {
+    assert(p.first >= pos);
+    sb[p.second.seq] += p.second.bl.length();
+    pos = p.first + p.second.bl.length();
+  }
+  assert(sb == seq_bytes);
+}
+
+
 // Collection
 
 #undef dout_prefix
@@ -3165,7 +3267,11 @@ void *BlueStore::MempoolThread::entry()
 static void aio_cb(void *priv, void *priv2)
 {
   BlueStore *store = static_cast<BlueStore*>(priv);
-  store->_txc_aio_finish(priv2);
+  if ((unsigned long long)priv2 & 0x1ull) {
+    store->deferred_aio_finish((void*)((unsigned long long)priv2 & ~1ull));
+  } else {
+    store->txc_aio_finish(priv2);
+  }
 }
 
 BlueStore::BlueStore(CephContext *cct, const string& path)
@@ -7416,11 +7522,6 @@ void BlueStore::_txc_state_proc(TransContext *txc)
       txc->state = TransContext::STATE_FINISHING;
       break;
 
-    case TransContext::STATE_DEFERRED_AIO_WAIT:
-      txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
-      _deferred_finish(txc);
-      return;
-
     case TransContext::STATE_DEFERRED_CLEANUP:
       txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
       txc->state = TransContext::STATE_FINISHING;
@@ -7767,8 +7868,8 @@ void BlueStore::_osr_drain_preceding(TransContext *txc)
   {
     // submit anything pending
     std::lock_guard<std::mutex> l(deferred_lock);
-    if (!osr->deferred_pending.empty()) {
-      _deferred_try_submit(osr);
+    if (osr->deferred_pending) {
+      _deferred_submit(osr);
     }
   }
   {
@@ -7855,7 +7956,7 @@ void BlueStore::_kv_sync_thread()
       dout(20) << __func__ << " wake" << dendl;
     } else {
       deque<TransContext*> kv_submitting;
-      deque<TransContext*> deferred_done, deferred_stable;
+      deque<DeferredBatch*> deferred_done, deferred_stable;
       dout(20) << __func__ << " committing " << kv_queue.size()
               << " submitting " << kv_queue_unsubmitted.size()
               << " deferred done " << deferred_done_queue.size()
@@ -7889,7 +7990,7 @@ void BlueStore::_kv_sync_thread()
        if (num_aios) {
          force_flush = true;
        } else if (kv_committing.empty() && kv_submitting.empty() &&
-           deferred_stable.empty()) {
+                  deferred_stable.empty()) {
          force_flush = true;  // there's nothing else to commit!
        } else if (deferred_aggressive) {
          force_flush = true;
@@ -7987,19 +8088,22 @@ void BlueStore::_kv_sync_thread()
       }
 
       // cleanup sync deferred keys
-      for (auto txc : deferred_stable) {
-       bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-       if (!wt.released.empty()) {
-         // kraken replay compat only
-         txc->released = wt.released;
-         dout(10) << __func__ << " deferred txn has released " << txc->released
-                  << " (we just upgraded from kraken) on " << txc << dendl;
-         _txc_finalize_kv(txc, synct);
+      for (auto b : deferred_stable) {
+       for (auto& txc : b->txcs) {
+         bluestore_deferred_transaction_t& wt = *txc.deferred_txn;
+         if (!wt.released.empty()) {
+           // kraken replay compat only
+           txc.released = wt.released;
+           dout(10) << __func__ << " deferred txn has released "
+                    << txc.released
+                    << " (we just upgraded from kraken) on " << &txc << dendl;
+           _txc_finalize_kv(&txc, synct);
+         }
+         // cleanup the deferred
+         string key;
+         get_deferred_key(wt.seq, &key);
+         synct->rm_single_key(PREFIX_DEFERRED, key);
        }
-       // cleanup the deferred
-       string key;
-       get_deferred_key(wt.seq, &key);
-       synct->rm_single_key(PREFIX_DEFERRED, key);
       }
 
       // submit synct synchronously (block and wait for it to commit)
@@ -8035,10 +8139,14 @@ void BlueStore::_kv_sync_thread()
        _txc_state_proc(txc);
        kv_committing.pop_front();
       }
-      while (!deferred_stable.empty()) {
-       TransContext *txc = deferred_stable.front();
-       _txc_state_proc(txc);
-       deferred_stable.pop_front();
+      for (auto b : deferred_stable) {
+       auto p = b->txcs.begin();
+       while (p != b->txcs.end()) {
+         TransContext *txc = &*p;
+         p = b->txcs.erase(p); // unlink here because
+         _txc_state_proc(txc); // this may destroy txc
+       }
+       delete b;
       }
 
       if (!deferred_aggressive) {
@@ -8088,17 +8196,30 @@ bluestore_deferred_op_t *BlueStore::_get_deferred_op(
 
 void BlueStore::_deferred_queue(TransContext *txc)
 {
-  dout(20) << __func__ << " txc " << txc << " on " << txc->osr << dendl;
+  dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
   std::lock_guard<std::mutex> l(deferred_lock);
-  if (txc->osr->deferred_pending.empty() &&
-      txc->osr->deferred_running.empty()) {
+  if (!txc->osr->deferred_pending &&
+      !txc->osr->deferred_running) {
     deferred_queue.push_back(*txc->osr);
   }
-  txc->osr->deferred_pending.push_back(*txc);
+  if (!txc->osr->deferred_pending) {
+    txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());
+  }
   ++deferred_queue_size;
+  txc->osr->deferred_pending->txcs.push_back(*txc);
+  bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
+  for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
+    const auto& op = *opi;
+    assert(op.op == bluestore_deferred_op_t::OP_WRITE);
+    bufferlist::const_iterator p = op.data.begin();
+    for (auto e : op.extents) {
+      txc->osr->deferred_pending->prepare_write(
+       cct, wt.seq, e.offset, e.length, p);
+    }
+  }
   if (deferred_aggressive &&
-      txc->osr->deferred_running.empty()) {
-    _deferred_try_submit(txc->osr.get());
+      !txc->osr->deferred_running) {
+    _deferred_submit(txc->osr.get());
   }
 }
 
@@ -8107,106 +8228,99 @@ void BlueStore::_deferred_try_submit()
   dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
           << deferred_queue_size << " txcs" << dendl;
   for (auto& osr : deferred_queue) {
-    if (osr.deferred_running.empty()) {
-      _deferred_try_submit(&osr);
+    if (!osr.deferred_running) {
+      _deferred_submit(&osr);
     }
   }
 }
 
-void BlueStore::_deferred_try_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit(OpSequencer *osr)
 {
-  dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size()
-          << " pending " << dendl;
-  assert(!osr->deferred_pending.empty());
-  assert(osr->deferred_running.empty());
+  dout(10) << __func__ << " osr " << osr
+          << " " << osr->deferred_pending->iomap.size() << " ios pending "
+          << dendl;
+  assert(osr->deferred_pending);
+  assert(!osr->deferred_running);
 
-  deferred_queue_size -= osr->deferred_pending.size();
+  auto b = osr->deferred_pending;
+  deferred_queue_size -= b->seq_bytes.size();
   assert(deferred_queue_size >= 0);
-  osr->deferred_running.swap(osr->deferred_pending);
 
-  // attach all IO to the last in the batch
-  TransContext *last = &osr->deferred_running.back();
+  osr->deferred_running = osr->deferred_pending;
+  osr->deferred_pending = nullptr;
 
-  // reverse order
-  for (auto i = osr->deferred_running.rbegin();
-       i != osr->deferred_running.rend();
-       ++i) {
-    TransContext *txc = &*i;
-    bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-    dout(20) << __func__ << "  txc " << txc << " seq " << wt.seq << dendl;
-    txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
-    txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
-    for (auto opi = wt.ops.rbegin(); opi != wt.ops.rend(); ++opi) {
-      assert(opi->op == bluestore_deferred_op_t::OP_WRITE);
-      const auto& op = *opi;
-      uint64_t bl_offset = 0;
-      for (auto e : op.extents) {
-       interval_set<uint64_t> cur;
-       cur.insert(e.offset, e.length);
-       interval_set<uint64_t> overlap;
-       overlap.intersection_of(cur, osr->deferred_blocks);
-       cur.subtract(overlap);
-       dout(20) << __func__ << "  txc " << txc << " " << e << std::hex
-                << " overlap 0x" << overlap << " new 0x" << cur
-                << " from bl_offset 0x" << bl_offset << std::dec << dendl;
-       for (auto j = cur.begin(); j != cur.end(); ++j) {
-         bufferlist bl;
-         bl.substr_of(op.data, bl_offset + j.get_start() - e.offset,
-                      j.get_len());
-         if (!g_conf->bluestore_debug_omit_block_device_write) {
-           logger->inc(l_bluestore_deferred_write_ops);
-           logger->inc(l_bluestore_deferred_write_bytes, bl.length());
-           int r = bdev->aio_write(j.get_start(), bl, &last->ioc, false);
-           assert(r == 0);
-         }
-         txc->osr->deferred_blocks.insert(j.get_start(), j.get_len());
+  uint64_t start = 0, pos = 0;
+  bufferlist bl;
+  auto i = b->iomap.begin();
+  while (true) {
+    if (i == b->iomap.end() || i->first != pos) {
+      if (bl.length()) {
+       dout(20) << __func__ << " write 0x" << std::hex
+                << start << "~" << bl.length()
+                << " crc " << bl.crc32c(-1) << std::dec << dendl;
+       if (!g_conf->bluestore_debug_omit_block_device_write) {
+         logger->inc(l_bluestore_deferred_write_ops);
+         logger->inc(l_bluestore_deferred_write_bytes, bl.length());
+         int r = bdev->aio_write(start, bl, &b->ioc, false);
+         assert(r == 0);
        }
-        bl_offset += e.length;
       }
+      if (i == b->iomap.end()) {
+       break;
+      }
+      start = 0;
+      pos = i->first;
+      bl.clear();
+    }
+    dout(20) << __func__ << "   seq " << i->second.seq << " 0x"
+            << std::hex << pos << "~" << i->second.bl.length() << std::dec
+            << dendl;
+    if (!bl.length()) {
+      start = pos;
     }
+    pos += i->second.bl.length();
+    bl.claim_append(i->second.bl);
+    ++i;
   }
-  osr->deferred_txc = last;
-  dout(20) << __func__ << " osr " << osr << " deferred_blocks 0x" << std::hex
-          << osr->deferred_blocks << std::dec << dendl;
-  _txc_aio_submit(last);
+  bdev->aio_submit(&b->ioc);
 }
 
-int BlueStore::_deferred_finish(TransContext *txc)
+void BlueStore::_deferred_aio_finish(OpSequencer *osr)
 {
-  bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
-  dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
+  dout(10) << __func__ << " osr " << osr << dendl;
+  assert(osr->deferred_running);
+
+  DeferredBatch *b = osr->deferred_running;
+  {
+    std::lock_guard<std::mutex> l2(osr->qlock);
+    std::lock_guard<std::mutex> l(kv_lock);
+    for (auto& i : b->txcs) {
+      TransContext *txc = &i;
+      txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+      txc->osr->qcond.notify_all();
+      throttle_deferred_bytes.put(txc->cost);
+    }
+    deferred_done_queue.emplace_back(b);
+  }
 
-  OpSequencer::deferred_queue_t finished;
   {
     std::lock_guard<std::mutex> l(deferred_lock);
-    assert(txc->osr->deferred_txc == txc);
-    txc->osr->deferred_blocks.clear();
-    finished.swap(txc->osr->deferred_running);
-    if (txc->osr->deferred_pending.empty()) {
-      auto q = deferred_queue.iterator_to(*txc->osr);
+    assert(osr->deferred_running == b);
+    osr->deferred_running = nullptr;
+    if (!osr->deferred_pending) {
+      auto q = deferred_queue.iterator_to(*osr);
       deferred_queue.erase(q);
     } else if (deferred_aggressive) {
-      _deferred_try_submit(txc->osr.get());
+      _deferred_submit(osr);
     }
   }
 
-  std::lock_guard<std::mutex> l2(txc->osr->qlock);
-  std::lock_guard<std::mutex> l(kv_lock);
-  for (auto& i : finished) {
-    TransContext *txc = &i;
-    txc->state = TransContext::STATE_DEFERRED_CLEANUP;
-    txc->osr->qcond.notify_all();
-    throttle_deferred_bytes.put(txc->cost);
-    deferred_done_queue.push_back(txc);
-  }
-  finished.clear();
-
   // in the normal case, do not bother waking up the kv thread; it will
   // catch us on the next commit anyway.
   if (deferred_aggressive) {
+    std::lock_guard<std::mutex> l(kv_lock);
     kv_cond.notify_one();
   }
-  return 0;
 }
 
 int BlueStore::_deferred_replay()
index f2a32b8b2df84937b33fbcb25e93ebf3c06e097f..af4b519fc72e3cfe3ac8d93481165d9cff3011df 100644 (file)
@@ -47,6 +47,7 @@ class FreelistManager;
 class BlueFS;
 
 //#define DEBUG_CACHE
+//#define DEBUG_DEFERRED
 
 enum {
   l_bluestore_first = 732430,
@@ -1377,8 +1378,7 @@ public:
       STATE_KV_QUEUED,     // queued for kv_sync_thread submission
       STATE_KV_SUBMITTED,  // submitted to kv; not yet synced
       STATE_KV_DONE,
-      STATE_DEFERRED_QUEUED,    // in deferred_queue
-      STATE_DEFERRED_AIO_WAIT,  // aio in flight, waiting for completion
+      STATE_DEFERRED_QUEUED,    // in deferred_queue (pending or running)
       STATE_DEFERRED_CLEANUP,   // remove deferred kv record
       STATE_DEFERRED_DONE,
       STATE_FINISHING,
@@ -1396,7 +1396,6 @@ public:
       case STATE_KV_SUBMITTED: return "kv_submitted";
       case STATE_KV_DONE: return "kv_done";
       case STATE_DEFERRED_QUEUED: return "deferred_queued";
-      case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait";
       case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
       case STATE_DEFERRED_DONE: return "deferred_done";
       case STATE_FINISHING: return "finishing";
@@ -1415,7 +1414,6 @@ public:
       case l_bluestore_state_kv_committing_lat: return "kv_committing";
       case l_bluestore_state_kv_done_lat: return "kv_done";
       case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
-      case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
       case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
       case l_bluestore_state_finishing_lat: return "finishing";
       case l_bluestore_state_done_lat: return "done";
@@ -1549,6 +1547,36 @@ public:
     }
   };
 
+  typedef boost::intrusive::list<
+    TransContext,
+    boost::intrusive::member_hook<
+      TransContext,
+      boost::intrusive::list_member_hook<>,
+      &TransContext::deferred_queue_item> > deferred_queue_t;
+
+  struct DeferredBatch {
+    struct deferred_io {
+      bufferlist bl;    ///< data
+      uint64_t seq;     ///< deferred transaction seq
+    };
+    map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
+    deferred_queue_t txcs;           ///< txcs in this batch
+    IOContext ioc;                   ///< our aios
+    /// bytes of pending io for each deferred seq (may be 0)
+    map<uint64_t,int> seq_bytes;
+
+    void _discard(CephContext *cct, uint64_t offset, uint64_t length);
+    void _audit(CephContext *cct);
+
+    DeferredBatch(CephContext *cct, OpSequencer *osr)
+      : ioc(cct, (void*)((unsigned long long)osr | 1ull)) {}
+
+    /// prepare a write
+    void prepare_write(CephContext *cct,
+                      uint64_t seq, uint64_t offset, uint64_t length,
+                      bufferlist::const_iterator& p);
+  };
+
   class OpSequencer : public Sequencer_impl {
   public:
     std::mutex qlock;
@@ -1561,19 +1589,11 @@ public:
        &TransContext::sequencer_item> > q_list_t;
     q_list_t q;  ///< transactions
 
-    typedef boost::intrusive::list<
-      TransContext,
-      boost::intrusive::member_hook<
-       TransContext,
-       boost::intrusive::list_member_hook<>,
-       &TransContext::deferred_queue_item> > deferred_queue_t;
-    deferred_queue_t deferred_pending;      ///< waiting
-    deferred_queue_t deferred_running;      ///< in flight ios
-    interval_set<uint64_t> deferred_blocks; ///< blocks in flight
-    TransContext *deferred_txc;             ///< txc carrying this batch
-
     boost::intrusive::list_member_hook<> deferred_osr_queue_item;
 
+    DeferredBatch *deferred_running = nullptr;
+    DeferredBatch *deferred_pending = nullptr;
+
     Sequencer *parent;
     BlueStore *store;
 
@@ -1773,8 +1793,8 @@ private:
   deque<TransContext*> kv_queue;             ///< ready, already submitted
   deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
   deque<TransContext*> kv_committing;        ///< currently syncing
-  deque<TransContext*> deferred_done_queue;    ///< deferred ios done
-  deque<TransContext*> deferred_stable_queue;  ///< deferred ios done + stable
+  deque<DeferredBatch*> deferred_done_queue;   ///< deferred ios done
+  deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
 
   PerfCounters *logger = nullptr;
 
@@ -1916,7 +1936,7 @@ private:
   void _txc_state_proc(TransContext *txc);
   void _txc_aio_submit(TransContext *txc);
 public:
-  void _txc_aio_finish(void *p) {
+  void txc_aio_finish(void *p) {
     _txc_state_proc(static_cast<TransContext*>(p));
   }
 private:
@@ -1947,14 +1967,19 @@ private:
   }
 
   bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
+public:
+  void deferred_aio_finish(void *priv) {
+    _deferred_aio_finish(static_cast<OpSequencer*>(priv));
+  }
+private:
   void _deferred_queue(TransContext *txc);
   void deferred_try_submit() {
     std::lock_guard<std::mutex> l(deferred_lock);
     _deferred_try_submit();
   }
   void _deferred_try_submit();
-  void _deferred_try_submit(OpSequencer *osr);
-  int _deferred_finish(TransContext *txc);
+  void _deferred_submit(OpSequencer *osr);
+  void _deferred_aio_finish(OpSequencer *osr);
   int _deferred_replay();
 
 public: