]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: wait for in flight advance sets on stopping recorder 28529/head
authorMykola Golub <mgolub@suse.com>
Thu, 13 Jun 2019 11:05:17 +0000 (12:05 +0100)
committerMykola Golub <mgolub@suse.com>
Thu, 13 Jun 2019 11:05:17 +0000 (12:05 +0100)
Before object overflow detection optimization (83461c42b) the
overflow was detected when trying to send data, so when an object
set was advanced we always had some data and flush always
completed later. But now we detect overflow on client side, it
may happen when buffer is empty and flush may complete
when "advance object set" is still in flight.

Signed-off-by: Mykola Golub <mgolub@suse.com>
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/Journaler.cc

index 6fd5d7fc189f555ad68b36eb650baffa06e0d0ca..d4b023c3d5eb23c0886c9b842db27372896591e1 100644 (file)
@@ -85,6 +85,29 @@ JournalRecorder::~JournalRecorder() {
   ceph_assert(m_in_flight_object_closes == 0);
 }
 
+void JournalRecorder::shut_down(Context *on_safe) {
+  on_safe = new FunctionContext(
+    [this, on_safe](int r) {
+      Context *ctx = nullptr;
+      {
+        Mutex::Locker locker(m_lock);
+        if (m_in_flight_advance_sets != 0) {
+          ceph_assert(m_on_object_set_advanced == nullptr);
+          m_on_object_set_advanced = new FunctionContext(
+            [on_safe, r](int) {
+              on_safe->complete(r);
+            });
+        } else {
+          ctx = on_safe;
+        }
+      }
+      if (ctx != nullptr) {
+        ctx->complete(r);
+      }
+    });
+  flush(on_safe);
+}
+
 Future JournalRecorder::append(uint64_t tag_tid,
                                const bufferlist &payload_bl) {
 
@@ -181,19 +204,26 @@ void JournalRecorder::advance_object_set() {
 }
 
 void JournalRecorder::handle_advance_object_set(int r) {
-  Mutex::Locker locker(m_lock);
-  ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
+  Context *on_object_set_advanced = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
 
-  ceph_assert(m_in_flight_advance_sets > 0);
-  --m_in_flight_advance_sets;
+    ceph_assert(m_in_flight_advance_sets > 0);
+    --m_in_flight_advance_sets;
 
-  if (r < 0 && r != -ESTALE) {
-    lderr(m_cct) << __func__ << ": failed to advance object set: "
-                 << cpp_strerror(r) << dendl;
-  }
+    if (r < 0 && r != -ESTALE) {
+      lderr(m_cct) << __func__ << ": failed to advance object set: "
+                   << cpp_strerror(r) << dendl;
+    }
 
-  if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
-    open_object_set();
+    if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
+      open_object_set();
+      std::swap(on_object_set_advanced, m_on_object_set_advanced);
+    }
+  }
+  if (on_object_set_advanced != nullptr) {
+    on_object_set_advanced->complete(0);
   }
 }
 
index 93c0e9e5bb7f64ed5612adaef26f09b359d5f685..bdeaab58a81aee8c08ab23592261afdea70db6d8 100644 (file)
@@ -27,6 +27,7 @@ public:
                   double flush_age, uint64_t max_in_flight_appends);
   ~JournalRecorder();
 
+  void shut_down(Context *on_safe);
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush(Context *on_safe);
 
@@ -96,6 +97,8 @@ private:
 
   FutureImplPtr m_prev_future;
 
+  Context *m_on_object_set_advanced = nullptr;
+
   void open_object_set();
   bool close_object_set(uint64_t active_set);
 
index 9f78643703b32dc357a7d960c8884ec9ac624675..cc3bb1d60ca9695423a3abce26d79f2c69d0730d 100644 (file)
@@ -412,7 +412,7 @@ void Journaler::stop_append(Context *on_safe) {
       delete recorder;
       on_safe->complete(r);
     });
-  recorder->flush(on_safe);
+  recorder->shut_down(on_safe);
 }
 
 uint64_t Journaler::get_max_append_size() const {