git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
librbd: add invalidate
author Yuan Lu <yuan.y.lu@intel.com>
Tue, 28 Apr 2020 07:34:46 +0000 (15:34 +0800)
committer Yuan Lu <yuan.y.lu@intel.com>
Wed, 6 May 2020 15:30:48 +0000 (23:30 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h

index 81257398b0324145c54b536ab048c998765af790..2c4b9f945cedbfd7473d8154001933517e1048a2 100644 (file)
@@ -565,7 +565,7 @@ void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
       });
     /* Complete all in-flight writes before shutting down */
     ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
-    internal_flush(ctx);
+    internal_flush(false, ctx);
   }
 }
 
@@ -770,7 +770,7 @@ void ReplicatedWriteLog<I>::aio_flush(io::FlushSource flush_source, Context *on_
   ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
 
   if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source) {
-    internal_flush(on_finish);
+    internal_flush(false, on_finish);
     return;
   }
   m_perfcounter->inc(l_librbd_rwl_aio_flush, 1);
@@ -870,11 +870,12 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
 
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
-  internal_flush(on_finish);
+  internal_flush(false, on_finish);
 }
 
 template <typename I>
 void ReplicatedWriteLog<I>::invalidate(Context *on_finish) {
+  internal_flush(true, on_finish);
 }
 
 template <typename I>
@@ -1733,7 +1734,7 @@ void ReplicatedWriteLog<I>::process_work() {
       std::lock_guard locker(m_lock);
       m_wake_up_requested = false;
     }
-    if (m_alloc_failed_since_retire ||
+    if (m_alloc_failed_since_retire || m_invalidating ||
         m_bytes_allocated > high_water_bytes ||
         (m_log_entries.size() > high_water_entries)) {
       int retired = 0;
@@ -1744,12 +1745,12 @@ void ReplicatedWriteLog<I>::process_work() {
                                  << ", allocated_entries > high_water="
                                  << (m_log_entries.size() > high_water_entries)
                                  << dendl;
-      while (m_alloc_failed_since_retire ||
+      while (m_alloc_failed_since_retire || m_invalidating ||
             (m_bytes_allocated > high_water_bytes) ||
             (m_log_entries.size() > high_water_entries) ||
             (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) &&
             (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
-        if (!retire_entries((m_shutting_down ||
+        if (!retire_entries((m_shutting_down || m_invalidating ||
            (m_bytes_allocated > aggressive_high_water_bytes) ||
            (m_log_entries.size() > aggressive_high_water_entries))
             ? MAX_ALLOC_PER_TRANSACTION
@@ -1788,6 +1789,10 @@ bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log
   ldout(cct, 20) << "" << dendl;
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
+  if (m_invalidating) {
+    return true;
+  }
+
   /* For OWB we can flush entries with the same sync gen number (write between
    * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
    * its sync gen number is <= the lowest sync gen number carried by all the
@@ -1819,8 +1824,8 @@ bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log
 
 template <typename I>
 Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<GenericLogEntry> log_entry) {
-  //TODO handle invalidate in later PRs
   CephContext *cct = m_image_ctx.cct;
+  bool invalidating = m_invalidating; // snapshot so we behave consistently
 
   ldout(cct, 20) << "" << dendl;
   ceph_assert(m_entry_reader_lock.is_locked());
@@ -1835,7 +1840,7 @@ Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<Generi
 
   /* Flush write completion action */
   Context *ctx = new LambdaContext(
-    [this, log_entry](int r) {
+    [this, log_entry, invalidating](int r) {
       {
         std::lock_guard locker(m_lock);
         if (r < 0) {
@@ -1848,6 +1853,7 @@ Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<Generi
           m_bytes_dirty -= log_entry->bytes_dirty();
           sync_point_writer_flushed(log_entry->get_sync_point_entry());
           ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
+                                     << " invalidating=" << invalidating
                                      << dendl;
         }
         m_flush_ops_in_flight -= 1;
@@ -1867,6 +1873,9 @@ Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(std::shared_ptr<Generi
       }
     });
 
+  if (invalidating) {
+    return ctx;
+  }
   return new LambdaContext(
     [this, log_entry, ctx](int r) {
       m_image_ctx.op_work_queue->queue(new LambdaContext(
@@ -2189,7 +2198,16 @@ void ReplicatedWriteLog<I>::flush_dirty_entries(Context *on_finish) {
 }
 
 template <typename I>
-void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
+void ReplicatedWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
+  ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
+
+  if (m_perfcounter) {
+    if (invalidate) {
+      m_perfcounter->inc(l_librbd_rwl_invalidate_cache, 1);
+    } else {
+      m_perfcounter->inc(l_librbd_rwl_flush, 1);
+    }
+  }
 
   /* May be called even if initialization fails */
   if (!m_initialized) {
@@ -2205,27 +2223,32 @@ void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
    * results. */
   GuardedRequestFunctionContext *guarded_ctx =
     new GuardedRequestFunctionContext(
-      [this, on_finish](GuardedRequestFunctionContext &guard_ctx) {
+      [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
         DeferredContexts on_exit;
         ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
         ceph_assert(guard_ctx.cell);
 
         Context *ctx = new LambdaContext(
-          [this, cell=guard_ctx.cell, on_finish](int r) {
+          [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
             std::lock_guard locker(m_lock);
-            ldout(m_image_ctx.cct, 6) << "Done flush" << dendl;
+            m_invalidating = false;
+            ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
+                                      << invalidate << ")" << dendl;
             if (m_log_entries.size()) {
               ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
                                         << m_log_entries.size() << ", "
                                         << "front()=" << *m_log_entries.front()
                                         << dendl;
             }
+            if (invalidate) {
+              ceph_assert(m_log_entries.size() == 0);
+            }
             ceph_assert(m_dirty_log_entries.size() == 0);
             m_image_ctx.op_work_queue->queue(on_finish, r);
             release_guarded_request(cell);
             });
         ctx = new LambdaContext(
-          [this, ctx](int r) {
+          [this, ctx, invalidate](int r) {
             Context *next_ctx = ctx;
             if (r < 0) {
               /* Override on_finish status with this error */
@@ -2233,11 +2256,25 @@ void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
                 ctx->complete(r);
               });
             }
-            {
-              std::lock_guard locker(m_lock);
-              ceph_assert(m_dirty_log_entries.size() == 0);
+            if (invalidate) {
+              {
+                std::lock_guard locker(m_lock);
+                ceph_assert(m_dirty_log_entries.size() == 0);
+                ceph_assert(!m_invalidating);
+                ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
+                m_invalidating = true;
+              }
+              /* Discards all RWL entries */
+              while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
+              next_ctx->complete(0);
+            } else {
+              {
+                std::lock_guard locker(m_lock);
+                ceph_assert(m_dirty_log_entries.size() == 0);
+                ceph_assert(!m_invalidating);
+              }
+              m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
             }
-            m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
           });
         ctx = new LambdaContext(
           [this, ctx](int r) {
index 62c4aad1cf2d096deac900ed195c0791ec85bf00..ef22b41f959342c69c0b1acc8c5d0eadce966a05 100644 (file)
@@ -91,7 +91,7 @@ public:
   /// internal state methods
   void init(Context *on_finish) override;
   void shut_down(Context *on_finish) override;
-  void invalidate(Context *on_finish);
+  void invalidate(Context *on_finish) override;
   void flush(Context *on_finish) override;
 
   using This = ReplicatedWriteLog<ImageCtxT>;
@@ -148,6 +148,7 @@ private:
 
   std::atomic<bool> m_initialized = {false};
   std::atomic<bool> m_shutting_down = {false};
+  std::atomic<bool> m_invalidating = {false};
   PMEMobjpool *m_log_pool = nullptr;
   const char* m_rwl_pool_layout_name;
 
@@ -298,7 +299,7 @@ private:
   int append_op_log_entries(rwl::GenericLogOperations &ops);
   void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
   void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
-  void internal_flush(Context *on_finish);
+  void internal_flush(bool invalidate, Context *on_finish);
 };
 
 } // namespace cache