]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: support dynamically updating recorder flush options
authorJason Dillaman <dillaman@redhat.com>
Thu, 13 Jun 2019 00:06:11 +0000 (20:06 -0400)
committerJason Dillaman <dillaman@redhat.com>
Mon, 19 Aug 2019 15:15:49 +0000 (11:15 -0400)
Default to disabling writeback-style append flushes unless overridden
by a call to 'set_append_batch_options'.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit c0322a13c83f590067a120212620ebba15fc8661)

16 files changed:
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/librbd/Journal.cc
src/librbd/journal/DemoteRequest.cc
src/librbd/journal/PromoteRequest.cc
src/test/journal/mock/MockJournaler.h
src/test/journal/test_JournalRecorder.cc
src/test/journal/test_ObjectRecorder.cc
src/test/librbd/fsx.cc
src/test/librbd/journal/test_mock_PromoteRequest.cc
src/test/librbd/test_mock_Journal.cc
src/tools/rbd/action/Journal.cc

index bf795e0e6dd9d3c85ece9834fbcf252764b3af20..aa90660a01fa2500a99badeb93fe3186fbfa11d3 100644 (file)
@@ -50,12 +50,9 @@ struct C_Flush : public Context {
 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                  const std::string &object_oid_prefix,
                                  const JournalMetadataPtr& journal_metadata,
-                                 uint32_t flush_interval, uint64_t flush_bytes,
-                                 double flush_age,
                                  uint64_t max_in_flight_appends)
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
-    m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+    m_journal_metadata(journal_metadata),
     m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
     m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
     m_current_set(m_journal_metadata->get_active_set()) {
@@ -66,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    m_object_locks.push_back(shared_ptr<Mutex>(
-                                          new Mutex("ObjectRecorder::m_lock::"+
-                                          std::to_string(splay_offset))));
+    shared_ptr<Mutex> object_lock(new Mutex(
+      "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
+    m_object_locks.push_back(object_lock);
+
     uint64_t object_number = splay_offset + (m_current_set * splay_width);
+    Mutex::Locker locker(*object_lock);
     m_object_ptrs[splay_offset] = create_object_recorder(
-                                                object_number,
-                                                m_object_locks[splay_offset]);
+      object_number, m_object_locks[splay_offset]);
   }
 
   m_journal_metadata->add_listener(&m_listener);
@@ -109,6 +107,27 @@ void JournalRecorder::shut_down(Context *on_safe) {
   flush(on_safe);
 }
 
+void JournalRecorder::set_append_batch_options(int flush_interval,
+                                               uint64_t flush_bytes,
+                                               double flush_age) {
+  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+                  << "flush_bytes=" << flush_bytes << ", "
+                  << "flush_age=" << flush_age << dendl;
+
+  Mutex::Locker locker(m_lock);
+  m_flush_interval = flush_interval;
+  m_flush_bytes = flush_bytes;
+  m_flush_age = flush_age;
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+    auto object_recorder = get_object(splay_offset);
+    object_recorder->set_append_batch_options(flush_interval, flush_bytes,
+                                              flush_age);
+  }
+}
+
 Future JournalRecorder::append(uint64_t tag_tid,
                                const bufferlist &payload_bl) {
   ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
@@ -286,8 +305,10 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
     object_number, lock, m_journal_metadata->get_work_queue(),
-    &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
-    m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+    &m_object_handler, m_journal_metadata->get_order(),
+    m_max_in_flight_appends));
+  object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
+                                            m_flush_age);
   return object_recorder;
 }
 
index c27520a758954f886a8088150bace2b0f32dad1a..382f75acef9c60ad65d0fb2d9cca7353903d3884 100644 (file)
@@ -23,11 +23,14 @@ class JournalRecorder {
 public:
   JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                   const JournalMetadataPtr &journal_metadata,
-                  uint32_t flush_interval, uint64_t flush_bytes,
-                  double flush_age, uint64_t max_in_flight_appends);
+                  uint64_t max_in_flight_appends);
   ~JournalRecorder();
 
   void shut_down(Context *on_safe);
+
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
+
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush(Context *on_safe);
 
@@ -79,9 +82,9 @@ private:
 
   JournalMetadataPtr m_journal_metadata;
 
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
+  uint32_t m_flush_interval = 0;
+  uint64_t m_flush_bytes = 0;
+  double m_flush_age = 0;
   uint64_t m_max_in_flight_appends;
 
   Listener m_listener;
index cc3bb1d60ca9695423a3abce26d79f2c69d0730d..65435ae900f2f2f930f8157c38d5c4967ba25370 100644 (file)
@@ -391,15 +391,20 @@ void Journaler::committed(const Future &future) {
   m_trimmer->committed(future_impl->get_commit_tid());
 }
 
-void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
-                            double flush_age, uint64_t max_in_flight_appends) {
+void Journaler::start_append(uint64_t max_in_flight_appends) {
   ceph_assert(m_recorder == nullptr);
 
   // TODO verify active object set >= current replay object set
 
   m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
-                                  m_metadata, flush_interval, flush_bytes,
-                                  flush_age, max_in_flight_appends);
+                                  m_metadata, max_in_flight_appends);
+}
+
+void Journaler::set_append_batch_options(int flush_interval,
+                                         uint64_t flush_bytes,
+                                         double flush_age) {
+  ceph_assert(m_recorder != nullptr);
+  m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age);
 }
 
 void Journaler::stop_append(Context *on_safe) {
index 1424cb7576129b8d23019acc5849ac2ea048766e..5a6e0c7c18a80e16f2a036fe4bf194a9aa77d054 100644 (file)
@@ -106,8 +106,9 @@ public:
   void stop_replay(Context *on_finish);
 
   uint64_t get_max_append_size() const;
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
-                    uint64_t max_in_flight_appends);
+  void start_append(uint64_t max_in_flight_appends);
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush_append(Context *on_safe);
   void stop_append(Context *on_safe);
index 162dfe90c0899e9fc6b3ec293d6aec67053c0803..127731e95c322f93e2143cf4dde252c8f46b0980 100644 (file)
@@ -21,17 +21,13 @@ namespace journal {
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                uint64_t object_number, shared_ptr<Mutex> lock,
                                ContextWQ *work_queue, Handler *handler,
-                               uint8_t order, uint32_t flush_interval,
-                               uint64_t flush_bytes, double flush_age,
-                               int32_t max_in_flight_appends)
+                               uint8_t order, int32_t max_in_flight_appends)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
     m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
     m_order(order), m_soft_max_size(1 << m_order),
-    m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
-    m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends),
-    m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()),
-    m_append_tid(0), m_overflowed(false), m_object_closed(false),
-    m_in_flight_flushes(false) {
+    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
+    m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
+    m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   ceph_assert(m_handler != NULL);
@@ -45,6 +41,19 @@ ObjectRecorder::~ObjectRecorder() {
   ceph_assert(m_in_flight_appends.empty());
 }
 
+void ObjectRecorder::set_append_batch_options(int flush_interval,
+                                              uint64_t flush_bytes,
+                                              double flush_age) {
+  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+                  << "flush_bytes=" << flush_bytes << ", "
+                  << "flush_age=" << flush_age << dendl;
+
+  ceph_assert(m_lock->is_locked());
+  m_flush_interval = flush_interval;
+  m_flush_bytes = flush_bytes;
+  m_flush_age = flush_age;
+}
+
 bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
   ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
 
index d7cf6e668a97e88d2e598f897ba3242bc8347552..ff00e0a0a1f031da668e03994d6e7e7b7315db68 100644 (file)
@@ -41,10 +41,12 @@ public:
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                  uint64_t object_number, std::shared_ptr<Mutex> lock,
                  ContextWQ *work_queue, Handler *handler, uint8_t order,
-                 uint32_t flush_interval, uint64_t flush_bytes,
-                 double flush_age, int32_t max_in_flight_appends);
+                 int32_t max_in_flight_appends);
   ~ObjectRecorder() override;
 
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
+
   inline uint64_t get_object_number() const {
     return m_object_number;
   }
@@ -115,9 +117,9 @@ private:
   uint8_t m_order;
   uint64_t m_soft_max_size;
 
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
+  uint32_t m_flush_interval = 0;
+  uint64_t m_flush_bytes = 0;
+  double m_flush_age = 0;
   int32_t m_max_in_flight_appends;
 
   FlushHandler m_flush_handler;
index 45d103f43277d12c8747522c966198f85b6ae473..f1b72b0bf74d24dd023729f6685d456464b2d775 100644 (file)
@@ -1168,11 +1168,14 @@ void Journal<I>::complete_event(typename Events::iterator it, int r) {
 template <typename I>
 void Journal<I>::start_append() {
   ceph_assert(m_lock.is_locked());
+
   m_journaler->start_append(
+    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+  m_journaler->set_append_batch_options(
     m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
     m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
-    m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"),
-    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+    m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+
   transition_state(STATE_READY, 0);
 }
 
index 59aa0365a3f8d30e19aa8f9a32b55ae9a5ffc547..66a1d19e332079c2e0c6a2ceaa26d87dfc57d82c 100644 (file)
@@ -135,7 +135,7 @@ void DemoteRequest<I>::append_event() {
   bufferlist event_entry_bl;
   encode(event_entry, event_entry_bl);
 
-  m_journaler->start_append(0, 0, 0, 0);
+  m_journaler->start_append(0);
   m_future = m_journaler->append(m_tag_tid, event_entry_bl);
 
   auto ctx = create_context_callback<
index 695acc388aaad67ea374f6f64a74172eb50b4713..17f2957e4e322abc496451016b55cc7bf9b822e2 100644 (file)
@@ -119,7 +119,7 @@ void PromoteRequest<I>::append_event() {
   bufferlist event_entry_bl;
   encode(event_entry, event_entry_bl);
 
-  m_journaler->start_append(0, 0, 0, 0);
+  m_journaler->start_append(0);
   m_future = m_journaler->append(m_tag_tid, event_entry_bl);
 
   auto ctx = create_context_callback<
index b925ddfebe9e95c61fd86c79bded12b47d91df18..236a42f90f29229d34a13e99a2fbbe926fef1ab0 100644 (file)
@@ -120,7 +120,8 @@ struct MockJournaler {
   MOCK_METHOD0(stop_replay, void());
   MOCK_METHOD1(stop_replay, void(Context *on_finish));
 
-  MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t));
+  MOCK_METHOD1(start_append, void(uint64_t));
+  MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double));
   MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
   MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
                                        const bufferlist &bl));
@@ -257,11 +258,14 @@ struct MockJournalerProxy {
     MockJournaler::get_instance().stop_replay(on_finish);
   }
 
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
-                    uint64_t max_in_flight_appends) {
-    MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
-                                               flush_age,
-                                               max_in_flight_appends);
+  void start_append(uint64_t max_in_flight_appends) {
+    MockJournaler::get_instance().start_append(max_in_flight_appends);
+  }
+
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age) {
+    MockJournaler::get_instance().set_append_batch_options(
+      flush_interval, flush_bytes, flush_age);
   }
 
   uint64_t get_max_append_size() const {
index fb7c06772eccbd7afd38eac28788e1c2405e8062..7197526a1ce409cc2db20f3be98f6cd12faf1fdb 100644 (file)
@@ -22,8 +22,9 @@ public:
   journal::JournalRecorder *create_recorder(
       const std::string &oid, const journal::JournalMetadataPtr &metadata) {
     journal::JournalRecorder *recorder(new journal::JournalRecorder(
-        m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(),
-        0, 0));
+        m_ioctx, oid + ".", metadata, 0));
+    recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(),
+                                       0);
     m_recorders.push_back(recorder);
     return recorder;
   }
index 7c3e7e99d166646c9b4716146c43a28db60032f2..3cc8e893cfe0c93bd7e4cb0669d9d8334a8b139e 100644 (file)
@@ -94,8 +94,13 @@ public:
   journal::ObjectRecorderPtr create_object(const std::string &oid,
                                            uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval,
-      m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
+      m_max_in_flight_appends));
+    {
+      Mutex::Locker locker(*lock);
+      object->set_append_batch_options(m_flush_interval, m_flush_bytes,
+                                       m_flush_age);
+    }
     m_object_recorders.push_back(object);
     m_object_recorder_locks.insert(std::make_pair(oid, lock));
     m_handler.object_lock = lock;
index e0b40937d316b8ef582944dd2ad12f0529018938..24b0538e72ab0cf784c394e0d3f15f8f408cfcfb 100644 (file)
@@ -431,7 +431,7 @@ int replay_journal(rados_ioctx_t ioctx, const char *image_name,
                 return r;
         }
 
-        replay_journaler.start_append(0, 0, 0, 0);
+        replay_journaler.start_append(0);
 
         C_SaferCond replay_ctx;
         ReplayHandler replay_handler(&journaler, &replay_journaler,
index 0e61a88890d38887e4a08ee07e72eb4994005350..68a627a79a8d007fe1d191fbdaec031313aaa71d 100644 (file)
@@ -120,7 +120,7 @@ public:
   }
 
   void expect_start_append(::journal::MockJournaler &mock_journaler) {
-    EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+    EXPECT_CALL(mock_journaler, start_append(_));
   }
 
   void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
index 12f445d6c88bcec8dd6e24a3e7d8caf77ba3f6fa..144aa120066c5e1022163928798f841ec431fc1d 100644 (file)
@@ -396,7 +396,11 @@ public:
   }
 
   void expect_start_append(::journal::MockJournaler &mock_journaler) {
-    EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+    EXPECT_CALL(mock_journaler, start_append(_));
+  }
+
+  void expect_set_append_batch_options(::journal::MockJournaler &mock_journaler) {
+    EXPECT_CALL(mock_journaler, set_append_batch_options(_, _, _));
   }
 
   void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
@@ -518,6 +522,7 @@ public:
     expect_committed(mock_journaler, 0);
     expect_flush_commit_position(mock_journaler);
     expect_start_append(mock_journaler);
+    expect_set_append_batch_options(mock_journaler);
     ASSERT_EQ(0, when_open(mock_journal));
   }
 
@@ -585,6 +590,7 @@ TEST_F(TestMockJournal, StateTransitions) {
   expect_flush_commit_position(mock_journaler);
 
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   ASSERT_EQ(0, when_open(mock_journal));
 
@@ -662,6 +668,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
@@ -719,6 +726,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
@@ -773,6 +781,7 @@ TEST_F(TestMockJournal, CorruptEntry) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, -EINVAL);
@@ -811,6 +820,7 @@ TEST_F(TestMockJournal, StopError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, -EINVAL);
@@ -876,6 +886,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   C_SaferCond ctx;
   mock_journal.open(&ctx);
@@ -958,6 +969,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   C_SaferCond ctx;
   mock_journal.open(&ctx);
@@ -1272,6 +1284,7 @@ TEST_F(TestMockJournal, ExternalReplay) {
   InSequence seq;
   expect_stop_append(mock_journaler, 0);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
@@ -1303,6 +1316,7 @@ TEST_F(TestMockJournal, ExternalReplayFailure) {
   InSequence seq;
   expect_stop_append(mock_journaler, -EINVAL);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
index e36c6a6a5073ae7c232fde9f58ef5620e7ae8808..d3a54f94f84891fada7fd67e887f3be98fa16e5e 100644 (file)
@@ -832,7 +832,7 @@ public:
     if (r < 0) {
       return r;
     }
-    m_journaler.start_append(0, 0, 0, 0);
+    m_journaler.start_append(0);
 
     int r1 = 0;
     bufferlist bl;