git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: implement ordering for overlapping IOs 28952/head
author Mahati Chamarthy <mahati.chamarthy@intel.com>
Mon, 16 Sep 2019 09:00:52 +0000 (14:30 +0530)
committer Jason Dillaman <dillaman@redhat.com>
Wed, 16 Oct 2019 01:09:18 +0000 (21:09 -0400)
..and block flushes until previous writes are completed

Signed-off-by: Mahati Chamarthy <mahati.chamarthy@intel.com>
src/librbd/io/ImageDispatchSpec.cc
src/librbd/io/ImageDispatchSpec.h
src/librbd/io/ImageRequestWQ.cc
src/librbd/io/ImageRequestWQ.h
src/test/librbd/io/test_mock_ImageRequestWQ.cc

index f33b8ef6aedd4a26e0385d1362525dbcb04fa8b1..a787c57f49ee80278041277578fd752a6d6dced9 100644 (file)
@@ -131,6 +131,16 @@ uint64_t ImageDispatchSpec<I>::extents_length() {
   return length;
 }
 
+/// Read-only access to the image extents (offset/length pairs) held by this
+/// request.
+template <typename I>
+const Extents& ImageDispatchSpec<I>::get_image_extents() const {
+  return m_image_extents;
+}
+
+/// Returns the tid that was passed to this request's constructor.
+template <typename I>
+uint64_t ImageDispatchSpec<I>::get_tid() {
+  return m_tid;
+}
+
 template <typename I>
 bool ImageDispatchSpec<I>::is_write_op() const {
   return boost::apply_visitor(IsWriteOpVisitor(), m_request);
index 60f1ea5bb804d6a890259361295678aea7e612c3..353cdd16f98225f99865600c86ae5a2dd2680112 100644 (file)
@@ -76,49 +76,49 @@ public:
     return new ImageDispatchSpec(image_ctx, aio_comp,
                                  std::move(image_extents),
                                  Read{std::move(read_result)},
-                                 op_flags, parent_trace);
+                                 op_flags, parent_trace, 0);
   }
 
   static ImageDispatchSpec* create_discard_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
-      uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace) {
+      uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                  Discard{discard_granularity_bytes},
-                                 0, parent_trace);
+                                 0, parent_trace, tid);
   }
 
   static ImageDispatchSpec* create_write_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
-      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, std::move(image_extents),
-                                 Write{std::move(bl)}, op_flags, parent_trace);
+                                 Write{std::move(bl)}, op_flags, parent_trace, tid);
   }
 
   static ImageDispatchSpec* create_write_same_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, uint64_t off, uint64_t len,
-      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace) {
+      bufferlist &&bl, int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {{off, len}},
                                  WriteSame{std::move(bl)}, op_flags,
-                                 parent_trace);
+                                 parent_trace, tid);
   }
 
   static ImageDispatchSpec* create_compare_and_write_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
       bufferlist &&cmp_bl, bufferlist &&bl, uint64_t *mismatch_offset,
-      int op_flags, const ZTracer::Trace &parent_trace) {
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid) {
     return new ImageDispatchSpec(image_ctx, aio_comp,
                                  std::move(image_extents),
                                  CompareAndWrite{std::move(cmp_bl),
                                                  std::move(bl),
                                                  mismatch_offset},
-                                 op_flags, parent_trace);
+                                 op_flags, parent_trace, tid);
   }
 
   static ImageDispatchSpec* create_flush_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp,
       FlushSource flush_source, const ZTracer::Trace &parent_trace) {
     return new ImageDispatchSpec(image_ctx, aio_comp, {}, Flush{flush_source},
-                                 0, parent_trace);
+                                 0, parent_trace, 0);
   }
 
   ~ImageDispatchSpec() {
@@ -146,6 +146,14 @@ public:
     return (m_throttled_flag & RBD_QOS_MASK) == RBD_QOS_MASK;
   }
 
+  const Extents& get_image_extents() const;
+
+  /// Returns the AioCompletion pointer this request was constructed with.
+  AioCompletion* get_aio_completion() const {
+    return m_aio_comp;
+  }
+
+  uint64_t get_tid();
+
 private:
   typedef boost::variant<Read,
                          Discard,
@@ -160,10 +168,10 @@ private:
 
   ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
                      Extents&& image_extents, Request&& request,
-                     int op_flags, const ZTracer::Trace& parent_trace)
+                     int op_flags, const ZTracer::Trace& parent_trace, uint64_t tid)
     : m_image_ctx(image_ctx), m_aio_comp(aio_comp),
       m_image_extents(std::move(image_extents)), m_request(std::move(request)),
-      m_op_flags(op_flags), m_parent_trace(parent_trace) {
+      m_op_flags(op_flags), m_parent_trace(parent_trace), m_tid(tid) {
     m_aio_comp->get();
   }
 
@@ -173,6 +181,7 @@ private:
   Request m_request;
   int m_op_flags;
   ZTracer::Trace m_parent_trace;
+  uint64_t m_tid;
   std::atomic<uint64_t> m_throttled_flag = 0;
 
   uint64_t extents_length();
index a3a70be4cb986740e0feb718b82926279fd6d6fe..1836eb297336e5437533b2d93bc1986ff6259ae0 100644 (file)
@@ -324,14 +324,22 @@ void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
     return;
   }
 
+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
+          m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_write_request(
-            m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
-                              std::move(bl), op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -363,14 +371,22 @@ void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
     return;
   }
 
+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
+            m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_discard_request(
-            m_image_ctx, c, off, len, discard_granularity_bytes, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
-                                 discard_granularity_bytes, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -398,13 +414,27 @@ void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
     return;
   }
 
+  auto tid = ++m_last_tid;
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
+            m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+
+  {
+    std::lock_guard locker{m_lock};
+    if(!m_queued_or_blocked_io_tids.empty()) {
+      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
+      m_queued_flushes.emplace(tid, req);
+      --m_in_flight_ios;
+      return;
+    }
+  }
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
-    queue(ImageDispatchSpec<I>::create_flush_request(
-            m_image_ctx, c, FLUSH_SOURCE_USER, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -436,14 +466,22 @@ void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
     return;
   }
 
+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
+            m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_write_same_request(
-            m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
-                                  op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
@@ -477,21 +515,116 @@ void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
     return;
   }
 
+  auto tid = ++m_last_tid;
+
+  {
+    std::lock_guard locker{m_lock};
+    m_queued_or_blocked_io_tids.insert(tid);
+  }
+
+  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
+            m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
+            mismatch_off, op_flags, trace, tid);
+
   std::shared_lock owner_locker{m_image_ctx.owner_lock};
   if (m_image_ctx.non_blocking_aio || writes_blocked()) {
-    queue(ImageDispatchSpec<I>::create_compare_and_write_request(
-            m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
-            mismatch_off, op_flags, trace));
+    queue(req);
   } else {
     c->start_op();
-    ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
-                                           std::move(cmp_bl), std::move(bl),
-                                           mismatch_off, op_flags, trace);
+    process_io(req, false);
     finish_in_flight_io();
   }
   trace.event("finish");
 }
 
+/**
+ * Check a range against the supplied in-flight interval set and, when it is
+ * free, record it there.
+ *
+ * A zero-length range is never blocked and is not recorded. Otherwise the
+ * range (off, len) is inserted into the set only when it does not intersect
+ * anything already present.
+ *
+ * @param in_flight_image_extents interval set to test against and update
+ * @param off start of the range
+ * @param len length of the range
+ * @return true if the range overlaps an existing interval (nothing is
+ *         inserted), false otherwise
+ */
+template <typename I>
+bool ImageRequestWQ<I>::block_overlapping_io(
+    ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx
+                 << "off: " << off << " len: " << len <<dendl;
+
+  if (len == 0) {
+    return false;
+  }
+
+  const bool overlaps = !in_flight_image_extents->empty() &&
+                        in_flight_image_extents->intersects(off, len);
+  if (!overlaps) {
+    in_flight_image_extents->insert(off, len);
+  }
+  return overlaps;
+}
+
+/**
+ * Retire a write's in-flight bookkeeping and re-queue parked writes that no
+ * longer overlap anything in flight.
+ *
+ * The range (offset, length) and tid of the finished write are first removed
+ * via remove_in_flight_write_ios(). Parked requests in m_blocked_ios are then
+ * examined front to back: each one is tested with its OWN first extent, and
+ * the scan stops at the first request that is still blocked so that parked
+ * requests are released in the order they were parked.
+ *
+ * @param offset start of the range claimed by the finished write
+ * @param length length of the range claimed by the finished write
+ * @param tid    tid of the finished write
+ */
+template <typename I>
+void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
+                                               uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+
+  remove_in_flight_write_ios(offset, length, true, tid);
+
+  std::unique_lock locker{m_lock};
+  // Always work on the current front element: m_lock is released while the
+  // request is re-queued, so an iterator held across that window (or across
+  // the erase) cannot be trusted.
+  while (!m_blocked_ios.empty()) {
+    auto blocked_io = m_blocked_ios.front();
+
+    // The overlap test has to use the parked request's extents, not the range
+    // of the write that just completed.
+    const auto& blocked_extents = blocked_io->get_image_extents();
+    uint64_t blocked_off =
+      blocked_extents.size() ? blocked_extents.front().first : 0;
+    uint64_t blocked_len =
+      blocked_extents.size() ? blocked_extents.front().second : 0;
+
+    if (block_overlapping_io(&m_in_flight_extents, blocked_off, blocked_len)) {
+      break;
+    }
+    ldout(cct, 20) << "unblocking off: " << blocked_off << ", "
+                   << "len: " << blocked_len << dendl;
+    AioCompletion *aio_comp = blocked_io->get_aio_completion();
+
+    // NOTE(review): the re-queued request goes through the normal dispatch
+    // path again -- confirm it is not re-blocked against the range that
+    // block_overlapping_io() just recorded for it.
+    m_blocked_ios.erase(m_blocked_ios.begin());
+    locker.unlock();
+    queue_unblocked_io(aio_comp, blocked_io);
+    locker.lock();
+  }
+}
+
+/**
+ * Re-queue flushes parked in m_queued_flushes once the writes issued before
+ * them are no longer outstanding.
+ *
+ * Parked flushes are visited in ascending tid order. A flush is released
+ * when m_queued_or_blocked_io_tids holds no tid smaller than the flush's own
+ * tid; the scan stops at the first flush that still has to wait.
+ *
+ * @param tid tid of the write whose dispatch triggered this call (not used
+ *            by the current implementation)
+ */
+template <typename I>
+void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+  std::unique_lock locker{m_lock};
+  while (true) {
+    auto it = m_queued_flushes.begin();
+    if (it == m_queued_flushes.end()) {
+      break;
+    }
+
+    // m_lock is dropped while a flush is re-queued below, and the tid set may
+    // be modified in that window. Look up the oldest outstanding tid afresh on
+    // every pass instead of holding an iterator across the unlock.
+    auto io_tid_it = m_queued_or_blocked_io_tids.begin();
+    if (io_tid_it != m_queued_or_blocked_io_tids.end() &&
+        *io_tid_it < it->first) {
+      break;
+    }
+
+    auto blocked_flush = *it;
+    ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
+
+    AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
+
+    m_queued_flushes.erase(it);
+    locker.unlock();
+    queue_unblocked_io(aio_comp, blocked_flush.second);
+    locker.lock();
+  }
+}
+
+/**
+ * Put a previously parked request back on the work queue.
+ *
+ * The request is queued, while holding a shared lock on the image's
+ * owner_lock, only when start_in_flight_io() reports success for its
+ * completion.
+ *
+ * NOTE(review): when start_in_flight_io() fails, req is neither queued nor
+ * released here -- confirm who owns it on that path.
+ *
+ * @param comp completion associated with the request
+ * @param req  request to re-queue
+ */
+template <typename I>
+void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
+                                           ImageDispatchSpec<I> *req) {
+  if (start_in_flight_io(comp)) {
+    std::shared_lock owner_locker{m_image_ctx.owner_lock};
+    queue(req);
+  }
+}
+
 template <typename I>
 void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
@@ -787,26 +920,90 @@ void *ImageRequestWQ<I>::_void_dequeue() {
 }
 
 template <typename I>
-void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
+void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
+                                   bool non_blocking_io) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                  << "req=" << req << dendl;
 
+  //extents are invalidated after the request is sent
+  //so gather them ahead of that
+  const auto& extents = req->get_image_extents();
+  bool write_op = req->is_write_op();
+  uint64_t tid = req->get_tid();
+  uint64_t offset;
+  uint64_t length;
+
+  if (write_op) {
+    std::lock_guard locker{m_lock};
+    offset = extents.size() ? extents.front().first : 0;
+    length = extents.size() ? extents.front().second : 0;
+    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
+    if (blocked) {
+      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
+                     << &m_image_ctx << ", "
+                     << "off=" <<  offset << ", len=" << length << dendl;
+      m_blocked_ios.push_back(req);
+      --m_in_flight_ios;
+      return;
+    }
+  }
+
   req->send();
 
-  finish_queued_io(req);
-  if (req->is_write_op()) {
-    finish_in_flight_write();
+  if (write_op) {
+    if (non_blocking_io) {
+      finish_in_flight_write();
+    }
+    unblock_overlapping_io(offset, length, tid);
+    unblock_flushes(tid);
   }
   delete req;
+}
 
+template <typename I>
+void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
+                 << "req=" << req << dendl;
+
+  bool write_op = req->is_write_op();
+
+  process_io(req, true);
+
+  finish_queued_io(write_op);
   finish_in_flight_io();
 }
 
 template <typename I>
-void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
+void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+    bool write_op, uint64_t tid) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
+  {
+    std::lock_guard locker{m_lock};
+    if (write_op) {
+      if (length > 0) {
+        if(!m_in_flight_extents.empty()) {
+          CephContext *cct = m_image_ctx.cct;
+          ldout(cct, 20) << "erasing in flight extents with tid:" 
+                         << tid << dendl;
+          ImageExtentIntervals extents;
+          extents.insert(offset, length);
+          ImageExtentIntervals intersect;
+          intersect.intersection_of(extents, m_in_flight_extents);
+          m_in_flight_extents.subtract(intersect);
+        }
+      }
+      m_queued_or_blocked_io_tids.erase(tid);
+    }
+  }
+}
+
+template <typename I>
+void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
   std::shared_lock locker{m_lock};
-  if (req->is_write_op()) {
+  if (write_op) {
     ceph_assert(m_queued_writes > 0);
     m_queued_writes--;
   } else {
@@ -826,7 +1023,6 @@ void ImageRequestWQ<I>::finish_in_flight_write() {
       writes_blocked = true;
     }
   }
-
   if (writes_blocked) {
     flush_image(m_image_ctx, new C_BlockedWrites(this));
   }
@@ -879,7 +1075,15 @@ void ImageRequestWQ<I>::fail_in_flight_io(
     int r, ImageDispatchSpec<I> *req) {
   this->process_finish();
   req->fail(r);
-  finish_queued_io(req);
+
+  bool write_op = req->is_write_op();
+  uint64_t tid = req->get_tid();
+  const auto& extents = req->get_image_extents();
+  uint64_t offset = extents.size() ? extents.front().first : 0;
+  uint64_t length = extents.size() ? extents.front().second : 0;
+
+  finish_queued_io(write_op);
+  remove_in_flight_write_ios(offset, length, write_op, tid);
   delete req;
   finish_in_flight_io();
 }
index daa596330d82c1cbb3473381ad2daa7c74e784f1..ffb483bec803bdb772d97397bb1704f3fcb2d3fa 100644 (file)
@@ -9,9 +9,10 @@
 #include "common/Throttle.h"
 #include "common/WorkQueue.h"
 #include "librbd/io/Types.h"
-
+#include "include/interval_set.h"
 #include <list>
 #include <atomic>
+#include <vector>
 
 namespace librbd {
 
@@ -77,6 +78,7 @@ public:
   void apply_qos_schedule_tick_min(uint64_t tick);
 
   void apply_qos_limit(const uint64_t flag, uint64_t limit, uint64_t burst);
+
 protected:
   void *_void_dequeue() override;
   void process(ImageDispatchSpec<ImageCtxT> *req) override;
@@ -107,6 +109,14 @@ private:
   std::atomic<unsigned> m_io_blockers { 0 };
   std::atomic<unsigned> m_io_throttled { 0 };
 
+  typedef interval_set<uint64_t> ImageExtentIntervals;
+  // image ranges currently recorded for dispatched write ops
+  ImageExtentIntervals m_in_flight_extents;
+
+  // write ops parked because their range overlapped m_in_flight_extents
+  std::vector<ImageDispatchSpec<ImageCtxT>*> m_blocked_ios;
+  // Source of per-request tids. Tids are stored and ordered as uint64_t in
+  // the containers below, so the counter uses the same width; a 32-bit
+  // counter would wrap and break that ordering.
+  std::atomic<uint64_t> m_last_tid { 0 };
+  // tids of write ops that have been issued but not yet retired
+  std::set<uint64_t> m_queued_or_blocked_io_tids;
+  // flushes held back behind earlier writes, keyed by the flush's tid
+  std::map<uint64_t, ImageDispatchSpec<ImageCtxT>*> m_queued_flushes;
+
+
   std::list<std::pair<uint64_t, TokenBucketThrottle*> > m_throttles;
   uint64_t m_qos_enabled_flag = 0;
 
@@ -126,14 +136,23 @@ private:
 
   bool needs_throttle(ImageDispatchSpec<ImageCtxT> *item);
 
-  void finish_queued_io(ImageDispatchSpec<ImageCtxT> *req);
+  void finish_queued_io(bool write_op);
+  void remove_in_flight_write_ios(uint64_t offset, uint64_t length,
+                                  bool write_op, uint64_t tid);
   void finish_in_flight_write();
 
+  void unblock_flushes(uint64_t tid);
+  bool block_overlapping_io(ImageExtentIntervals* in_flight_image_extents,
+                            uint64_t object_off, uint64_t object_len);
+  void unblock_overlapping_io(uint64_t offset, uint64_t length, uint64_t tid);
   int start_in_flight_io(AioCompletion *c);
   void finish_in_flight_io();
   void fail_in_flight_io(int r, ImageDispatchSpec<ImageCtxT> *req);
+  void process_io(ImageDispatchSpec<ImageCtxT> *req, bool non_blocking_io);
 
   void queue(ImageDispatchSpec<ImageCtxT> *req);
+  void queue_unblocked_io(AioCompletion *comp,
+                          ImageDispatchSpec<ImageCtxT> *req);
 
   void handle_acquire_lock(int r, ImageDispatchSpec<ImageCtxT> *req);
   void handle_refreshed(int r, ImageDispatchSpec<ImageCtxT> *req);
index c2e86b104347846a72fb222c94a96b564c892f6b..c5478b6be7f157d0483fbd82bfb33d0254fd53a8 100644 (file)
@@ -44,7 +44,7 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
   static ImageDispatchSpec* create_write_request(
       librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp,
       Extents &&image_extents, bufferlist &&bl, int op_flags,
-      const ZTracer::Trace &parent_trace) {
+      const ZTracer::Trace &parent_trace, uint64_t tid) {
     ceph_assert(s_instance != nullptr);
     s_instance->aio_comp = aio_comp;
     return s_instance;
@@ -66,6 +66,9 @@ struct ImageDispatchSpec<librbd::MockTestImageCtx> {
   MOCK_CONST_METHOD0(were_all_throttled, bool());
   MOCK_CONST_METHOD1(set_throttled, void(uint64_t));
   MOCK_CONST_METHOD2(tokens_requested, bool(uint64_t, uint64_t *));
+  MOCK_CONST_METHOD0(get_image_extents, Extents());
+  MOCK_CONST_METHOD0(get_aio_completion, AioCompletion*());
+  MOCK_CONST_METHOD0(get_tid, uint64_t());
 
   ImageDispatchSpec() {
     s_instance = this;