]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
journal: add support for aligned appends
authorMykola Golub <mgolub@suse.com>
Fri, 31 May 2019 12:41:22 +0000 (13:41 +0100)
committerMykola Golub <mgolub@suse.com>
Sun, 23 Jun 2019 09:06:45 +0000 (10:06 +0100)
Fixes: https://tracker.ceph.com/issues/39968
Signed-off-by: Mykola Golub <mgolub@suse.com>
src/journal/Entry.cc
src/journal/ObjectPlayer.cc
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h

index 48648a872dfc3ec36a6eae2a68c27b1703306f84..7899bf1cc5a6d4e701aa3ce70a569c0a42b07c21 100644 (file)
@@ -86,7 +86,14 @@ bool Entry::is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed)
   using ceph::decode;
   uint32_t start_off = iter.get_off();
   if (iter.get_remaining() < HEADER_FIXED_SIZE) {
-    *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+    bufferlist sub_bl;
+    sub_bl.substr_of(iter.get_bl(), iter.get_off(), iter.get_remaining());
+    if (sub_bl.length() > 0 && sub_bl.is_zero()) {
+      // pad bytes
+      *bytes_needed = 0;
+    } else {
+      *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+    }
     return false;
   }
   uint64_t bl_preamble;
index d4d9fb750b5e7f3dbc7840ecd0c9243d1e7f490d..0a85b8853f254ef63b7a9bc30a3ad4af5fe8545e 100644 (file)
 
 namespace journal {
 
+namespace {
+
+bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
+                              uint32_t *pad_len, bool *partial_entry) {
+  const uint32_t MAX_PAD = 8;
+  auto pad_bytes = MAX_PAD - off % MAX_PAD;
+  auto next = *iter;
+
+  ceph_assert(!next.end());
+  if (*next != '\0') {
+    return false;
+  }
+
+  for (auto i = pad_bytes - 1; i > 0; i--) {
+    if ((++next).end()) {
+      *partial_entry = true;
+      return false;
+    }
+    if (*next != '\0') {
+      return false;
+    }
+  }
+
+  *iter = next;
+  *pad_len += pad_bytes;
+  return true;
+}
+
+} // anonymous namespace
+
 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
                            const std::string &object_oid_prefix,
                            uint64_t object_num, SafeTimer &timer,
@@ -131,6 +161,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
 
   clear_invalid_range(m_read_bl_off, m_read_bl.length());
   bufferlist::const_iterator iter{&m_read_bl, 0};
+  uint32_t pad_len = 0;
   while (!iter.end()) {
     uint32_t bytes_needed;
     uint32_t bl_off = iter.get_off();
@@ -149,9 +180,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
         break;
       }
 
-      if (!invalid) {
+      if (!invalid &&
+          !advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
+                                    &pad_len, &partial_entry)) {
         invalid_start_off = m_read_bl_off + bl_off;
         invalid = true;
+        if (partial_entry) {
+          if (full_fetch) {
+            lderr(m_cct) << ": partial pad at offset " << invalid_start_off
+                         << dendl;
+          } else {
+            ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
+                             << dendl;
+          }
+          break;
+        }
         lderr(m_cct) << ": detected corrupt journal entry at offset "
                      << invalid_start_off << dendl;
       }
@@ -193,7 +236,8 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
     iter = bufferlist::iterator(&m_read_bl, 0);
 
     // advance the decoded entry offset
-    m_read_bl_off += entry_len;
+    m_read_bl_off += entry_len + pad_len;
+    pad_len = 0;
   }
 
   if (invalid) {
index 127731e95c322f93e2143cf4dde252c8f46b0980..2366759cc5c2bb52d2d6c482623d719f8436083e 100644 (file)
@@ -6,6 +6,7 @@
 #include "journal/Utils.h"
 #include "include/ceph_assert.h"
 #include "common/Timer.h"
+#include "common/errno.h"
 #include "cls/journal/cls_journal_client.h"
 
 #define dout_subsys ceph_subsys_journaler
@@ -31,6 +32,16 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
   ceph_assert(m_handler != NULL);
+
+  librados::Rados rados(m_ioctx);
+  int8_t require_osd_release = 0;
+  int r = rados.get_min_compatible_osd(&require_osd_release);
+  if (r < 0) {
+    ldout(m_cct, 0) << "failed to retrieve min OSD release: "
+                    << cpp_strerror(r) << dendl;
+  }
+  m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS;
+
   ldout(m_cct, 20) << dendl;
 }
 
@@ -287,10 +298,13 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
   }
 
   librados::ObjectWriteOperation op;
-  client::guard_append(&op, m_soft_max_size);
+  if (m_compat_mode) {
+    client::guard_append(&op, m_soft_max_size);
+  }
 
   size_t append_bytes = 0;
   AppendBuffers append_buffers;
+  bufferlist append_bl;
   for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
     auto& future = it->first;
     auto& bl = it->second;
@@ -308,8 +322,12 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
     ldout(m_cct, 20) << "flushing " << *future << dendl;
     future->set_flush_in_progress();
 
-    op.append(bl);
-    op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    if (m_compat_mode) {
+      op.append(bl);
+      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    } else {
+      append_bl.append(bl);
+    }
 
     append_bytes += bl.length();
     append_buffers.push_back(*it);
@@ -332,6 +350,10 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
     ceph_assert(m_pending_bytes >= append_bytes);
     m_pending_bytes -= append_bytes;
 
+    if (!m_compat_mode) {
+      client::append(&op, m_soft_max_size, append_bl);
+    }
+
     auto rados_completion = librados::Rados::aio_create_completion(
       new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
     int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
index ff00e0a0a1f031da668e03994d6e7e7b7315db68..1f6014773ef5d88aa7b4e3c71594422fa8bd6e01 100644 (file)
@@ -122,6 +122,8 @@ private:
   double m_flush_age = 0;
   int32_t m_max_in_flight_appends;
 
+  bool m_compat_mode;
+
   FlushHandler m_flush_handler;
 
   mutable std::shared_ptr<Mutex> m_lock;