git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add aio_flush
authorYuan Lu <yuan.y.lu@intel.com>
Tue, 7 Apr 2020 05:09:35 +0000 (13:09 +0800)
committerYuan Lu <yuan.y.lu@intel.com>
Mon, 20 Apr 2020 06:03:45 +0000 (14:03 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
12 files changed:
src/librbd/cache/ImageCache.h
src/librbd/cache/ImageWriteback.cc
src/librbd/cache/ImageWriteback.h
src/librbd/cache/PassthroughImageCache.cc
src/librbd/cache/PassthroughImageCache.h
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/Types.cc
src/librbd/cache/rwl/Types.h
src/librbd/io/ImageRequest.cc
src/librbd/io/Types.h
src/test/librbd/mock/cache/MockImageCache.h

index 01ca6d188bf57a09e82d785b6a922fc9c3b2ae8b..896c43630210ca5f2e9fa664599bffaae65d9f93 100644 (file)
@@ -36,7 +36,7 @@ public:
   virtual void aio_discard(uint64_t offset, uint64_t length,
                            uint32_t discard_granularity_bytes,
                            Context *on_finish) = 0;
-  virtual void aio_flush(Context *on_finish) = 0;
+  virtual void aio_flush(io::FlushSource flush_source, Context *on_finish) = 0;
   virtual void aio_writesame(uint64_t offset, uint64_t length,
                              ceph::bufferlist&& bl,
                              int fadvise_flags, Context *on_finish) = 0;
index 102d02943a41708eeb99aa3a72978282dcff99ef..9965c0363c64d3914ce996b75d1cf5f7b07dd911 100644 (file)
@@ -74,14 +74,15 @@ void ImageWriteback<I>::aio_discard(uint64_t offset, uint64_t length,
 }
 
 template <typename I>
-void ImageWriteback<I>::aio_flush(Context *on_finish) {
+void ImageWriteback<I>::aio_flush(io::FlushSource flush_source, Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "on_finish=" << on_finish << dendl;
 
   ImageCtx *image_ctx = util::get_image_ctx(&m_image_ctx);
   auto aio_comp = io::AioCompletion::create_and_start(
       on_finish, image_ctx, io::AIO_TYPE_FLUSH);
-  io::ImageFlushRequest<> req(*image_ctx, aio_comp, io::FLUSH_SOURCE_INTERNAL, {});
+  io::ImageFlushRequest<> req(*image_ctx, aio_comp, flush_source,
+                              {});
   req.set_bypass_image_cache();
   req.send();
 }
index bbcc85c8ae5439cb9046d48dd1eee623a311b604..3f62391e4ef78984c1c45ede8d6cf8fe5ad3f5da 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "include/buffer_fwd.h"
 #include "include/int_types.h"
+#include "librbd/io/Types.h"
 #include <vector>
 
 class Context;
@@ -27,7 +28,7 @@ public:
                          int fadvise_flags, Context *on_finish) = 0;
   virtual void aio_discard(uint64_t offset, uint64_t length,
                            uint32_t discard_granularity_bytes, Context *on_finish) = 0;
-  virtual void aio_flush(Context *on_finish) = 0 ;
+  virtual void aio_flush(io::FlushSource flush_source, Context *on_finish) = 0 ;
   virtual void aio_writesame(uint64_t offset, uint64_t length,
                              ceph::bufferlist&& bl,
                              int fadvise_flags, Context *on_finish) = 0;
@@ -54,7 +55,7 @@ public:
                  int fadvise_flags, Context *on_finish);
   void aio_discard(uint64_t offset, uint64_t length,
                    uint32_t discard_granularity_bytes, Context *on_finish);
-  void aio_flush(Context *on_finish);
+  void aio_flush(io::FlushSource flush_source, Context *on_finish);
   void aio_writesame(uint64_t offset, uint64_t length,
                      ceph::bufferlist&& bl,
                      int fadvise_flags, Context *on_finish);
index c3672f531a2ad75ecb00d24e3b40744387ffc21a..44d2cdb360d2e9288691a35774b6b9211de4d3d1 100644 (file)
@@ -57,11 +57,11 @@ void PassthroughImageCache<I>::aio_discard(uint64_t offset, uint64_t length,
 }
 
 template <typename I>
-void PassthroughImageCache<I>::aio_flush(Context *on_finish) {
+void PassthroughImageCache<I>::aio_flush(librbd::io::FlushSource flush_source, Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << "on_finish=" << on_finish << dendl;
 
-  m_image_writeback.aio_flush(on_finish);
+  m_image_writeback.aio_flush(flush_source, on_finish);
 }
 
 template <typename I>
@@ -126,7 +126,7 @@ void PassthroughImageCache<I>::flush(Context *on_finish) {
 
   // internal flush -- nothing to writeback but make sure
   // in-flight IO is flushed
-  aio_flush(on_finish);
+  aio_flush(librbd::io::FLUSH_SOURCE_INTERNAL, on_finish);
 }
 
 } // namespace cache
index b4b4633507f41b77ba213276dbfe417b581bdcce..1101422899ec777884a63a62a7c3242790563228 100644 (file)
@@ -30,7 +30,7 @@ public:
   void aio_discard(uint64_t offset, uint64_t length,
                    uint32_t discard_granularity_bytes,
                    Context *on_finish) override;
-  void aio_flush(Context *on_finish) override;
+  void aio_flush(librbd::io::FlushSource flush_source, Context *on_finish) override;
   void aio_writesame(uint64_t offset, uint64_t length,
                      ceph::bufferlist&& bl,
                      int fadvise_flags, Context *on_finish) override;
index 87c130f15099921f8eb47cd910e494cde3a14969..661215085d33b1083e0871616b841d2e345b0826 100644 (file)
@@ -428,14 +428,14 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
   m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
   /* Do these after we drop lock */
   later.add(new LambdaContext([this](int r) {
-        if (m_periodic_stats_enabled) {
-          /* Log stats for the first time */
-          periodic_stats();
-          /* Arm periodic stats logging for the first time */
-          std::lock_guard timer_locker(*m_timer_lock);
-          arm_periodic_stats();
-        }
-      }));
+    if (m_periodic_stats_enabled) {
+      /* Log stats for the first time */
+      periodic_stats();
+      /* Arm periodic stats logging for the first time */
+      std::lock_guard timer_locker(*m_timer_lock);
+      arm_periodic_stats();
+    }
+  }));
   m_image_ctx.op_work_queue->queue(on_finish, 0);
 }
 
@@ -467,24 +467,98 @@ void ReplicatedWriteLog<I>::init(Context *on_finish) {
 
 template <typename I>
 void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
-  // Here we only close pmem pool file and remove the pool file.
-  // TODO: We'll continue to update this part in later PRs.
-  if (m_log_pool) {
-    ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
-    pmemobj_close(m_log_pool);
-  }
-  if (m_log_is_poolset) {
-    ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << dendl;
+
+  ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
+
+  Context *ctx = new LambdaContext(
+    [this, on_finish](int r) {
+      ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
+      m_image_ctx.op_work_queue->queue(on_finish, r);
+    });
+  ctx = new LambdaContext(
+    [this, ctx](int r) {
+      Context *next_ctx = override_ctx(r, ctx);
+      bool periodic_stats_enabled = m_periodic_stats_enabled;
+      m_periodic_stats_enabled = false;
+
+      if (periodic_stats_enabled) {
+        /* Log stats one last time if they were enabled */
+        periodic_stats();
+      }
+      {
+        std::lock_guard locker(m_lock);
+        ceph_assert(m_dirty_log_entries.size() == 0);
+        m_cache_state->clean = true;
+        m_log_entries.clear();
+        if (m_log_pool) {
+          ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
+          pmemobj_close(m_log_pool);
+        }
+        if (m_cache_state->clean) {
+          if (m_log_is_poolset) {
+            ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+          } else {
+            ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << m_log_pool_name << dendl;
+            if (remove(m_log_pool_name.c_str()) != 0) {
+              lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << m_log_pool_name << "\": "
+                                     << pmemobj_errormsg() << dendl;
+            } else {
+              m_cache_state->clean = true;
+              m_cache_state->empty = true;
+              m_cache_state->present = false;
+            }
+          }
+        } else {
+          if (m_log_is_poolset) {
+            ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl;
+          } else {
+            ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << m_log_pool_name << dendl;
+          }
+        }
+        if (m_perfcounter) {
+          perf_stop();
+        }
+      }
+      update_image_cache_state(next_ctx);
+    });
+  if (m_first_free_entry == m_first_valid_entry) { //if the log entries are free.
+    m_image_ctx.op_work_queue->queue(ctx, 0);
   } else {
-    ldout(m_image_ctx.cct, 5) << "Removing empty pool file: "
-                              << m_log_pool_name << dendl;
-    if (remove(m_log_pool_name.c_str()) != 0) {
-      lderr(m_image_ctx.cct) << "failed to remove empty pool \""
-                             << m_log_pool_name << "\": "
-                             << pmemobj_errormsg() << dendl;
-    }
+    ctx = new LambdaContext(
+      [this, ctx](int r) {
+        Context *next_ctx = override_ctx(r, ctx);
+        {
+          /* Sync with process_writeback_dirty_entries() */
+          RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
+          m_shutting_down = true;
+          /* Flush all writes to OSDs (unless disabled) and wait for all
+             in-progress flush writes to complete */
+          ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
+          if (m_periodic_stats_enabled) {
+            periodic_stats();
+          }
+        }
+        flush_dirty_entries(next_ctx);
+      });
+    ctx = new LambdaContext(
+      [this, ctx](int r) {
+        Context *next_ctx = override_ctx(r, ctx);
+        ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
+        // Wait for in progress IOs to complete
+        next_ctx = util::create_async_context_callback(m_image_ctx, next_ctx);
+        m_async_op_tracker.wait_for_ops(next_ctx);
+      });
+    ctx = new LambdaContext(
+      [this, ctx](int r) {
+        ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
+        m_work_queue.queue(ctx, r);
+      });
+    /* Complete all in-flight writes before shutting down */
+    ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
+    internal_flush(ctx);
   }
-  on_finish->complete(0);
 }
 
 template <typename I>
@@ -520,7 +594,7 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
       alloc_and_dispatch_io_req(write_req);
     });
 
-  detain_guarded_request(write_req, guarded_ctx);
+  detain_guarded_request(write_req, guarded_ctx, false);
 }
 
 template <typename I>
@@ -529,8 +603,74 @@ void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
                                         Context *on_finish) {
 }
 
+/**
+ * Aio_flush completes when all previously completed writes are
+ * flushed to persistent cache. We make a best-effort attempt to also
+ * defer until all in-progress writes complete, but we may not know
+ * about all of the writes the application considers in-progress yet,
+ * due to uncertainty in the IO submission workq (multiple WQ threads
+ * may allow out-of-order submission).
+ *
+ * This flush operation will not wait for writes deferred for overlap
+ * in the block guard.
+ */
 template <typename I>
-void ReplicatedWriteLog<I>::aio_flush(Context *on_finish) {
+void ReplicatedWriteLog<I>::aio_flush(io::FlushSource flush_source, Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
+
+  if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source) {
+    internal_flush(on_finish);
+    return;
+  }
+  m_perfcounter->inc(l_librbd_rwl_aio_flush, 1);
+
+  /* May be called even if initialization fails */
+  if (!m_initialized) {
+    ldout(cct, 05) << "never initialized" << dendl;
+    /* Deadlock if completed here */
+    m_image_ctx.op_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  {
+    std::shared_lock image_locker(m_image_ctx.image_lock);
+    if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
+      on_finish->complete(-EROFS);
+      return;
+    }
+  }
+
+  auto flush_req = make_flush_req(on_finish);
+
+  GuardedRequestFunctionContext *guarded_ctx =
+    new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
+      ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
+      ceph_assert(guard_ctx.cell);
+      flush_req->detained = guard_ctx.state.detained;
+      /* We don't call flush_req->set_cell(), because the block guard will be released here */
+      {
+        DeferredContexts post_unlock; /* Do these when the lock below is released */
+        std::lock_guard locker(m_lock);
+
+        if (!m_persist_on_flush && m_persist_on_write_until_flush) {
+          m_persist_on_flush = true;
+          ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
+        }
+
+        /*
+         * Create a new sync point if there have been writes since the last
+         * one.
+         *
+         * We do not flush the caches below the RWL here.
+         */
+        flush_new_sync_point_if_needed(flush_req, post_unlock);
+      }
+
+      release_guarded_request(guard_ctx.cell);
+    });
+
+  detain_guarded_request(flush_req, guarded_ctx, true);
 }
 
 template <typename I>
@@ -613,15 +753,15 @@ BlockGuardCell* ReplicatedWriteLog<I>::detain_guarded_request_barrier_helper(
 
 template <typename I>
 void ReplicatedWriteLog<I>::detain_guarded_request(
-  C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
+  C_BlockIORequestT *request,
+  GuardedRequestFunctionContext *guarded_ctx,
+  bool is_barrier)
 {
   BlockExtent extent;
-  bool is_barrier = false;
   if (request) {
     extent = request->image_extents_summary.block_extent();
   } else {
     extent = block_extent(whole_volume_extent());
-    is_barrier = true;
   }
   auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
   BlockGuardCell *cell = nullptr;
@@ -1507,7 +1647,7 @@ Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<Generi
                                << cpp_strerror(r) << dendl;
         ctx->complete(r);
       } else {
-        m_image_writeback.aio_flush(ctx);
+        m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
       }
     });
 
@@ -1881,7 +2021,7 @@ void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
               std::lock_guard locker(m_lock);
               ceph_assert(m_dirty_log_entries.size() == 0);
             }
-            m_image_writeback.aio_flush(next_ctx);
+            m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
           });
         ctx = new LambdaContext(
           [this, ctx](int r) {
@@ -1898,7 +2038,7 @@ void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
         auto flush_req = make_flush_req(ctx);
         flush_new_sync_point_if_needed(flush_req, on_exit);
       });
-  detain_guarded_request(nullptr, guarded_ctx);
+  detain_guarded_request(nullptr, guarded_ctx, true);
 }
 
 } // namespace cache
index 1f510286919217d958063eb6163c121f32c33f58..88fa56e5a71a612405365a3e9da05f0a5281f2f4 100644 (file)
@@ -74,7 +74,7 @@ public:
   void aio_discard(uint64_t offset, uint64_t length,
                    uint32_t discard_granularity_bytes,
                    Context *on_finish) override;
-  void aio_flush(Context *on_finish) override;
+  void aio_flush(io::FlushSource flush_source, Context *on_finish) override;
   void aio_writesame(uint64_t offset, uint64_t length,
                      ceph::bufferlist&& bl,
                      int fadvise_flags, Context *on_finish) override;
@@ -131,7 +131,9 @@ private:
 
   BlockGuardCell* detain_guarded_request_helper(rwl::GuardedRequest &req);
   BlockGuardCell* detain_guarded_request_barrier_helper(rwl::GuardedRequest &req);
-  void detain_guarded_request(C_BlockIORequestT *request, rwl::GuardedRequestFunctionContext *guarded_ctx);
+  void detain_guarded_request(C_BlockIORequestT *request,
+                              rwl::GuardedRequestFunctionContext *guarded_ctx,
+                              bool is_barrier);
 
   librbd::cache::rwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
 
@@ -177,7 +179,6 @@ private:
   uint64_t m_flushed_sync_gen = 0;
 
   bool m_persist_on_write_until_flush = true;
-  bool m_flush_seen = false;
 
   AsyncOpTracker m_async_op_tracker;
   /* Debug counters for the places m_async_op_tracker is used */
index e6f10de2114a24efcefef775fa5dc3bba60b10a0..9f73357774fc83926378e77e2ed05e960ce4b1c7 100644 (file)
@@ -111,6 +111,18 @@ BlockExtent block_extent(const io::Extent& image_extent) {
   return convert_to_block_extent(image_extent.first, image_extent.second);
 }
 
+Context * override_ctx(int r, Context *ctx) {
+  if (r < 0) {
+    /* Override next_ctx status with this error */
+    return new LambdaContext(
+      [r, ctx](int _r) {
+        ctx->complete(r);
+      });
+  } else {
+    return ctx;
+  }
+}
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
index 768902044fa78e2f6ae882eec80bd4a9223cbe08..48df0edf4322c6430e9b91f6d879c6dfff03430b 100644 (file)
@@ -263,6 +263,8 @@ io::Extent whole_volume_extent();
 
 BlockExtent block_extent(const io::Extent& image_extent);
 
+Context * override_ctx(int r, Context *ctx);
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
index 977182eacd1c53426c4bbaa1dd3164d8f8f89f56..0f93c95da4c0533741906ab697b559353ffd9966 100644 (file)
@@ -717,7 +717,11 @@ void ImageFlushRequest<I>::send_request() {
     });
 
   // ensure all in-flight IOs are settled if non-user flush request
-  aio_comp->async_op.flush(ctx);
+  if (m_flush_source == FLUSH_SOURCE_WRITEBACK) {
+    ctx->complete(0);
+  } else {
+    aio_comp->async_op.flush(ctx);
+  }
 
   // might be flushing during image shutdown
   if (image_ctx.perfcounter != nullptr) {
@@ -733,7 +737,7 @@ void ImageFlushRequest<I>::send_image_cache_request() {
   AioCompletion *aio_comp = this->m_aio_comp;
   aio_comp->set_request_count(1);
   C_AioRequest *req_comp = new C_AioRequest(aio_comp);
-  image_ctx.image_cache->aio_flush(req_comp);
+  image_ctx.image_cache->aio_flush(librbd::io::FLUSH_SOURCE_USER, req_comp);
 }
 
 template <typename I>
index 1742d70fb42f163f76685d8117f3fdd1d67df2f5..daa663c4cbb5da0e8828b055b1824646572754d5 100644 (file)
@@ -42,7 +42,8 @@ typedef enum {
 enum FlushSource {
   FLUSH_SOURCE_USER,
   FLUSH_SOURCE_INTERNAL,
-  FLUSH_SOURCE_SHUTDOWN
+  FLUSH_SOURCE_SHUTDOWN,
+  FLUSH_SOURCE_WRITEBACK
 };
 
 enum Direction {
index dd16a90f62e5530858c74f215b55018f5e430fa3..c9a3cc335a5c53e1462485dc94571ea288eea293 100644 (file)
@@ -5,6 +5,7 @@
 #define CEPH_TEST_LIBRBD_CACHE_MOCK_IMAGE_CACHE_H
 
 #include "gmock/gmock.h"
+#include "librbd/io/Types.h"
 #include <vector>
 
 namespace librbd {
@@ -30,6 +31,7 @@ struct MockImageCache {
 
   MOCK_METHOD4(aio_discard, void(uint64_t, uint64_t, uint32_t, Context *));
   MOCK_METHOD1(aio_flush, void(Context *));
+  MOCK_METHOD2(aio_flush, void(librbd::io::FlushSource, Context *));
   MOCK_METHOD5(aio_writesame_mock, void(uint64_t, uint64_t, ceph::bufferlist& bl,
                                         int, Context *));
   void aio_writesame(uint64_t off, uint64_t len, ceph::bufferlist&& bl,