git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: improve object map performance under high IOPS workloads 28136/head
author: Jason Dillaman <dillaman@redhat.com>
Thu, 28 Feb 2019 21:43:27 +0000 (16:43 -0500)
committer: Jason Dillaman <dillaman@redhat.com>
Wed, 15 May 2019 21:26:55 +0000 (17:26 -0400)
Do not zero-fill the BitVector's bitset prior to decoding the data.
Additionally, only read-modify-write the portions of the footer
that are potentially affected by the updated state.

Fixes: http://tracker.ceph.com/issues/38538
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 071671fff64f27943047610fe075a7e98f0f705c)

Conflicts:
src/cls/rbd/cls_rbd.cc
src/cls/rbd/cls_rbd_client.h
src/common/bit_vector.hpp
src/test/common/test_bit_vector.cc
src/test/librbd/test_ObjectMap.cc
Trivial conflicts with bufferlist::begin/cbegin and assert/ceph_assert

src/cls/rbd/cls_rbd.cc
src/cls/rbd/cls_rbd_client.cc
src/cls/rbd/cls_rbd_client.h
src/common/bit_vector.hpp
src/test/cls_rbd/test_cls_rbd.cc
src/test/common/test_bit_vector.cc
src/test/librbd/test_ObjectMap.cc

index 152b8bcba64e58cbf831b065f69a02173cfce4c7..5d355d573fe07f4b95de92c28dd2f13ddd368dad 100644 (file)
@@ -2730,35 +2730,57 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     return -EINVAL;
   }
 
+  uint64_t object_byte_offset;
+  uint64_t byte_length;
+  object_map.get_header_crc_extents(&object_byte_offset, &byte_length);
+
   bufferlist footer_bl;
-  r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
-                   size - object_map.get_footer_offset(), &footer_bl,
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
   if (r < 0) {
-    CLS_ERR("object map footer read failed");
+    CLS_ERR("object map footer read header CRC failed");
     return r;
   }
 
   try {
     bufferlist::iterator it = footer_bl.begin();
-    object_map.decode_footer(it);
+    object_map.decode_header_crc(it);
   } catch (const buffer::error &err) {
-    CLS_ERR("failed to decode object map footer: %s", err.what());
+    CLS_ERR("failed to decode object map header CRC: %s", err.what());
   }
 
   if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
     return -ERANGE;
   }
 
-  uint64_t byte_offset;
-  uint64_t byte_length;
-  object_map.get_data_extents(start_object_no,
-                             end_object_no - start_object_no,
-                             &byte_offset, &byte_length);
+  uint64_t object_count = end_object_no - start_object_no;
+  object_map.get_data_crcs_extents(start_object_no, object_count,
+                                   &object_byte_offset, &byte_length);
+  const auto footer_object_offset = object_byte_offset;
+
+  footer_bl.clear();
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
+                    CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (r < 0) {
+    CLS_ERR("object map footer read data CRCs failed");
+    return r;
+  }
+
+  try {
+    bufferlist::iterator it = footer_bl.begin();
+    object_map.decode_data_crcs(it, start_object_no);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode object map data CRCs: %s", err.what());
+  }
+
+  uint64_t data_byte_offset;
+  object_map.get_data_extents(start_object_no, object_count,
+                              &data_byte_offset, &object_byte_offset,
+                              &byte_length);
 
   bufferlist data_bl;
-  r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
-                   byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &data_bl,
+                    CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
   if (r < 0) {
     CLS_ERR("object map data read failed");
     return r;
@@ -2766,10 +2788,10 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
 
   try {
     bufferlist::iterator it = data_bl.begin();
-    object_map.decode_data(it, byte_offset);
+    object_map.decode_data(it, data_byte_offset);
   } catch (const buffer::error &err) {
     CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
-           byte_offset, err.what());
+           data_byte_offset, err.what());
     return -EINVAL;
   }
 
@@ -2788,13 +2810,11 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
 
   if (updated) {
     CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
-           byte_offset, byte_length,
-           object_map.get_header_length() + byte_offset);
+           data_byte_offset, byte_length, object_byte_offset);
 
     bufferlist data_bl;
-    object_map.encode_data(data_bl, byte_offset, byte_length);
-    r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
-                      data_bl.length(), &data_bl,
+    object_map.encode_data(data_bl, data_byte_offset, byte_length);
+    r = cls_cxx_write2(hctx, object_byte_offset, data_bl.length(), &data_bl,
                        CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
     if (r < 0) {
       CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
@@ -2802,8 +2822,8 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     }
 
     footer_bl.clear();
-    object_map.encode_footer(footer_bl);
-    r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
+    object_map.encode_data_crcs(footer_bl, start_object_no, object_count);
+    r = cls_cxx_write2(hctx, footer_object_offset, footer_bl.length(),
                       &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
     if (r < 0) {
       CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
index d564ef31456f5ac7b3d181fb9f0b9852b8ea3d85..0820b30b5cdd316c192f0499e57e8be3e21812a7 100644 (file)
@@ -7,6 +7,7 @@
 #include "include/encoding.h"
 #include "include/rbd_types.h"
 #include "include/rados/librados.hpp"
+#include "common/bit_vector.hpp"
 
 #include <errno.h>
 
index d353a9de09332e3a824f48262efb0e5f7a91def9..4e4a0c9b80e09578f73aa3fe40a46efab4010ea7 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #ifndef CEPH_LIBRBD_CLS_RBD_CLIENT_H
@@ -6,12 +6,12 @@
 
 #include "cls/lock/cls_lock_types.h"
 #include "cls/rbd/cls_rbd_types.h"
-#include "common/bit_vector.hpp"
 #include "common/snap_types.h"
 #include "include/types.h"
 #include "librbd/Types.h"
 
 class Context;
+namespace ceph { template <uint8_t> class BitVector; }
 namespace librados {
   class ObjectReadOperation;
   class IoCtx;
index 88a81296bc5e62ffa792237043101bfc6b089ec4..832e75fe6acc0174a6d9b685fb410586032cad45 100644 (file)
@@ -14,7 +14,9 @@
 #include "common/Formatter.h"
 #include "include/assert.h"
 #include "include/encoding.h"
+#include <memory>
 #include <utility>
+#include <vector>
 
 namespace ceph {
 
@@ -190,16 +192,29 @@ public:
   void decode_header(bufferlist::iterator& it);
   uint64_t get_header_length() const;
 
-  void encode_data(bufferlist& bl, uint64_t byte_offset,
+  void encode_data(bufferlist& bl, uint64_t data_byte_offset,
                   uint64_t byte_length) const;
   void decode_data(bufferlist::iterator& it, uint64_t byte_offset);
   void get_data_extents(uint64_t offset, uint64_t length,
-                       uint64_t *byte_offset, uint64_t *byte_length) const;
+                        uint64_t *data_byte_offset,
+                        uint64_t *object_byte_offset,
+                        uint64_t *byte_length) const;
 
   void encode_footer(bufferlist& bl) const;
   void decode_footer(bufferlist::iterator& it);
   uint64_t get_footer_offset() const;
 
+  void decode_header_crc(bufferlist::iterator& it);
+  void get_header_crc_extents(uint64_t *byte_offset,
+                              uint64_t *byte_length) const;
+
+  void encode_data_crcs(bufferlist& bl, uint64_t offset,
+                        uint64_t length) const;
+  void decode_data_crcs(bufferlist::iterator& it, uint64_t offset);
+  void get_data_crcs_extents(uint64_t offset, uint64_t length,
+                             uint64_t *byte_offset,
+                             uint64_t *byte_length) const;
+
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& it);
   void dump(Formatter *f) const;
@@ -208,13 +223,24 @@ public:
 
   static void generate_test_instances(std::list<BitVector *> &o);
 private:
+  struct NoInitAllocator : public std::allocator<__u32> {
+    NoInitAllocator(const std::allocator<__u32>& alloc)
+      : std::allocator<__u32>(alloc) {
+    }
+
+    template <class U, class... Args>
+    void construct(U* p, Args&&... args) const {
+    }
+  };
 
   bufferlist m_data;
   uint64_t m_size;
   bool m_crc_enabled;
 
   mutable __u32 m_header_crc;
-  mutable std::vector<__u32> m_data_crcs;
+  mutable std::vector<__u32, NoInitAllocator> m_data_crcs;
+
+  void resize(uint64_t elements, bool zero);
 
   static void compute_index(uint64_t offset, uint64_t *index, uint64_t *shift);
 
@@ -238,9 +264,18 @@ void BitVector<_b>::clear() {
 
 template <uint8_t _b>
 void BitVector<_b>::resize(uint64_t size) {
+  resize(size, true);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::resize(uint64_t size, bool zero) {
   uint64_t buffer_size = (size + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK;
   if (buffer_size > m_data.length()) {
-    m_data.append_zero(buffer_size - m_data.length());
+    if (zero) {
+      m_data.append_zero(buffer_size - m_data.length());
+    } else {
+      m_data.append(std::move(buffer::ptr(buffer_size - m_data.length())));
+    }
   } else if (buffer_size < m_data.length()) {
     bufferlist bl;
     bl.substr_of(m_data, 0, buffer_size);
@@ -291,7 +326,7 @@ void BitVector<_b>::decode_header(bufferlist::iterator& it) {
   decode(size, header_it);
   DECODE_FINISH(header_it);
 
-  resize(size);
+  resize(size, false);
   m_header_crc = header_bl.crc32c(0);
 }
 
@@ -302,44 +337,46 @@ uint64_t BitVector<_b>::get_header_length() const {
 }
 
 template <uint8_t _b>
-void BitVector<_b>::encode_data(bufferlist& bl, uint64_t byte_offset,
+void BitVector<_b>::encode_data(bufferlist& bl, uint64_t data_byte_offset,
                                uint64_t byte_length) const {
-  assert(byte_offset % BLOCK_SIZE == 0);
-  assert(byte_offset + byte_length == m_data.length() ||
-        byte_length % BLOCK_SIZE == 0);
+  assert(data_byte_offset % BLOCK_SIZE == 0);
+  assert(data_byte_offset + byte_length == m_data.length() ||
+         byte_length % BLOCK_SIZE == 0);
 
-  uint64_t end_offset = byte_offset + byte_length;
-  while (byte_offset < end_offset) {
-    uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - byte_offset);
+  uint64_t end_offset = data_byte_offset + byte_length;
+  while (data_byte_offset < end_offset) {
+    uint64_t len = std::min<uint64_t>(BLOCK_SIZE,
+                                      end_offset - data_byte_offset);
 
     bufferlist bit;
-    bit.substr_of(m_data, byte_offset, len);
-    m_data_crcs[byte_offset / BLOCK_SIZE] = bit.crc32c(0);
+    bit.substr_of(m_data, data_byte_offset, len);
+    m_data_crcs[data_byte_offset / BLOCK_SIZE] = bit.crc32c(0);
 
     bl.claim_append(bit);
-    byte_offset += BLOCK_SIZE;
+    data_byte_offset += BLOCK_SIZE;
   }
 }
 
 template <uint8_t _b>
-void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset) {
-  assert(byte_offset % BLOCK_SIZE == 0);
+void BitVector<_b>::decode_data(bufferlist::iterator& it,
+                                uint64_t data_byte_offset) {
+  assert(data_byte_offset % BLOCK_SIZE == 0);
   if (it.end()) {
     return;
   }
 
-  uint64_t end_offset = byte_offset + it.get_remaining();
+  uint64_t end_offset = data_byte_offset + it.get_remaining();
   if (end_offset > m_data.length()) {
     throw buffer::end_of_buffer();
   }
 
   bufferlist data;
-  if (byte_offset > 0) {
-    data.substr_of(m_data, 0, byte_offset);
+  if (data_byte_offset > 0) {
+    data.substr_of(m_data, 0, data_byte_offset);
   }
 
-  while (byte_offset < end_offset) {
-    uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - byte_offset);
+  while (data_byte_offset < end_offset) {
+    uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - data_byte_offset);
 
     bufferptr ptr;
     it.copy_deep(len, ptr);
@@ -347,11 +384,11 @@ void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset)
     bufferlist bit;
     bit.append(ptr);
     if (m_crc_enabled &&
-       m_data_crcs[byte_offset / BLOCK_SIZE] != bit.crc32c(0)) {
+       m_data_crcs[data_byte_offset / BLOCK_SIZE] != bit.crc32c(0)) {
       throw buffer::malformed_input("invalid data block CRC");
     }
     data.append(bit);
-    byte_offset += bit.length();
+    data_byte_offset += bit.length();
   }
 
   if (m_data.length() > end_offset) {
@@ -365,22 +402,24 @@ void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset)
 
 template <uint8_t _b>
 void BitVector<_b>::get_data_extents(uint64_t offset, uint64_t length,
-                                    uint64_t *byte_offset,
-                                    uint64_t *byte_length) const {
+                                     uint64_t *data_byte_offset,
+                                     uint64_t *object_byte_offset,
+                                     uint64_t *byte_length) const {
   // read BLOCK_SIZE-aligned chunks
   assert(length > 0 && offset + length <= m_size);
   uint64_t shift;
-  compute_index(offset, byte_offset, &shift);
-  *byte_offset -= (*byte_offset % BLOCK_SIZE);
+  compute_index(offset, data_byte_offset, &shift);
+  *data_byte_offset -= (*data_byte_offset % BLOCK_SIZE);
 
   uint64_t end_offset;
   compute_index(offset + length - 1, &end_offset, &shift);
   end_offset += (BLOCK_SIZE - (end_offset % BLOCK_SIZE));
-  assert(*byte_offset <= end_offset);
+  assert(*data_byte_offset <= end_offset);
 
-  *byte_length = end_offset - *byte_offset;
-  if (*byte_offset + *byte_length > m_data.length()) {
-    *byte_length = m_data.length() - *byte_offset;
+  *object_byte_offset = get_header_length() + *data_byte_offset;
+  *byte_length = end_offset - *data_byte_offset;
+  if (*data_byte_offset + *byte_length > m_data.length()) {
+    *byte_length = m_data.length() - *data_byte_offset;
   }
 }
 
@@ -390,7 +429,10 @@ void BitVector<_b>::encode_footer(bufferlist& bl) const {
   bufferlist footer_bl;
   if (m_crc_enabled) {
     encode(m_header_crc, footer_bl);
-    encode(m_data_crcs, footer_bl);
+
+    __u32 size = m_data_crcs.size();
+    encode(size, footer_bl);
+    encode_data_crcs(footer_bl, 0, m_size);
   }
   encode(footer_bl, bl);
 }
@@ -404,15 +446,13 @@ void BitVector<_b>::decode_footer(bufferlist::iterator& it) {
   m_crc_enabled = (footer_bl.length() > 0);
   if (m_crc_enabled) {
     bufferlist::iterator footer_it = footer_bl.begin();
+    decode_header_crc(footer_it);
 
-    __u32 header_crc;
-    decode(header_crc, footer_it);
-    if (m_header_crc != header_crc) {
-      throw buffer::malformed_input("incorrect header CRC");
-    }
+    __u32 data_src_size;
+    decode(data_src_size, footer_it);
+    decode_data_crcs(footer_it, 0);
 
     uint64_t block_count = (m_data.length() + BLOCK_SIZE - 1) / BLOCK_SIZE;
-    decode(m_data_crcs, footer_it);
     if (m_data_crcs.size() != block_count) {
       throw buffer::malformed_input("invalid data block CRCs");
     }
@@ -424,6 +464,94 @@ uint64_t BitVector<_b>::get_footer_offset() const {
   return get_header_length() + m_data.length();
 }
 
+template <uint8_t _b>
+void BitVector<_b>::decode_header_crc(bufferlist::iterator& it) {
+  if (it.get_remaining() > 0) {
+    __u32 header_crc;
+    ceph::decode(header_crc, it);
+    if (m_header_crc != header_crc) {
+      throw buffer::malformed_input("incorrect header CRC");
+    }
+  }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::get_header_crc_extents(uint64_t *byte_offset,
+                                           uint64_t *byte_length) const {
+  // footer is prefixed with a bufferlist length
+  *byte_offset = get_footer_offset() + sizeof(__u32);
+  *byte_length = sizeof(__u32);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::encode_data_crcs(bufferlist& bl, uint64_t offset,
+                                     uint64_t length) const {
+  if (length == 0) {
+    return;
+  }
+
+  uint64_t index;
+  uint64_t shift;
+  compute_index(offset, &index, &shift);
+  uint64_t crc_index = index / BLOCK_SIZE;
+
+  compute_index(offset + length - 1, &index, &shift);
+  uint64_t end_crc_index = index / BLOCK_SIZE;
+  while (crc_index <= end_crc_index) {
+    __u32 crc = m_data_crcs[crc_index++];
+    ceph::encode(crc, bl);
+  }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::decode_data_crcs(bufferlist::iterator& it,
+                                     uint64_t offset) {
+  if (it.end()) {
+    return;
+  }
+
+  uint64_t index;
+  uint64_t shift;
+  compute_index(offset, &index, &shift);
+
+  uint64_t crc_index = index / BLOCK_SIZE;
+  uint64_t remaining = it.get_remaining() / sizeof(__u32);
+  while (remaining > 0) {
+    __u32 crc;
+    ceph::decode(crc, it);
+    m_data_crcs[crc_index++] = crc;
+    --remaining;
+  }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::get_data_crcs_extents(uint64_t offset, uint64_t length,
+                                          uint64_t *byte_offset,
+                                          uint64_t *byte_length) const {
+  // data CRCs immediately follow the header CRC
+  get_header_crc_extents(byte_offset, byte_length);
+  *byte_offset += *byte_length;
+
+  // skip past data CRC vector size
+  *byte_offset += sizeof(__u32);
+
+  // CRCs are computed over BLOCK_SIZE chunks
+  ceph_assert(length > 0 && offset + length <= m_size);
+  uint64_t index;
+  uint64_t shift;
+  compute_index(offset, &index, &shift);
+  uint64_t start_byte_offset =
+    *byte_offset + ((index / BLOCK_SIZE) * sizeof(__u32));
+
+  compute_index(offset + length, &index, &shift);
+  uint64_t end_byte_offset =
+    *byte_offset + (((index / BLOCK_SIZE) + 1) * sizeof(__u32));
+  ceph_assert(start_byte_offset < end_byte_offset);
+
+  *byte_offset = start_byte_offset;
+  *byte_length = end_byte_offset - start_byte_offset;
+}
+
 template <uint8_t _b>
 void BitVector<_b>::encode(bufferlist& bl) const {
   encode_header(bl);
@@ -501,7 +629,7 @@ void BitVector<_b>::generate_test_instances(std::list<BitVector *> &o) {
   const uint64_t radix = 1 << b->BIT_COUNT;
   const uint64_t size = 1024;
 
-  b->resize(size);
+  b->resize(size, false);
   for (uint64_t i = 0; i < size; ++i) {
     (*b)[i] = rand() % radix;
   }
index 0ad492510c6f900392f5e3c959bb61c0af26e4aa..8c9e642c3c4da6b92d9dff9bc36d248e477f2ea3 100644 (file)
@@ -5,6 +5,7 @@
 #include "common/config.h"
 #include "common/snap_types.h"
 #include "common/Clock.h"
+#include "common/bit_vector.hpp"
 #include "include/encoding.h"
 #include "include/types.h"
 #include "include/rados/librados.h"
index 9c2bee8cbd83aec0b838450824475eaa8fe79fb6..b15a303782b0e161d2cfedde115ebbd7dd71ac0d 100644 (file)
@@ -94,15 +94,19 @@ TYPED_TEST(BitVectorTest, get_buffer_extents) {
 
   uint64_t offset = (bit_vector.BLOCK_SIZE + 11) * elements_per_byte;
   uint64_t length = (bit_vector.BLOCK_SIZE + 31) * elements_per_byte;
-  uint64_t byte_offset;
+  uint64_t data_byte_offset;
+  uint64_t object_byte_offset;
   uint64_t byte_length;
-  bit_vector.get_data_extents(offset, length, &byte_offset, &byte_length);
-  ASSERT_EQ(bit_vector.BLOCK_SIZE, byte_offset);
+  bit_vector.get_data_extents(offset, length, &data_byte_offset,
+                              &object_byte_offset, &byte_length);
+  ASSERT_EQ(bit_vector.BLOCK_SIZE, data_byte_offset);
   ASSERT_EQ(bit_vector.BLOCK_SIZE + (element_count % bit_vector.BLOCK_SIZE),
             byte_length);
 
-  bit_vector.get_data_extents(1, 1, &byte_offset, &byte_length);
-  ASSERT_EQ(0U, byte_offset);
+  bit_vector.get_data_extents(1, 1, &data_byte_offset, &object_byte_offset,
+                              &byte_length);
+  ASSERT_EQ(0U, data_byte_offset);
+  ASSERT_EQ(bit_vector.get_header_length(), object_byte_offset);
   ASSERT_EQ(bit_vector.BLOCK_SIZE, byte_length);
 }
 
@@ -119,9 +123,11 @@ TYPED_TEST(BitVectorTest, get_footer_offset) {
 
   bit_vector.resize(5111);
 
-  uint64_t byte_offset;
+  uint64_t data_byte_offset;
+  uint64_t object_byte_offset;
   uint64_t byte_length;
-  bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+  bit_vector.get_data_extents(0, bit_vector.size(), &data_byte_offset,
+                              &object_byte_offset, &byte_length);
 
   ASSERT_EQ(bit_vector.get_header_length() + byte_length,
            bit_vector.get_footer_offset());
@@ -145,11 +151,11 @@ TYPED_TEST(BitVectorTest, partial_decode_encode) {
   bufferlist::iterator header_it = header_bl.begin();
   bit_vector.decode_header(header_it);
 
-  bufferlist footer_bl;
-  footer_bl.substr_of(bl, bit_vector.get_footer_offset(),
-                     bl.length() - bit_vector.get_footer_offset());
-  bufferlist::iterator footer_it = footer_bl.begin();
-  bit_vector.decode_footer(footer_it);
+  uint64_t object_byte_offset;
+  uint64_t byte_length;
+  bit_vector.get_header_crc_extents(&object_byte_offset, &byte_length);
+  ASSERT_EQ(bit_vector.get_footer_offset() + 4, object_byte_offset);
+  ASSERT_EQ(4ULL, byte_length);
 
   typedef std::pair<uint64_t, uint64_t> Extent;
   typedef std::list<Extent> Extents;
@@ -162,38 +168,61 @@ TYPED_TEST(BitVectorTest, partial_decode_encode) {
     std::make_pair((2 * bit_vector.BLOCK_SIZE * elements_per_byte) + 2, 2))(
     std::make_pair(2, 2 * bit_vector.BLOCK_SIZE));
   for (Extents::iterator it = extents.begin(); it != extents.end(); ++it) {
+    bufferlist footer_bl;
+    uint64_t footer_byte_offset;
+    uint64_t footer_byte_length;
+    bit_vector.get_data_crcs_extents(it->first, it->second, &footer_byte_offset,
+                                     &footer_byte_length);
+    ASSERT_TRUE(footer_byte_offset + footer_byte_length <= bl.length());
+    footer_bl.substr_of(bl, footer_byte_offset, footer_byte_length);
+    bufferlist::iterator footer_it = footer_bl.begin();
+    bit_vector.decode_data_crcs(footer_it, it->first);
+
     uint64_t element_offset = it->first;
     uint64_t element_length = it->second;
-    uint64_t byte_offset;
-    uint64_t byte_length;
-    bit_vector.get_data_extents(element_offset, element_length, &byte_offset,
+    uint64_t data_byte_offset;
+    bit_vector.get_data_extents(element_offset, element_length,
+                                &data_byte_offset, &object_byte_offset,
                                 &byte_length);
 
     bufferlist data_bl;
-    data_bl.substr_of(bl, bit_vector.get_header_length() + byte_offset,
+    data_bl.substr_of(bl, bit_vector.get_header_length() + data_byte_offset,
                      byte_length);
     bufferlist::iterator data_it = data_bl.begin();
-    bit_vector.decode_data(data_it, byte_offset);
+    bit_vector.decode_data(data_it, data_byte_offset);
 
     data_bl.clear();
-    bit_vector.encode_data(data_bl, byte_offset, byte_length);
+    bit_vector.encode_data(data_bl, data_byte_offset, byte_length);
 
     footer_bl.clear();
-    bit_vector.encode_footer(footer_bl);
+    bit_vector.encode_data_crcs(footer_bl, it->first, it->second);
 
     bufferlist updated_bl;
-    updated_bl.substr_of(bl, 0, bit_vector.get_header_length() + byte_offset);
+    updated_bl.substr_of(bl, 0,
+                         bit_vector.get_header_length() + data_byte_offset);
     updated_bl.append(data_bl);
 
-    if (byte_offset + byte_length < bit_vector.get_footer_offset()) {
-      uint64_t tail_data_offset = bit_vector.get_header_length() + byte_offset +
-                                  byte_length;
+    if (data_byte_offset + byte_length < bit_vector.get_footer_offset()) {
+      uint64_t tail_data_offset = bit_vector.get_header_length() +
+                                  data_byte_offset + byte_length;
       data_bl.substr_of(bl, tail_data_offset,
                        bit_vector.get_footer_offset() - tail_data_offset);
       updated_bl.append(data_bl);
     }
 
-    updated_bl.append(footer_bl);
+    bufferlist full_footer;
+    full_footer.substr_of(bl, bit_vector.get_footer_offset(),
+                          footer_byte_offset - bit_vector.get_footer_offset());
+    full_footer.append(footer_bl);
+
+    if (footer_byte_offset + footer_byte_length < bl.length()) {
+      bufferlist footer_bit;
+      auto footer_offset = footer_byte_offset + footer_byte_length;
+      footer_bit.substr_of(bl, footer_offset, bl.length() - footer_offset);
+      full_footer.append(footer_bit);
+    }
+
+    updated_bl.append(full_footer);
     ASSERT_EQ(bl, updated_bl);
 
     bufferlist::iterator updated_it = updated_bl.begin();
@@ -228,24 +257,25 @@ TYPED_TEST(BitVectorTest, data_crc) {
   bit_vector1.resize((bit_vector1.BLOCK_SIZE + 1) * elements_per_byte);
   bit_vector2.resize((bit_vector2.BLOCK_SIZE + 1) * elements_per_byte);
 
-  uint64_t byte_offset;
+  uint64_t data_byte_offset;
+  uint64_t object_byte_offset;
   uint64_t byte_length;
-  bit_vector1.get_data_extents(0, bit_vector1.size(), &byte_offset,
-                              &byte_length);
+  bit_vector1.get_data_extents(0, bit_vector1.size(), &data_byte_offset,
+                               &object_byte_offset, &byte_length);
 
   bufferlist data;
-  bit_vector1.encode_data(data, byte_offset, byte_length);
+  bit_vector1.encode_data(data, data_byte_offset, byte_length);
 
   bufferlist::iterator data_it = data.begin();
-  bit_vector1.decode_data(data_it, byte_offset);
+  bit_vector1.decode_data(data_it, data_byte_offset);
 
   bit_vector2[bit_vector2.size() - 1] = 1;
 
   bufferlist dummy_data;
-  bit_vector2.encode_data(dummy_data, byte_offset, byte_length);
+  bit_vector2.encode_data(dummy_data, data_byte_offset, byte_length);
 
   data_it = data.begin();
-  ASSERT_THROW(bit_vector2.decode_data(data_it, byte_offset),
+  ASSERT_THROW(bit_vector2.decode_data(data_it, data_byte_offset),
               buffer::malformed_input);
 }
 
index f5e34c860b2374e3be3a14bb9953560208e193cd..d939ffb94d0d7f50609378dfc56c52acf7dd8191 100644 (file)
@@ -8,8 +8,14 @@
 #include "librbd/ImageWatcher.h"
 #include "librbd/internal.h"
 #include "librbd/ObjectMap.h"
+#include "common/Cond.h"
+#include "common/Throttle.h"
 #include "cls/rbd/cls_rbd_client.h"
+#include "cls/rbd/cls_rbd_types.h"
 #include <list>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/rolling_sum.hpp>
 
 void register_test_object_map() {
 }
@@ -148,3 +154,83 @@ TEST_F(TestObjectMap, AcquireLockInvalidatesWhenTooSmall) {
                                 &flags_set));
   ASSERT_TRUE(flags_set);
 }
+
+TEST_F(TestObjectMap, DISABLED_StressTest) {
+  REQUIRE_FEATURE(RBD_FEATURE_OBJECT_MAP);
+
+  uint64_t object_count = cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT;
+  librbd::ImageCtx *ictx;
+  ASSERT_EQ(0, open_image(m_image_name, &ictx));
+  ASSERT_EQ(0, resize(ictx, ictx->layout.object_size * object_count));
+
+  bool flags_set;
+  ASSERT_EQ(0, ictx->test_flags(CEPH_NOSNAP, RBD_FLAG_OBJECT_MAP_INVALID,
+                                &flags_set));
+  ASSERT_FALSE(flags_set);
+
+  srand(time(NULL) % (unsigned long) -1);
+
+  coarse_mono_time start = coarse_mono_clock::now();
+  chrono::duration<double> last = chrono::duration<double>::zero();
+
+  const int WINDOW_SIZE = 5;
+  typedef boost::accumulators::accumulator_set<
+    double, boost::accumulators::stats<
+      boost::accumulators::tag::rolling_sum> > RollingSum;
+
+  RollingSum time_acc(
+    boost::accumulators::tag::rolling_window::window_size = WINDOW_SIZE);
+  RollingSum ios_acc(
+    boost::accumulators::tag::rolling_window::window_size = WINDOW_SIZE);
+
+  uint32_t io_threads = 16;
+  uint64_t cur_ios = 0;
+  SimpleThrottle throttle(io_threads, false);
+  for (uint64_t ios = 0; ios < 100000;) {
+    if (throttle.pending_error()) {
+      break;
+    }
+
+    throttle.start_op();
+    uint64_t object_no = (rand() % object_count);
+    auto ctx = new FunctionContext([&throttle, object_no](int r) {
+        ASSERT_EQ(0, r) << "object_no=" << object_no;
+        throttle.end_op(r);
+      });
+
+    RWLock::RLocker owner_locker(ictx->owner_lock);
+    RWLock::RLocker snap_locker(ictx->snap_lock);
+    RWLock::WLocker object_map_locker(ictx->object_map_lock);
+    ASSERT_TRUE(ictx->object_map != nullptr);
+
+    if (!ictx->object_map->aio_update<
+          Context, &Context::complete>(CEPH_NOSNAP, object_no,
+                                       OBJECT_EXISTS, {}, {}, true,
+                                       ctx)) {
+      ctx->complete(0);
+    } else {
+      ++cur_ios;
+      ++ios;
+    }
+
+    coarse_mono_time now = coarse_mono_clock::now();
+    chrono::duration<double> elapsed = now - start;
+    if (last == chrono::duration<double>::zero()) {
+      last = elapsed;
+    } else if ((int)elapsed.count() != (int)last.count()) {
+      time_acc((elapsed - last).count());
+      ios_acc(static_cast<double>(cur_ios));
+      cur_ios = 0;
+
+      double time_sum = boost::accumulators::rolling_sum(time_acc);
+      std::cerr << std::setw(5) << (int)elapsed.count() << "\t"
+                << std::setw(8) << (int)ios << "\t"
+                << std::fixed << std::setw(8) << std::setprecision(2)
+                << boost::accumulators::rolling_sum(ios_acc) / time_sum
+                << std::endl;
+      last = elapsed;
+    }
+  }
+
+  ASSERT_EQ(0, throttle.wait_for_ret());
+}