]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: add internal flush
authorYuan Lu <yuan.y.lu@intel.com>
Mon, 17 Feb 2020 09:35:35 +0000 (17:35 +0800)
committerYuan Lu <yuan.y.lu@intel.com>
Tue, 31 Mar 2020 03:06:51 +0000 (11:06 +0800)
Signed-off-by: Peterson, Scott <scott.d.peterson@intel.com>
Signed-off-by: Li, Xiaoyan <xiaoyan.li@intel.com>
Signed-off-by: Lu, Yuan <yuan.y.lu@intel.com>
Signed-off-by: Chamarthy, Mahati <mahati.chamarthy@intel.com>
src/librbd/cache/ReplicatedWriteLog.cc
src/librbd/cache/ReplicatedWriteLog.h
src/librbd/cache/rwl/Types.cc
src/librbd/cache/rwl/Types.h

index ca0fd2751f3de2f38e0298b653857d80502def5a..e12579d8d968f5f9b579395d07fa1ff1e5f411b9 100644 (file)
@@ -479,7 +479,7 @@ void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
                               << m_log_pool_name << dendl;
     if (remove(m_log_pool_name.c_str()) != 0) {
       lderr(m_image_ctx.cct) << "failed to remove empty pool \""
-                            << m_log_pool_name << "\": "
+                             << m_log_pool_name << "\": "
                              << pmemobj_errormsg() << dendl;
     }
   }
@@ -525,7 +525,7 @@ void ReplicatedWriteLog<I>::aio_write(Extents &&image_extents,
 template <typename I>
 void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
                                         uint32_t discard_granularity_bytes,
-                                       Context *on_finish) {
+                                        Context *on_finish) {
 }
 
 template <typename I>
@@ -549,6 +549,7 @@ void ReplicatedWriteLog<I>::aio_compare_and_write(Extents &&image_extents,
 
 template <typename I>
 void ReplicatedWriteLog<I>::flush(Context *on_finish) {
+  internal_flush(on_finish);
 }
 
 template <typename I>
@@ -613,8 +614,15 @@ template <typename I>
 void ReplicatedWriteLog<I>::detain_guarded_request(
   C_BlockIORequestT *request, GuardedRequestFunctionContext *guarded_ctx)
 {
-  //TODO: add is_barrier for flush request in later PRs
-  auto req = GuardedRequest(request->image_extents_summary.block_extent(), guarded_ctx);
+  BlockExtent extent;
+  bool is_barrier = false;
+  if (request) {
+    extent = request->image_extents_summary.block_extent();
+  } else {
+    extent = block_extent(whole_volume_extent());
+    is_barrier = true;
+  }
+  auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
   BlockGuardCell *cell = nullptr;
 
   ldout(m_image_ctx.cct, 20) << dendl;
@@ -710,7 +718,7 @@ void ReplicatedWriteLog<I>::append_scheduled_ops(void)
         ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
         ops_remain = true; /* Always check again before leaving */
         ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
-                                  << m_ops_to_append.size() << " remain" << dendl;
+                                   << m_ops_to_append.size() << " remain" << dendl;
       } else {
         ops_remain = false;
         if (appending) {
@@ -817,7 +825,7 @@ void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
         ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
         ops_remain = !m_ops_to_flush.empty();
         ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
-                                  << m_ops_to_flush.size() << " remain" << dendl;
+                                   << m_ops_to_flush.size() << " remain" << dendl;
       } else {
         ops_remain = false;
       }
@@ -946,9 +954,9 @@ void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops
 
   ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
                              << "start address="
-                            << ops.front()->get_log_entry()->pmem_entry << " "
+                             << ops.front()->get_log_entry()->pmem_entry << " "
                              << "bytes="
-                            << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
+                             << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
                              << dendl;
   pmemobj_flush(m_log_pool,
                 ops.front()->get_log_entry()->pmem_entry,
@@ -1004,7 +1012,7 @@ int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
     ldout(m_image_ctx.cct, 20) << "APPENDING: index="
                                << operation->get_log_entry()->log_entry_index << " "
                                << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
-                              << "]" << dendl;
+                               << "]" << dendl;
     entries_to_flush.push_back(operation);
   }
   flush_op_log_entries(entries_to_flush);
@@ -1068,10 +1076,10 @@ void ReplicatedWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
     }
     op->complete(result);
     m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_app_t,
-                       op->log_append_time - op->dispatch_time);
+                        op->log_append_time - op->dispatch_time);
     m_perfcounter->tinc(l_librbd_rwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
     m_perfcounter->hinc(l_librbd_rwl_log_op_dis_to_cmp_t_hist,
-                       utime_t(now - op->dispatch_time).to_nsec(),
+                        utime_t(now - op->dispatch_time).to_nsec(),
                         log_entry->ram_entry.write_bytes);
     utime_t app_lat = op->log_append_comp_time - op->log_append_time;
     m_perfcounter->tinc(l_librbd_rwl_log_op_app_to_appc_t, app_lat);
@@ -1263,7 +1271,7 @@ bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
       if (!req->has_io_waited_for_buffers()) {
         req->set_io_waited_for_entries(true);
         ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
-                                 << m_bytes_allocated_cap
+                                  << m_bytes_allocated_cap
                                   << ", allocated=" << m_bytes_allocated
                                   << ") in write [" << *req << "]" << dendl;
       }
@@ -1424,8 +1432,6 @@ bool ReplicatedWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log
   ldout(cct, 20) << "" << dendl;
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
-  //TODO handle invalidate
-
   /* For OWB we can flush entries with the same sync gen number (write between
    * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
    * its sync gen number is <= the lowest sync gen number carried by all the
@@ -1553,11 +1559,19 @@ void ReplicatedWriteLog<I>::process_writeback_dirty_entries() {
   }
 
   if (all_clean) {
-    // TODO: all flusing complete
+    /* All flushing complete, drain outside lock */
+    Contexts flush_contexts;
+    {
+      std::lock_guard locker(m_lock);
+      flush_contexts.swap(m_flush_complete_contexts);
+    }
+    finish_contexts(m_image_ctx.cct, flush_contexts, 0);
   }
 }
 
-/* Update/persist the last flushed sync point in the log */
+/**
+ * Update/persist the last flushed sync point in the log
+ */
 template <typename I>
 void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
 {
@@ -1638,7 +1652,7 @@ void ReplicatedWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointL
 }
 
 /* Make a new sync point and flush the previous during initialization, when there may or may
-* not be a previous sync point */
+ * not be a previous sync point */
 template <typename I>
 void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
@@ -1653,8 +1667,8 @@ void ReplicatedWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
 }
 
 /**
-* Begin a new sync point
-*/
+ * Begin a new sync point
+ */
 template <typename I>
 void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
   CephContext *cct = m_image_ctx.cct;
@@ -1698,7 +1712,8 @@ void ReplicatedWriteLog<I>::new_sync_point(DeferredContexts &later) {
 }
 
 template <typename I>
-void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, DeferredContexts &later) {
+void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
+                                                 DeferredContexts &later) {
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
 
   if (!flush_req) {
@@ -1745,6 +1760,146 @@ void ReplicatedWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req, Def
   to_append->add_in_on_persisted_ctxs(flush_req);
 }
 
+template <typename I>
+void ReplicatedWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
+                                                           DeferredContexts &later) {
+  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+
+  /* If there have been writes since the last sync point ... */
+  if (m_current_sync_point->log_entry->writes) {
+    flush_new_sync_point(flush_req, later);
+  } else {
+    /* There have been no writes to the current sync point. */
+    if (m_current_sync_point->earlier_sync_point) {
+      /* If previous sync point hasn't completed, complete this flush
+       * with the earlier sync point. No alloc or dispatch needed. */
+      m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
+    } else {
+      /* The previous sync point has already completed and been
+       * appended. The current sync point has no writes, so this flush
+       * has nothing to wait for. This flush completes now. */
+      later.add(flush_req);
+    }
+  }
+}
+
+/*
+ * RWL internal flush - will actually flush the RWL.
+ *
+ * User flushes should arrive at aio_flush(), and only flush prior
+ * writes to all log replicas.
+ *
+ * Librbd internal flushes will arrive at flush(invalidate=false,
+ * discard=false), and traverse the block guard to ensure in-flight writes are
+ * flushed.
+ */
+template <typename I>
+void ReplicatedWriteLog<I>::flush_dirty_entries(Context *on_finish) {
+  CephContext *cct = m_image_ctx.cct;
+  bool all_clean;
+  bool flushing;
+  bool stop_flushing;
+
+  {
+    std::lock_guard locker(m_lock);
+    flushing = (0 != m_flush_ops_in_flight);
+    all_clean = m_dirty_log_entries.empty();
+    stop_flushing = (m_shutting_down);
+  }
+
+  if (!flushing && (all_clean || stop_flushing)) {
+    /* Complete without holding m_lock */
+    if (all_clean) {
+      ldout(cct, 20) << "no dirty entries" << dendl;
+    } else {
+      ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
+    }
+    on_finish->complete(0);
+  } else {
+    if (all_clean) {
+      ldout(cct, 5) << "flush ops still in progress" << dendl;
+    } else {
+      ldout(cct, 20) << "dirty entries remain" << dendl;
+    }
+    std::lock_guard locker(m_lock);
+    /* on_finish can't be completed yet */
+    m_flush_complete_contexts.push_back(new LambdaContext(
+      [this, on_finish](int r) {
+        flush_dirty_entries(on_finish);
+      }));
+    wake_up();
+  }
+}
+
+template <typename I>
+void ReplicatedWriteLog<I>::internal_flush(Context *on_finish) {
+
+  /* May be called even if initialization fails */
+  if (!m_initialized) {
+    ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
+    /* Deadlock if completed here */
+    m_image_ctx.op_work_queue->queue(on_finish, 0);
+    return;
+  }
+
+  /* Flush/invalidate must pass through block guard to ensure all layers of
+   * cache are consistently flush/invalidated. This ensures no in-flight write leaves
+   * some layers with valid regions, which may later produce inconsistent read
+   * results. */
+  GuardedRequestFunctionContext *guarded_ctx =
+    new GuardedRequestFunctionContext(
+      [this, on_finish](GuardedRequestFunctionContext &guard_ctx) {
+        DeferredContexts on_exit;
+        ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
+        ceph_assert(guard_ctx.cell);
+
+        Context *ctx = new LambdaContext(
+          [this, cell=guard_ctx.cell, on_finish](int r) {
+            std::lock_guard locker(m_lock);
+            ldout(m_image_ctx.cct, 6) << "Done flush" << dendl;
+            if (m_log_entries.size()) {
+              ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
+                                        << m_log_entries.size() << ", "
+                                        << "front()=" << *m_log_entries.front()
+                                        << dendl;
+            }
+            ceph_assert(m_dirty_log_entries.size() == 0);
+            m_image_ctx.op_work_queue->queue(on_finish, r);
+            release_guarded_request(cell);
+            });
+        ctx = new LambdaContext(
+          [this, ctx](int r) {
+            Context *next_ctx = ctx;
+            if (r < 0) {
+              /* Override on_finish status with this error */
+              next_ctx = new LambdaContext([r, ctx](int _r) {
+                ctx->complete(r);
+              });
+            }
+            {
+              std::lock_guard locker(m_lock);
+              ceph_assert(m_dirty_log_entries.size() == 0);
+            }
+            m_image_writeback.aio_flush(next_ctx);
+          });
+        ctx = new LambdaContext(
+          [this, ctx](int r) {
+            flush_dirty_entries(ctx);
+          });
+        std::lock_guard locker(m_lock);
+        /* Even if we're throwing everything away, but we want the last entry to
+         * be a sync point so we can cleanly resume.
+         *
+         * Also, the blockguard only guarantees the replication of this op
+         * can't overlap with prior ops. It doesn't guarantee those are all
+         * completed and eligible for flush & retire, which we require here.
+         */
+        auto flush_req = make_flush_req(ctx);
+        flush_new_sync_point_if_needed(flush_req, on_exit);
+      });
+  detain_guarded_request(nullptr, guarded_ctx);
+}
+
 } // namespace cache
 } // namespace librbd
 
index 482d2f9af518375dbeb2df46e07ac08e2113cf5f..1f510286919217d958063eb6163c121f32c33f58 100644 (file)
@@ -214,6 +214,8 @@ private:
   bool m_appending = false;
   bool m_dispatching_deferred_ops = false;
 
+  Contexts m_flush_complete_contexts;
+
   rwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
   rwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
 
@@ -256,6 +258,7 @@ private:
   void wake_up();
   void process_work();
 
+  void flush_dirty_entries(Context *on_finish);
   bool can_flush_entry(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   Context *construct_flush_entry_ctx(const std::shared_ptr<rwl::GenericLogEntry> log_entry);
   void persist_last_flushed_sync_gen();
@@ -266,6 +269,7 @@ private:
   void init_flush_new_sync_point(rwl::DeferredContexts &later);
   void new_sync_point(rwl::DeferredContexts &later);
   rwl::C_FlushRequest<ReplicatedWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
+  void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, rwl::DeferredContexts &later);
 
   void dispatch_deferred_writes(void);
   void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
@@ -279,6 +283,7 @@ private:
   int append_op_log_entries(rwl::GenericLogOperations &ops);
   void complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
   void schedule_complete_op_log_entries(rwl::GenericLogOperations &&ops, const int r);
+  void internal_flush(Context *on_finish);
 };
 
 } // namespace cache
index 67188e0536d9515df809d2429f8c7c854a7d7311..e6f10de2114a24efcefef775fa5dc3bba60b10a0 100644 (file)
@@ -107,6 +107,10 @@ io::Extent whole_volume_extent() {
   return io::Extent({0, std::numeric_limits<uint64_t>::max()});
 }
 
+BlockExtent block_extent(const io::Extent& image_extent) {
+  return convert_to_block_extent(image_extent.first, image_extent.second);
+}
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd
index bd2ece22e21870bba37b5667b6a981b36c8a2e43..768902044fa78e2f6ae882eec80bd4a9223cbe08 100644 (file)
@@ -261,6 +261,8 @@ public:
 
 io::Extent whole_volume_extent();
 
+BlockExtent block_extent(const io::Extent& image_extent);
+
 } // namespace rwl
 } // namespace cache
 } // namespace librbd