git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: split large AioWrite journal events 9048/head
author: Jason Dillaman <dillaman@redhat.com>
Fri, 6 May 2016 04:08:38 +0000 (00:08 -0400)
committer: Jason Dillaman <dillaman@redhat.com>
Tue, 10 May 2016 17:29:40 +0000 (13:29 -0400)
Ensure that AioWrite journal events fit within a single
journal object.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 70b9fbf2745c795fe345f0298a2f30af80a12985)

src/librbd/AioImageRequest.cc
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/journal/Types.h
src/test/librbd/journal/test_Entries.cc
src/test/librbd/test_mock_Journal.cc

diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc
index e7cfd393167ea929120a09e5ee804b73a250344b..5a1904b8cb30c2c2b30ced365f1cc34a40fb5894 100644 (file)
@@ -318,11 +318,9 @@ uint64_t AioImageWrite::append_journal_event(
   bufferlist bl;
   bl.append(m_buf, m_len);
 
-  journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
-  uint64_t tid = m_image_ctx.journal->append_io_event(m_aio_comp,
-                                                      std::move(event_entry),
-                                                      requests, m_off, m_len,
-                                                      synchronous);
+  uint64_t tid = m_image_ctx.journal->append_write_event(m_aio_comp, m_off,
+                                                         m_len, bl, requests,
+                                                         synchronous);
   if (m_image_ctx.object_cacher == NULL) {
     m_aio_comp->associate_journal_event(tid);
   }
diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
index 265a869d36f8a6e941dc93b8e3aad1ebc9003930..8b6874739953ed474bb8938bb7306ed8bb499dcf 100644 (file)
@@ -794,6 +794,42 @@ void Journal<I>::flush_commit_position(Context *on_finish) {
   m_journaler->flush_commit_position(on_finish);
 }
 
+template <typename I>
+uint64_t Journal<I>::append_write_event(AioCompletion *aio_comp,
+                                        uint64_t offset, size_t length,
+                                        const bufferlist &bl,
+                                        const AioObjectRequests &requests,
+                                        bool flush_entry) {
+  assert(m_image_ctx.owner_lock.is_locked());
+
+  assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
+  uint64_t max_write_data_size =
+    m_max_append_size - journal::AioWriteEvent::get_fixed_size();
+
+  // ensure that the write event fits within the journal entry
+  Bufferlists bufferlists;
+  uint64_t bytes_remaining = length;
+  uint64_t event_offset = 0;
+  do {
+    uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
+
+    bufferlist event_bl;
+    event_bl.substr_of(bl, event_offset, event_length);
+    journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
+                                                           event_length,
+                                                           event_bl));
+
+    bufferlists.emplace_back();
+    ::encode(event_entry, bufferlists.back());
+
+    event_offset += event_length;
+    bytes_remaining -= event_length;
+  } while (bytes_remaining > 0);
+
+  return append_io_events(aio_comp, journal::EVENT_TYPE_AIO_WRITE, bufferlists,
+                          requests, offset, length, flush_entry);
+}
+
 template <typename I>
 uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
                                      journal::EventEntry &&event_entry,
@@ -804,8 +840,21 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
 
   bufferlist bl;
   ::encode(event_entry, bl);
+  return append_io_events(aio_comp, event_entry.get_event_type(), {bl},
+                          requests, offset, length, flush_entry);
+}
 
-  Future future;
+template <typename I>
+uint64_t Journal<I>::append_io_events(AioCompletion *aio_comp,
+                                      journal::EventType event_type,
+                                      const Bufferlists &bufferlists,
+                                      const AioObjectRequests &requests,
+                                      uint64_t offset, size_t length,
+                                      bool flush_entry) {
+  assert(m_image_ctx.owner_lock.is_locked());
+  assert(!bufferlists.empty());
+
+  Futures futures;
   uint64_t tid;
   {
     Mutex::Locker locker(m_lock);
@@ -815,13 +864,16 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
     tid = ++m_event_tid;
     assert(tid != 0);
 
-    future = m_journaler->append(m_tag_tid, bl);
-    m_events[tid] = Event(future, aio_comp, requests, offset, length);
+    for (auto &bl : bufferlists) {
+      assert(bl.length() <= m_max_append_size);
+      futures.push_back(m_journaler->append(m_tag_tid, bl));
+    }
+    m_events[tid] = Event(futures, aio_comp, requests, offset, length);
   }
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": "
-                 << "event=" << event_entry.get_event_type() << ", "
+                 << "event=" << event_type << ", "
                  << "new_reqs=" << requests.size() << ", "
                  << "offset=" << offset << ", "
                  << "length=" << length << ", "
@@ -830,9 +882,9 @@ uint64_t Journal<I>::append_io_event(AioCompletion *aio_comp,
   Context *on_safe = create_async_context_callback(
     m_image_ctx, new C_IOEventSafe(this, tid));
   if (flush_entry) {
-    future.flush(on_safe);
+    futures.back().flush(on_safe);
   } else {
-    future.wait(on_safe);
+    futures.back().wait(on_safe);
   }
   return tid;
 }
@@ -1006,7 +1058,7 @@ typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
 
   event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
                                                                  on_safe));
-  return event.future;
+  return event.futures.back();
 }
 
 template <typename I>
@@ -1107,7 +1159,9 @@ void Journal<I>::complete_event(typename Events::iterator it, int r) {
   event.committed_io = true;
   if (event.safe) {
     if (r >= 0) {
-      m_journaler->committed(event.future);
+      for (auto &future : event.futures) {
+        m_journaler->committed(future);
+      }
     }
     m_events.erase(it);
   }
@@ -1129,6 +1183,9 @@ void Journal<I>::handle_initialized(int r) {
     return;
   }
 
+  m_max_append_size = m_journaler->get_max_append_size();
+  ldout(cct, 20) << this << " max_append_size=" << m_max_append_size << dendl;
+
   // locate the master image client record
   cls::journal::Client client;
   r = m_journaler->get_cached_client(Journal<ImageCtx>::IMAGE_CLIENT_ID,
@@ -1387,7 +1444,9 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
       // failed journal write so IO won't be sent -- or IO extent was
       // overwritten by future IO operations so this was a no-op IO event
       event.ret_val = r;
-      m_journaler->committed(event.future);
+      for (auto &future : event.futures) {
+        m_journaler->committed(future);
+      }
     }
 
     if (event.committed_io) {
diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h
index 424ef0e4b3201450fe534ebd5fc5dce0ccd05d30..2eca784aa537dd5583c453b336a161f7a01cb0ee 100644 (file)
@@ -125,6 +125,11 @@ public:
 
   void flush_commit_position(Context *on_finish);
 
+  uint64_t append_write_event(AioCompletion *aio_comp,
+                              uint64_t offset, size_t length,
+                              const bufferlist &bl,
+                              const AioObjectRequests &requests,
+                              bool flush_entry);
   uint64_t append_io_event(AioCompletion *aio_comp,
                            journal::EventEntry &&event_entry,
                            const AioObjectRequests &requests,
@@ -160,11 +165,13 @@ private:
   typedef typename TypeTraits::Future Future;
   typedef typename TypeTraits::ReplayEntry ReplayEntry;
 
+  typedef std::list<bufferlist> Bufferlists;
   typedef std::list<Context *> Contexts;
+  typedef std::list<Future> Futures;
   typedef interval_set<uint64_t> ExtentInterval;
 
   struct Event {
-    Future future;
+    Futures futures;
     AioCompletion *aio_comp = nullptr;
     AioObjectRequests aio_object_requests;
     Contexts on_safe_contexts;
@@ -175,9 +182,9 @@ private:
 
     Event() {
     }
-    Event(const Future &_future, AioCompletion *_aio_comp,
+    Event(const Futures &_futures, AioCompletion *_aio_comp,
           const AioObjectRequests &_requests, uint64_t offset, size_t length)
-      : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+      : futures(_futures), aio_comp(_aio_comp), aio_object_requests(_requests) {
       if (length > 0) {
         pending_extents.insert(offset, length);
       }
@@ -256,6 +263,7 @@ private:
   Journaler *m_journaler;
   mutable Mutex m_lock;
   State m_state;
+  uint64_t m_max_append_size = 0;
   uint64_t m_tag_class = 0;
   uint64_t m_tag_tid = 0;
   journal::TagData m_tag_data;
@@ -277,6 +285,11 @@ private:
 
   journal::Replay<ImageCtxT> *m_journal_replay;
 
+  uint64_t append_io_events(AioCompletion *aio_comp,
+                            journal::EventType event_type,
+                            const Bufferlists &bufferlists,
+                            const AioObjectRequests &requests,
+                            uint64_t offset, size_t length, bool flush_entry);
   Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
 
   void create_journaler();
diff --git a/src/librbd/journal/Types.h b/src/librbd/journal/Types.h
index 0f2576647920b15a659ed13a5f58cb98cbb0e93e..4008a0f15bfaf719adb5a1380ad067b5ad7b5de6 100644 (file)
@@ -59,9 +59,13 @@ struct AioWriteEvent {
   static const EventType TYPE = EVENT_TYPE_AIO_WRITE;
 
   uint64_t offset;
-  size_t length;
+  uint64_t length;
   bufferlist data;
 
+  static uint32_t get_fixed_size() {
+    return 30; /// version encoding, type, offset, length
+  }
+
   AioWriteEvent() : offset(0), length(0) {
   }
   AioWriteEvent(uint64_t _offset, size_t _length, const bufferlist &_data)
diff --git a/src/test/librbd/journal/test_Entries.cc b/src/test/librbd/journal/test_Entries.cc
index 91c5c78a8b8c2eb5297345889f98a2789ca749bb..ec6c689b69d42fe57866035a5dc4ec4341a1b83f 100644 (file)
@@ -150,6 +150,9 @@ TEST_F(TestJournalEntries, AioWrite) {
   bufferlist buffer_bl;
   buffer_bl.append(buffer);
   ASSERT_TRUE(aio_write_event.data.contents_equal(buffer_bl));
+
+  ASSERT_EQ(librbd::journal::AioWriteEvent::get_fixed_size() +
+              aio_write_event.data.length(), replay_entry.get_data().length());
 }
 
 TEST_F(TestJournalEntries, AioDiscard) {
diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc
index 70deac04199ccaa36e67ff230948b9db98459c4a..92976e2b3d36219091cadb66b37ccbb8aafa03a4 100644 (file)
@@ -344,6 +344,12 @@ public:
                   .WillOnce(CompleteContext(r, NULL));
   }
 
+  void expect_get_max_append_size(::journal::MockJournaler &mock_journaler,
+                                  uint32_t max_size) {
+    EXPECT_CALL(mock_journaler, get_max_append_size())
+                  .WillOnce(Return(max_size));
+  }
+
   void expect_get_journaler_cached_client(::journal::MockJournaler &mock_journaler, int r) {
 
     journal::ImageClientMeta image_client_meta;
@@ -467,6 +473,15 @@ public:
     return ctx.wait();
   }
 
+  uint64_t when_append_write_event(MockJournalImageCtx &mock_image_ctx,
+                                   MockJournal &mock_journal, uint64_t length) {
+    bufferlist bl;
+    bl.append_zero(length);
+
+    RWLock::RLocker owner_locker(mock_image_ctx.owner_lock);
+    return mock_journal.append_write_event(nullptr, 0, length, bl, {}, false);
+  }
+
   uint64_t when_append_io_event(MockJournalImageCtx &mock_image_ctx,
                                 MockJournal &mock_journal,
                                 AioCompletion *aio_comp = nullptr) {
@@ -505,6 +520,7 @@ public:
     InSequence seq;
     expect_construct_journaler(mock_journaler);
     expect_init_journaler(mock_journaler, 0);
+    expect_get_max_append_size(mock_journaler, 1 << 16);
     expect_get_journaler_cached_client(mock_journaler, 0);
     expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
     expect_start_replay(
@@ -550,6 +566,7 @@ TEST_F(TestMockJournal, StateTransitions) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -615,6 +632,7 @@ TEST_F(TestMockJournal, GetCachedClientError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, -ENOENT);
   ASSERT_EQ(-ENOENT, when_open(mock_journal));
 }
@@ -634,6 +652,7 @@ TEST_F(TestMockJournal, GetTagsError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, -EBADMSG);
   ASSERT_EQ(-EBADMSG, when_open(mock_journal));
@@ -654,6 +673,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -668,6 +688,7 @@ TEST_F(TestMockJournal, ReplayCompleteError) {
   // replay failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -699,6 +720,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -718,6 +740,7 @@ TEST_F(TestMockJournal, FlushReplayError) {
   // replay flush failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -749,6 +772,7 @@ TEST_F(TestMockJournal, StopError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -780,6 +804,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
 
@@ -806,6 +831,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPreFlushError) {
   // replay write-to-disk failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -858,6 +884,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   ::journal::MockJournaler mock_journaler;
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -881,6 +908,7 @@ TEST_F(TestMockJournal, ReplayOnDiskPostFlushError) {
   // replay write-to-disk failure should result in replay-restart
   expect_construct_journaler(mock_journaler);
   expect_init_journaler(mock_journaler, 0);
+  expect_get_max_append_size(mock_journaler, 1 << 16);
   expect_get_journaler_cached_client(mock_journaler, 0);
   expect_get_journaler_tags(mock_image_ctx, mock_journaler, 0);
   expect_start_replay(
@@ -963,6 +991,42 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) {
   ASSERT_EQ(0, event_ctx.wait());
 }
 
+TEST_F(TestMockJournal, AppendWriteEvent) {
+  REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+  MockJournalImageCtx mock_image_ctx(*ictx);
+  MockJournal mock_journal(mock_image_ctx);
+  ::journal::MockJournaler mock_journaler;
+  open_journal(mock_image_ctx, mock_journal, mock_journaler);
+  BOOST_SCOPE_EXIT_ALL(&) {
+    close_journal(mock_journal, mock_journaler);
+  };
+
+  InSequence seq;
+
+  ::journal::MockFuture mock_future;
+  Context *on_journal_safe = nullptr;
+  expect_append_journaler(mock_journaler);
+  expect_append_journaler(mock_journaler);
+  expect_append_journaler(mock_journaler);
+  expect_wait_future(mock_future, &on_journal_safe);
+  ASSERT_EQ(1U, when_append_write_event(mock_image_ctx, mock_journal, 1 << 17));
+
+  on_journal_safe->complete(0);
+  C_SaferCond event_ctx;
+  mock_journal.wait_event(1U, &event_ctx);
+  ASSERT_EQ(0, event_ctx.wait());
+
+  expect_future_committed(mock_journaler);
+  expect_future_committed(mock_journaler);
+  expect_future_committed(mock_journaler);
+  mock_journal.commit_io_event(1U, 0);
+  ictx->op_work_queue->drain();
+}
+
 TEST_F(TestMockJournal, EventCommitError) {
   REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);