]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: queue image IO dispatch layer
authorJason Dillaman <dillaman@redhat.com>
Thu, 30 Apr 2020 17:07:33 +0000 (13:07 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 14 May 2020 15:56:45 +0000 (11:56 -0400)
This layer replaces the queue implemented by ImageRequestWQ and handles
blocking all write requests through the dispatch pipeline.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/CMakeLists.txt
src/librbd/exclusive_lock/ImageDispatch.cc
src/librbd/io/ImageDispatcher.cc
src/librbd/io/ImageDispatcher.h
src/librbd/io/ImageDispatcherInterface.h
src/librbd/io/ImageRequestWQ.cc
src/librbd/io/ImageRequestWQ.h
src/librbd/io/QueueImageDispatch.cc [new file with mode: 0644]
src/librbd/io/QueueImageDispatch.h [new file with mode: 0644]
src/test/librbd/mock/io/MockImageDispatcher.h

index 0ecd0258c930fdbf021ebac47e14b23b7812a1ce..ff6a6990807e5f5b2815122cf9a27cf8f48f7e1b 100644 (file)
@@ -84,6 +84,7 @@ set(librbd_internal_srcs
   io/ObjectDispatcher.cc
   io/ObjectRequest.cc
   io/QosImageDispatch.cc
+  io/QueueImageDispatch.cc
   io/ReadResult.cc
   io/RefreshImageDispatch.cc
   io/SimpleSchedulerObjectDispatch.cc
index f119d60af2cedc8d5dfba8fe8089b185043a39da..93c675e74f6ba9014a67d0ca63e82a726bd8f9b4 100644 (file)
@@ -9,6 +9,7 @@
 #include "librbd/ImageCtx.h"
 #include "librbd/Utils.h"
 #include "librbd/exclusive_lock/Policy.h"
+#include "librbd/io/ImageDispatcherInterface.h"
 #include "librbd/io/ImageRequestWQ.h"
 
 #define dout_subsys ceph_subsys_rbd
@@ -45,7 +46,7 @@ void ImageDispatch<I>::set_require_lock(io::Direction direction,
   // TODO
   if (blocked) {
     std::shared_lock owner_locker{m_image_ctx->owner_lock};
-    m_image_ctx->io_work_queue->block_writes(on_finish);
+    m_image_ctx->io_image_dispatcher->block_writes(on_finish);
   } else {
     on_finish->complete(0);
   }
@@ -57,7 +58,7 @@ void ImageDispatch<I>::unset_require_lock(io::Direction direction) {
 
   // TODO
   if (unblocked) {
-    m_image_ctx->io_work_queue->unblock_writes();
+    m_image_ctx->io_image_dispatcher->unblock_writes();
   }
 }
 
index d48016d8c876a59d35060383c3091439463f9651..b9e00e15c19f4c28fbeb9f94c511066eb722b2f4 100644 (file)
@@ -11,6 +11,7 @@
 #include "librbd/io/ImageDispatch.h"
 #include "librbd/io/ImageDispatchInterface.h"
 #include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/QueueImageDispatch.h"
 #include "librbd/io/QosImageDispatch.h"
 #include "librbd/io/RefreshImageDispatch.h"
 #include <boost/variant.hpp>
@@ -106,6 +107,9 @@ ImageDispatcher<I>::ImageDispatcher(I* image_ctx)
   auto image_dispatch = new ImageDispatch(image_ctx);
   this->register_dispatch(image_dispatch);
 
+  m_queue_image_dispatch = new QueueImageDispatch(image_ctx);
+  this->register_dispatch(m_queue_image_dispatch);
+
   m_qos_image_dispatch = new QosImageDispatch<I>(image_ctx);
   this->register_dispatch(m_qos_image_dispatch);
 
@@ -124,6 +128,31 @@ void ImageDispatcher<I>::apply_qos_limit(uint64_t flag, uint64_t limit,
   m_qos_image_dispatch->apply_qos_limit(flag, limit, burst);
 }
 
+template <typename I>
+bool ImageDispatcher<I>::writes_blocked() const {
+  return m_queue_image_dispatch->writes_blocked();
+}
+
+template <typename I>
+int ImageDispatcher<I>::block_writes() {
+  return m_queue_image_dispatch->block_writes();
+}
+
+template <typename I>
+void ImageDispatcher<I>::block_writes(Context *on_blocked) {
+  m_queue_image_dispatch->block_writes(on_blocked);
+}
+
+template <typename I>
+void ImageDispatcher<I>::unblock_writes() {
+  m_queue_image_dispatch->unblock_writes();
+}
+
+template <typename I>
+void ImageDispatcher<I>::wait_on_writes_unblocked(Context *on_unblocked) {
+  m_queue_image_dispatch->wait_on_writes_unblocked(on_unblocked);
+}
+
 template <typename I>
 void ImageDispatcher<I>::finish(int r, ImageDispatchLayer image_dispatch_layer,
                                 uint64_t tid) {
index 95ea6866a98e44770947b949eaf8848a74d46de5..b1de734be28b012ad13ff9c6f06263148090ee65 100644 (file)
@@ -21,6 +21,7 @@ struct ImageCtx;
 
 namespace io {
 
+template <typename> struct QueueImageDispatch;
 template <typename> struct QosImageDispatch;
 
 template <typename ImageCtxT = ImageCtx>
@@ -31,6 +32,13 @@ public:
   void apply_qos_schedule_tick_min(uint64_t tick) override;
   void apply_qos_limit(uint64_t flag, uint64_t limit, uint64_t burst) override;
 
+  bool writes_blocked() const override;
+  int block_writes() override;
+  void block_writes(Context *on_blocked) override;
+
+  void unblock_writes() override;
+  void wait_on_writes_unblocked(Context *on_unblocked) override;
+
   void finish(int r, ImageDispatchLayer image_dispatch_layer,
               uint64_t tid) override;
 
@@ -42,6 +50,7 @@ protected:
 private:
   struct SendVisitor;
 
+  QueueImageDispatch<ImageCtxT>* m_queue_image_dispatch = nullptr;
   QosImageDispatch<ImageCtxT>* m_qos_image_dispatch = nullptr;
 
 };
index 60d0c25f91c93e407478988d3c7bcfb9cb223557..a0a1e6dd4a88553081ed5c9153de3cbc212bf313 100644 (file)
@@ -21,6 +21,13 @@ public:
   virtual void apply_qos_limit(uint64_t flag, uint64_t limit,
                                uint64_t burst) = 0;
 
+  virtual bool writes_blocked() const = 0;
+  virtual int block_writes() = 0;
+  virtual void block_writes(Context *on_blocked) = 0;
+
+  virtual void unblock_writes() = 0;
+  virtual void wait_on_writes_unblocked(Context *on_unblocked) = 0;
+
   virtual void finish(int r, ImageDispatchLayer image_dispatch_layer,
                       uint64_t tid) = 0;
 };
index 13e6cd709192a3f1729aec8492cebd77fda746a1..968a54ce77521e59347e309c120d294a6fd0df5d 100644 (file)
@@ -627,13 +627,6 @@ void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
   flush_image(m_image_ctx, on_shutdown);
 }
 
-template <typename I>
-int ImageRequestWQ<I>::block_writes() {
-  C_SaferCond cond_ctx;
-  block_writes(&cond_ctx);
-  return cond_ctx.wait();
-}
-
 template <typename I>
 void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
   ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
index 1a1154410739e6f73eb1c031ba95825cf0b1716c..1783d3405491ffff65fde0389957af3a1b0888be 100644 (file)
@@ -65,7 +65,6 @@ public:
     return (m_write_blockers > 0);
   }
 
-  int block_writes();
   void block_writes(Context *on_blocked);
   void unblock_writes();
 
diff --git a/src/librbd/io/QueueImageDispatch.cc b/src/librbd/io/QueueImageDispatch.cc
new file mode 100644 (file)
index 0000000..23a087d
--- /dev/null
@@ -0,0 +1,264 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/io/QueueImageDispatch.h"
+#include "common/dout.h"
+#include "common/Cond.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Utils.h"
+#include "librbd/io/AioCompletion.h"
+#include "librbd/io/ImageDispatchSpec.h"
+#include "librbd/io/ImageRequestWQ.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::io::QueueImageDispatch: " << this \
+                           << " " << __func__ << ": "
+
+namespace librbd {
+namespace io {
+
+template <typename I>
+QueueImageDispatch<I>::QueueImageDispatch(I* image_ctx)
+  : m_image_ctx(image_ctx),
+    m_lock(ceph::make_shared_mutex(
+      util::unique_lock_name("librbd::io::QueueImageDispatch::m_lock",
+                             this))) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
+}
+
+template <typename I>
+void QueueImageDispatch<I>::shut_down(Context* on_finish) {
+  on_finish->complete(0);
+}
+
+template <typename I>
+int QueueImageDispatch<I>::block_writes() {
+  C_SaferCond cond_ctx;
+  block_writes(&cond_ctx);
+  return cond_ctx.wait();
+}
+
+template <typename I>
+void QueueImageDispatch<I>::block_writes(Context *on_blocked) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock));
+  auto cct = m_image_ctx->cct;
+
+  // TODO temp
+  auto ctx = new C_Gather(cct, on_blocked);
+  m_image_ctx->io_work_queue->block_writes(ctx->new_sub());
+
+  {
+    std::unique_lock locker{m_lock};
+    ++m_write_blockers;
+    ldout(cct, 5) << m_image_ctx << ", "
+                  << "num=" << m_write_blockers << dendl;
+    if (!m_write_blocker_contexts.empty() || !m_in_flight_write_tids.empty()) {
+      ldout(cct, 5) << "waiting for in-flight writes to complete: "
+                    << "write_tids=" << m_in_flight_write_tids << dendl;
+      m_write_blocker_contexts.push_back(ctx->new_sub());
+      ctx->activate();
+      return;
+    }
+  }
+
+  // ensure that all in-flight IO is flushed
+  flush_image(ctx->new_sub());
+  ctx->activate();
+};
+
+template <typename I>
+void QueueImageDispatch<I>::unblock_writes() {
+  auto cct = m_image_ctx->cct;
+
+  Contexts waiter_contexts;
+  Contexts dispatch_contexts;
+  {
+    std::unique_lock locker{m_lock};
+    ceph_assert(m_write_blockers > 0);
+    --m_write_blockers;
+
+    ldout(cct, 5) << m_image_ctx << ", "
+                  << "num=" << m_write_blockers << dendl;
+    if (m_write_blockers == 0) {
+      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
+      std::swap(dispatch_contexts, m_on_dispatches);
+    }
+  }
+
+  for (auto ctx : waiter_contexts) {
+    ctx->complete(0);
+  }
+
+  for (auto ctx : dispatch_contexts) {
+    ctx->complete(0);
+  }
+
+  // TODO temp
+  m_image_ctx->io_work_queue->unblock_writes();
+}
+
+template <typename I>
+void QueueImageDispatch<I>::wait_on_writes_unblocked(Context *on_unblocked) {
+  ceph_assert(ceph_mutex_is_locked(m_image_ctx->owner_lock));
+  auto cct = m_image_ctx->cct;
+
+  // TODO temp
+  auto ctx = new C_Gather(cct, on_unblocked);
+  m_image_ctx->io_work_queue->wait_on_writes_unblocked(ctx->new_sub());
+
+  {
+    std::unique_lock locker{m_lock};
+    ldout(cct, 20) << m_image_ctx << ", "
+                   << "write_blockers=" << m_write_blockers << dendl;
+    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
+      m_unblocked_write_waiter_contexts.push_back(ctx->new_sub());
+      ctx->activate();
+      return;
+    }
+  }
+
+  ctx->activate();
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::read(
+    AioCompletion* aio_comp, Extents &&image_extents, ReadResult &&read_result,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return enqueue(true, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return enqueue(false, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::discard(
+    AioCompletion* aio_comp, Extents &&image_extents,
+    uint32_t discard_granularity_bytes, const ZTracer::Trace &parent_trace,
+    uint64_t tid, std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return enqueue(false, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::write_same(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+    int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return enqueue(false, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::compare_and_write(
+    AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+    bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  return enqueue(false, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::flush(
+    AioCompletion* aio_comp, FlushSource flush_source,
+    const ZTracer::Trace &parent_trace, uint64_t tid,
+    std::atomic<uint32_t>* image_dispatch_flags,
+    DispatchResult* dispatch_result, Context* on_dispatched) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "tid=" << tid << dendl;
+
+  if (flush_source != FLUSH_SOURCE_USER) {
+    return false;
+  }
+
+  return enqueue(false, tid, dispatch_result, on_dispatched);
+}
+
+template <typename I>
+void QueueImageDispatch<I>::handle_finished(int r, uint64_t tid) {
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << "r=" << r << ", tid=" << tid << dendl;
+
+  std::unique_lock locker{m_lock};
+  auto it = m_in_flight_write_tids.find(tid);
+  if (it == m_in_flight_write_tids.end()) {
+    // assumed to be a read op
+    return;
+  }
+  m_in_flight_write_tids.erase(it);
+
+  Contexts write_blocker_contexts;
+  if (m_in_flight_write_tids.empty()) {
+    std::swap(write_blocker_contexts, m_write_blocker_contexts);
+  }
+  locker.unlock();
+
+  for (auto ctx : write_blocker_contexts) {
+    ctx->complete(0);
+  }
+}
+
+template <typename I>
+bool QueueImageDispatch<I>::enqueue(
+    bool read_op, uint64_t tid, DispatchResult* dispatch_result,
+    Context* on_dispatched) {
+  std::unique_lock locker{m_lock};
+  if (!read_op) {
+    if (m_write_blockers > 0 || !m_on_dispatches.empty()) {
+      *dispatch_result = DISPATCH_RESULT_RESTART;
+      m_on_dispatches.push_back(on_dispatched);
+      return true;
+    }
+
+    m_in_flight_write_tids.insert(tid);
+  }
+  locker.unlock();
+
+  if (!m_image_ctx->non_blocking_aio) {
+    return false;
+  }
+
+  *dispatch_result = DISPATCH_RESULT_CONTINUE;
+  m_image_ctx->op_work_queue->queue(on_dispatched, 0);
+  return true;
+}
+
+template <typename I>
+void QueueImageDispatch<I>::flush_image(Context* on_finish) {
+  auto aio_comp = AioCompletion::create_and_start(
+    on_finish, util::get_image_ctx(m_image_ctx), librbd::io::AIO_TYPE_FLUSH);
+  auto req = ImageDispatchSpec<I>::create_flush_request(
+    *m_image_ctx, IMAGE_DISPATCH_LAYER_QUEUE, aio_comp, FLUSH_SOURCE_INTERNAL,
+    {});
+  req->send();
+}
+
+} // namespace io
+} // namespace librbd
+
+template class librbd::io::QueueImageDispatch<librbd::ImageCtx>;
diff --git a/src/librbd/io/QueueImageDispatch.h b/src/librbd/io/QueueImageDispatch.h
new file mode 100644 (file)
index 0000000..bd48f7f
--- /dev/null
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H
+#define CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H
+
+#include "librbd/io/ImageDispatchInterface.h"
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/ceph_mutex.h"
+#include "common/zipkin_trace.h"
+#include "common/Throttle.h"
+#include "librbd/io/ReadResult.h"
+#include "librbd/io/Types.h"
+#include <list>
+#include <set>
+
+struct Context;
+
+namespace librbd {
+
+struct ImageCtx;
+
+namespace io {
+
+struct AioCompletion;
+
+template <typename ImageCtxT>
+class QueueImageDispatch : public ImageDispatchInterface {
+public:
+  QueueImageDispatch(ImageCtxT* image_ctx);
+
+  ImageDispatchLayer get_dispatch_layer() const override {
+    return IMAGE_DISPATCH_LAYER_QUEUE;
+  }
+
+  void shut_down(Context* on_finish) override;
+
+  int block_writes();
+  void block_writes(Context *on_blocked);
+  void unblock_writes();
+
+  inline bool writes_blocked() const {
+    std::shared_lock locker{m_lock};
+    return (m_write_blockers > 0);
+  }
+
+  void wait_on_writes_unblocked(Context *on_unblocked);
+
+  bool read(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      ReadResult &&read_result, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool discard(
+      AioCompletion* aio_comp, Extents &&image_extents,
+      uint32_t discard_granularity_bytes,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool write_same(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&bl,
+      int op_flags, const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool compare_and_write(
+      AioCompletion* aio_comp, Extents &&image_extents, bufferlist &&cmp_bl,
+      bufferlist &&bl, uint64_t *mismatch_offset, int op_flags,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+  bool flush(
+      AioCompletion* aio_comp, FlushSource flush_source,
+      const ZTracer::Trace &parent_trace, uint64_t tid,
+      std::atomic<uint32_t>* image_dispatch_flags,
+      DispatchResult* dispatch_result, Context* on_dispatched) override;
+
+  void handle_finished(int r, uint64_t tid) override;
+
+private:
+  typedef std::list<Context*> Contexts;
+  typedef std::set<uint64_t> Tids;
+
+  ImageCtxT* m_image_ctx;
+
+  mutable ceph::shared_mutex m_lock;
+  Contexts m_on_dispatches;
+  Tids m_in_flight_write_tids;
+
+  uint32_t m_write_blockers = 0;
+  Contexts m_write_blocker_contexts;
+  Contexts m_unblocked_write_waiter_contexts;
+
+  bool enqueue(bool read_op, uint64_t tid, DispatchResult* dispatch_result,
+               Context* on_dispatched);
+  void flush_image(Context* on_blocked);
+
+};
+
+} // namespace io
+} // namespace librbd
+
+extern template class librbd::io::QueueImageDispatch<librbd::ImageCtx>;
+
+#endif // CEPH_LIBRBD_IO_QUEUE_IMAGE_DISPATCH_H
index bd7b8428e5d071058da008d55397a051011f5148..7fe508e3d9e992afa498a294250d935cfbf85f9a 100644 (file)
@@ -29,6 +29,13 @@ public:
 
   MOCK_METHOD1(apply_qos_schedule_tick_min, void(uint64_t));
   MOCK_METHOD3(apply_qos_limit, void(uint64_t, uint64_t, uint64_t));
+
+  MOCK_CONST_METHOD0(writes_blocked, bool());
+  MOCK_METHOD0(block_writes, int());
+  MOCK_METHOD1(block_writes, void(Context*));
+
+  MOCK_METHOD0(unblock_writes, void());
+  MOCK_METHOD1(wait_on_writes_unblocked, void(Context*));
 };
 
 } // namespace io