git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: support dynamically updating recorder flush options
author Jason Dillaman <dillaman@redhat.com>
Thu, 13 Jun 2019 00:06:11 +0000 (20:06 -0400)
committer Jason Dillaman <dillaman@redhat.com>
Wed, 19 Jun 2019 14:38:51 +0000 (10:38 -0400)
Default to disabling writeback-style append flushes unless overridden
by a call to 'set_append_batch_options'.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
16 files changed:
src/journal/JournalRecorder.cc
src/journal/JournalRecorder.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/librbd/Journal.cc
src/librbd/journal/DemoteRequest.cc
src/librbd/journal/PromoteRequest.cc
src/test/journal/mock/MockJournaler.h
src/test/journal/test_JournalRecorder.cc
src/test/journal/test_ObjectRecorder.cc
src/test/librbd/fsx.cc
src/test/librbd/journal/test_mock_PromoteRequest.cc
src/test/librbd/test_mock_Journal.cc
src/tools/rbd/action/Journal.cc

index bf795e0e6dd9d3c85ece9834fbcf252764b3af20..aa90660a01fa2500a99badeb93fe3186fbfa11d3 100644 (file)
@@ -50,12 +50,9 @@ struct C_Flush : public Context {
 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                  const std::string &object_oid_prefix,
                                  const JournalMetadataPtr& journal_metadata,
-                                 uint32_t flush_interval, uint64_t flush_bytes,
-                                 double flush_age,
                                  uint64_t max_in_flight_appends)
   : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
-    m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
-    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
+    m_journal_metadata(journal_metadata),
     m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
     m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
     m_current_set(m_journal_metadata->get_active_set()) {
@@ -66,13 +63,14 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
 
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
-    m_object_locks.push_back(shared_ptr<Mutex>(
-                                          new Mutex("ObjectRecorder::m_lock::"+
-                                          std::to_string(splay_offset))));
+    shared_ptr<Mutex> object_lock(new Mutex(
+      "ObjectRecorder::m_lock::" + std::to_string(splay_offset)));
+    m_object_locks.push_back(object_lock);
+
     uint64_t object_number = splay_offset + (m_current_set * splay_width);
+    Mutex::Locker locker(*object_lock);
     m_object_ptrs[splay_offset] = create_object_recorder(
-                                                object_number,
-                                                m_object_locks[splay_offset]);
+      object_number, m_object_locks[splay_offset]);
   }
 
   m_journal_metadata->add_listener(&m_listener);
@@ -109,6 +107,27 @@ void JournalRecorder::shut_down(Context *on_safe) {
   flush(on_safe);
 }
 
+void JournalRecorder::set_append_batch_options(int flush_interval,
+                                               uint64_t flush_bytes,
+                                               double flush_age) {
+  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+                  << "flush_bytes=" << flush_bytes << ", "
+                  << "flush_age=" << flush_age << dendl;
+
+  Mutex::Locker locker(m_lock);
+  m_flush_interval = flush_interval;
+  m_flush_bytes = flush_bytes;
+  m_flush_age = flush_age;
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    Mutex::Locker object_locker(*m_object_locks[splay_offset]);
+    auto object_recorder = get_object(splay_offset);
+    object_recorder->set_append_batch_options(flush_interval, flush_bytes,
+                                              flush_age);
+  }
+}
+
 Future JournalRecorder::append(uint64_t tag_tid,
                                const bufferlist &payload_bl) {
   ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;
@@ -286,8 +305,10 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder(
   ObjectRecorderPtr object_recorder(new ObjectRecorder(
     m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
     object_number, lock, m_journal_metadata->get_work_queue(),
-    &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
-    m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+    &m_object_handler, m_journal_metadata->get_order(),
+    m_max_in_flight_appends));
+  object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
+                                            m_flush_age);
   return object_recorder;
 }
 
index c27520a758954f886a8088150bace2b0f32dad1a..382f75acef9c60ad65d0fb2d9cca7353903d3884 100644 (file)
@@ -23,11 +23,14 @@ class JournalRecorder {
 public:
   JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                   const JournalMetadataPtr &journal_metadata,
-                  uint32_t flush_interval, uint64_t flush_bytes,
-                  double flush_age, uint64_t max_in_flight_appends);
+                  uint64_t max_in_flight_appends);
   ~JournalRecorder();
 
   void shut_down(Context *on_safe);
+
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
+
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush(Context *on_safe);
 
@@ -79,9 +82,9 @@ private:
 
   JournalMetadataPtr m_journal_metadata;
 
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
+  uint32_t m_flush_interval = 0;
+  uint64_t m_flush_bytes = 0;
+  double m_flush_age = 0;
   uint64_t m_max_in_flight_appends;
 
   Listener m_listener;
index 34bd30e290fa1fde279c8f2d51dad30e2d0320af..7d9c1409584335bc4e306b783f95d3a65ddb7af7 100644 (file)
@@ -393,15 +393,20 @@ void Journaler::committed(const Future &future) {
   m_trimmer->committed(future_impl->get_commit_tid());
 }
 
-void Journaler::start_append(int flush_interval, uint64_t flush_bytes,
-                            double flush_age, uint64_t max_in_flight_appends) {
+void Journaler::start_append(uint64_t max_in_flight_appends) {
   ceph_assert(m_recorder == nullptr);
 
   // TODO verify active object set >= current replay object set
 
   m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
-                                  m_metadata, flush_interval, flush_bytes,
-                                  flush_age, max_in_flight_appends);
+                                  m_metadata, max_in_flight_appends);
+}
+
+void Journaler::set_append_batch_options(int flush_interval,
+                                         uint64_t flush_bytes,
+                                         double flush_age) {
+  ceph_assert(m_recorder != nullptr);
+  m_recorder->set_append_batch_options(flush_interval, flush_bytes, flush_age);
 }
 
 void Journaler::stop_append(Context *on_safe) {
index 3f3987105e1870183d9c465c77a2f2c7b97f84fe..a063a6d43a018c555de8d05dab53ea4a9b220171 100644 (file)
@@ -110,8 +110,9 @@ public:
   void stop_replay(Context *on_finish);
 
   uint64_t get_max_append_size() const;
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
-                    uint64_t max_in_flight_appends);
+  void start_append(uint64_t max_in_flight_appends);
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
   Future append(uint64_t tag_tid, const bufferlist &bl);
   void flush_append(Context *on_safe);
   void stop_append(Context *on_safe);
index 162dfe90c0899e9fc6b3ec293d6aec67053c0803..127731e95c322f93e2143cf4dde252c8f46b0980 100644 (file)
@@ -21,17 +21,13 @@ namespace journal {
 ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                                uint64_t object_number, shared_ptr<Mutex> lock,
                                ContextWQ *work_queue, Handler *handler,
-                               uint8_t order, uint32_t flush_interval,
-                               uint64_t flush_bytes, double flush_age,
-                               int32_t max_in_flight_appends)
+                               uint8_t order, int32_t max_in_flight_appends)
   : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
     m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
     m_order(order), m_soft_max_size(1 << m_order),
-    m_flush_interval(flush_interval), m_flush_bytes(flush_bytes),
-    m_flush_age(flush_age), m_max_in_flight_appends(max_in_flight_appends),
-    m_flush_handler(this), m_lock(lock), m_last_flush_time(ceph_clock_now()),
-    m_append_tid(0), m_overflowed(false), m_object_closed(false),
-    m_in_flight_flushes(false) {
+    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
+    m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
+    m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   ceph_assert(m_handler != NULL);
@@ -45,6 +41,19 @@ ObjectRecorder::~ObjectRecorder() {
   ceph_assert(m_in_flight_appends.empty());
 }
 
+void ObjectRecorder::set_append_batch_options(int flush_interval,
+                                              uint64_t flush_bytes,
+                                              double flush_age) {
+  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
+                  << "flush_bytes=" << flush_bytes << ", "
+                  << "flush_age=" << flush_age << dendl;
+
+  ceph_assert(m_lock->is_locked());
+  m_flush_interval = flush_interval;
+  m_flush_bytes = flush_bytes;
+  m_flush_age = flush_age;
+}
+
 bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
   ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
 
index d7cf6e668a97e88d2e598f897ba3242bc8347552..ff00e0a0a1f031da668e03994d6e7e7b7315db68 100644 (file)
@@ -41,10 +41,12 @@ public:
   ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                  uint64_t object_number, std::shared_ptr<Mutex> lock,
                  ContextWQ *work_queue, Handler *handler, uint8_t order,
-                 uint32_t flush_interval, uint64_t flush_bytes,
-                 double flush_age, int32_t max_in_flight_appends);
+                 int32_t max_in_flight_appends);
   ~ObjectRecorder() override;
 
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age);
+
   inline uint64_t get_object_number() const {
     return m_object_number;
   }
@@ -115,9 +117,9 @@ private:
   uint8_t m_order;
   uint64_t m_soft_max_size;
 
-  uint32_t m_flush_interval;
-  uint64_t m_flush_bytes;
-  double m_flush_age;
+  uint32_t m_flush_interval = 0;
+  uint64_t m_flush_bytes = 0;
+  double m_flush_age = 0;
   int32_t m_max_in_flight_appends;
 
   FlushHandler m_flush_handler;
index 5a648282fe997df2ab032b4e3d62c512a80a6637..11b2f23fa0fc997e90ac191dccf862942ec28d20 100644 (file)
@@ -1169,11 +1169,14 @@ void Journal<I>::complete_event(typename Events::iterator it, int r) {
 template <typename I>
 void Journal<I>::start_append() {
   ceph_assert(m_lock.is_locked());
+
   m_journaler->start_append(
+    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+  m_journaler->set_append_batch_options(
     m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
     m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
-    m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"),
-    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
+    m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+
   transition_state(STATE_READY, 0);
 }
 
index 5b34942684e3903470960afb0dbf552443a973d7..7656caac80909fc03c418799f9180b7ea4647f9e 100644 (file)
@@ -135,7 +135,7 @@ void DemoteRequest<I>::append_event() {
   bufferlist event_entry_bl;
   encode(event_entry, event_entry_bl);
 
-  m_journaler->start_append(0, 0, 0, 0);
+  m_journaler->start_append(0);
   m_future = m_journaler->append(m_tag_tid, event_entry_bl);
 
   auto ctx = create_context_callback<
index 30d9f3bb159f74c3235a44ff100d36dc324976d6..22dc83a32d860ddb28c2212f9646813b5f535ab5 100644 (file)
@@ -119,7 +119,7 @@ void PromoteRequest<I>::append_event() {
   bufferlist event_entry_bl;
   encode(event_entry, event_entry_bl);
 
-  m_journaler->start_append(0, 0, 0, 0);
+  m_journaler->start_append(0);
   m_future = m_journaler->append(m_tag_tid, event_entry_bl);
 
   auto ctx = create_context_callback<
index 787d197dd934742f649192db4acf8c21cd03f821..ab424cd6b092dfd82dcfab49153ec3486b02e75f 100644 (file)
@@ -120,7 +120,8 @@ struct MockJournaler {
   MOCK_METHOD0(stop_replay, void());
   MOCK_METHOD1(stop_replay, void(Context *on_finish));
 
-  MOCK_METHOD4(start_append, void(int, uint64_t, double, uint64_t));
+  MOCK_METHOD1(start_append, void(uint64_t));
+  MOCK_METHOD3(set_append_batch_options, void(int, uint64_t, double));
   MOCK_CONST_METHOD0(get_max_append_size, uint64_t());
   MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
                                        const bufferlist &bl));
@@ -259,11 +260,14 @@ struct MockJournalerProxy {
     MockJournaler::get_instance().stop_replay(on_finish);
   }
 
-  void start_append(int flush_interval, uint64_t flush_bytes, double flush_age,
-                    uint64_t max_in_flight_appends) {
-    MockJournaler::get_instance().start_append(flush_interval, flush_bytes,
-                                               flush_age,
-                                               max_in_flight_appends);
+  void start_append(uint64_t max_in_flight_appends) {
+    MockJournaler::get_instance().start_append(max_in_flight_appends);
+  }
+
+  void set_append_batch_options(int flush_interval, uint64_t flush_bytes,
+                                double flush_age) {
+    MockJournaler::get_instance().set_append_batch_options(
+      flush_interval, flush_bytes, flush_age);
   }
 
   uint64_t get_max_append_size() const {
index fb7c06772eccbd7afd38eac28788e1c2405e8062..7197526a1ce409cc2db20f3be98f6cd12faf1fdb 100644 (file)
@@ -22,8 +22,9 @@ public:
   journal::JournalRecorder *create_recorder(
       const std::string &oid, const journal::JournalMetadataPtr &metadata) {
     journal::JournalRecorder *recorder(new journal::JournalRecorder(
-        m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(),
-        0, 0));
+        m_ioctx, oid + ".", metadata, 0));
+    recorder->set_append_batch_options(0, std::numeric_limits<uint32_t>::max(),
+                                       0);
     m_recorders.push_back(recorder);
     return recorder;
   }
index 7c3e7e99d166646c9b4716146c43a28db60032f2..3cc8e893cfe0c93bd7e4cb0669d9d8334a8b139e 100644 (file)
@@ -94,8 +94,13 @@ public:
   journal::ObjectRecorderPtr create_object(const std::string &oid,
                                            uint8_t order, shared_ptr<Mutex> lock) {
     journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
-      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, m_flush_interval,
-      m_flush_bytes, m_flush_age, m_max_in_flight_appends));
+      m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order,
+      m_max_in_flight_appends));
+    {
+      Mutex::Locker locker(*lock);
+      object->set_append_batch_options(m_flush_interval, m_flush_bytes,
+                                       m_flush_age);
+    }
     m_object_recorders.push_back(object);
     m_object_recorder_locks.insert(std::make_pair(oid, lock));
     m_handler.object_lock = lock;
index 1e96b502798f4baeee27ea466d7c3f3a3b46ab21..25766b5002d37b6bdcdbf5f818cdbcc3d0313447 100644 (file)
@@ -435,7 +435,7 @@ int replay_journal(rados_ioctx_t ioctx, const char *image_name,
                 return r;
         }
 
-        replay_journaler.start_append(0, 0, 0, 0);
+        replay_journaler.start_append(0);
 
         C_SaferCond replay_ctx;
         ReplayHandler replay_handler(&journaler, &replay_journaler,
index 0e61a88890d38887e4a08ee07e72eb4994005350..68a627a79a8d007fe1d191fbdaec031313aaa71d 100644 (file)
@@ -120,7 +120,7 @@ public:
   }
 
   void expect_start_append(::journal::MockJournaler &mock_journaler) {
-    EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+    EXPECT_CALL(mock_journaler, start_append(_));
   }
 
   void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
index 727d0db838193bbc581fe21b41ac9a098bbaa0ec..00367e2214e89bafe60887c8de2daf4e37218897 100644 (file)
@@ -396,7 +396,11 @@ public:
   }
 
   void expect_start_append(::journal::MockJournaler &mock_journaler) {
-    EXPECT_CALL(mock_journaler, start_append(_, _, _, _));
+    EXPECT_CALL(mock_journaler, start_append(_));
+  }
+
+  void expect_set_append_batch_options(::journal::MockJournaler &mock_journaler) {
+    EXPECT_CALL(mock_journaler, set_append_batch_options(_, _, _));
   }
 
   void expect_stop_append(::journal::MockJournaler &mock_journaler, int r) {
@@ -518,6 +522,7 @@ public:
     expect_committed(mock_journaler, 0);
     expect_flush_commit_position(mock_journaler);
     expect_start_append(mock_journaler);
+    expect_set_append_batch_options(mock_journaler);
     ASSERT_EQ(0, when_open(mock_journal));
   }
 
@@ -585,6 +590,7 @@ TEST_F(TestMockJournal, StateTransitions) {
   expect_flush_commit_position(mock_journaler);
 
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   ASSERT_EQ(0, when_open(mock_journal));
 
@@ -662,6 +668,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
@@ -719,6 +726,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, 0);
@@ -773,6 +781,7 @@ TEST_F(TestMockJournal, CorruptEntry) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, -EINVAL);
@@ -811,6 +820,7 @@ TEST_F(TestMockJournal, StopError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   ASSERT_EQ(0, when_open(mock_journal));
 
   expect_stop_append(mock_journaler, -EINVAL);
@@ -876,6 +886,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   C_SaferCond ctx;
   mock_journal.open(&ctx);
@@ -958,6 +969,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   expect_shut_down_replay(mock_image_ctx, mock_journal_replay, 0);
   expect_flush_commit_position(mock_journaler);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
 
   C_SaferCond ctx;
   mock_journal.open(&ctx);
@@ -1272,6 +1284,7 @@ TEST_F(TestMockJournal, ExternalReplay) {
   InSequence seq;
   expect_stop_append(mock_journaler, 0);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
@@ -1303,6 +1316,7 @@ TEST_F(TestMockJournal, ExternalReplayFailure) {
   InSequence seq;
   expect_stop_append(mock_journaler, -EINVAL);
   expect_start_append(mock_journaler);
+  expect_set_append_batch_options(mock_journaler);
   expect_shut_down_journaler(mock_journaler);
 
   C_SaferCond start_ctx;
index 76f76b1c5ebc7d937080ea2a6d698001f6522a48..db506207c4b0c763e81b1126f9b3574aae38fc57 100644 (file)
@@ -832,7 +832,7 @@ public:
     if (r < 0) {
       return r;
     }
-    m_journaler.start_append(0, 0, 0, 0);
+    m_journaler.start_append(0);
 
     int r1 = 0;
     bufferlist bl;