]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rbd: added CRC validation to object map
authorJason Dillaman <dillaman@redhat.com>
Wed, 4 Feb 2015 07:44:50 +0000 (02:44 -0500)
committerJosh Durgin <jdurgin@redhat.com>
Mon, 16 Feb 2015 21:27:34 +0000 (13:27 -0800)
Added a footer to the object map which stores a header CRC and
and data CRCs for each 4KB chunk.  Updates to the object map only
require recomputing the CRC to the affected 4KB chunk.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/cls/rbd/cls_rbd.cc
src/common/bit_vector.hpp
src/test/common/test_bit_vector.cc

index bfde2b847db76df877b29d891d9feca5010fa8fc..25ce93f49823620d09edb4774d25f2220a26e5cd 100644 (file)
@@ -1916,6 +1916,7 @@ int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
     bufferlist::iterator iter = bl.begin();
     ::decode(object_map, iter);
   } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode object map: %s", err.what());
     return -EINVAL;
   }
   return 0;
@@ -1939,6 +1940,7 @@ int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     return r;
   }
 
+  object_map.set_crc_enabled(false);
   ::encode(object_map, *out);
   return 0;
 }
@@ -1989,9 +1991,8 @@ int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out
 
   bufferlist map;
   ::encode(object_map, map);
-  CLS_LOG(20, "object_map_resize: object size=%llu, byte size=%llu",
-         static_cast<unsigned long long>(object_count),
-         static_cast<unsigned long long>(map.length()));
+  CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
+         object_count, map.length());
   return cls_cxx_write_full(hctx, &map);
 }
 
@@ -2023,9 +2024,15 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     return -EINVAL;
   }
 
+  uint64_t size;
+  int r = cls_cxx_stat(hctx, &size, NULL);
+  if (r < 0) {
+    return r;
+  }
+
   BitVector<2> object_map;
   bufferlist header_bl;
-  int r = cls_cxx_read(hctx, 0, object_map.get_header_length(), &header_bl);
+  r = cls_cxx_read(hctx, 0, object_map.get_header_length(), &header_bl);
   if (r < 0) {
     return r;
   }
@@ -2034,9 +2041,20 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     bufferlist::iterator it = header_bl.begin();
     object_map.decode_header(it);
   } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode object map header: %s", err.what());
     return -EINVAL;
   }
 
+  bufferlist footer_bl;
+  r = cls_cxx_read(hctx, object_map.get_footer_offset(),
+                  size - object_map.get_footer_offset(), &footer_bl);
+  try {
+    bufferlist::iterator it = footer_bl.begin();
+    object_map.decode_footer(it);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode object map footer: %s", err.what());
+  }
+
   if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
     return -ERANGE;
   }
@@ -2058,6 +2076,8 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     bufferlist::iterator it = data_bl.begin();
     object_map.decode_data(it, byte_offset);
   } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
+           byte_offset, err.what());
     return -EINVAL;
   } 
 
@@ -2072,16 +2092,19 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
   }
 
   if (updated) {
-    CLS_LOG(20, "object_map_update: %llu~%llu -> %llu",
-           static_cast<unsigned long long>(byte_offset),
-           static_cast<unsigned long long>(byte_length),
-           static_cast<unsigned long long>(object_map.get_header_length() +
-                                           byte_offset));
-
-    bufferlist update;
-    object_map.encode_data(update, byte_offset, byte_length); 
+    CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
+           byte_offset, byte_length,
+           object_map.get_header_length() + byte_offset);
+
+    bufferlist data_bl;
+    object_map.encode_data(data_bl, byte_offset, byte_length);
     r = cls_cxx_write(hctx, object_map.get_header_length() + byte_offset,
-                     update.length(), &update);
+                     data_bl.length(), &data_bl);
+
+    footer_bl.clear();
+    object_map.encode_footer(footer_bl);
+    r = cls_cxx_write(hctx, object_map.get_footer_offset(), footer_bl.length(),
+                     &footer_bl);
   }
   return r;
 }
index 5c8e20f99c205c7222428d9aa881d3f3835ae0fa..83d49d3ace77b949c29e0e9ed080beb441c3d9b8 100644 (file)
@@ -18,6 +18,7 @@
 #include <stdint.h>
 #include <cmath>
 #include <list>
+#include <vector>
 #include <boost/static_assert.hpp>
 
 namespace ceph {
@@ -62,6 +63,9 @@ public:
 
   BitVector();
 
+  void set_crc_enabled(bool enabled) {
+    m_crc_enabled = enabled;
+  }
   void clear();
 
   void resize(uint64_t elements);
@@ -82,6 +86,10 @@ public:
   void get_data_extents(uint64_t offset, uint64_t length,
                        uint64_t *byte_offset, uint64_t *byte_length) const;
 
+  void encode_footer(bufferlist& bl) const;
+  void decode_footer(bufferlist::iterator& it);
+  uint64_t get_footer_offset() const;
+
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& it);
   void dump(Formatter *f) const;
@@ -93,26 +101,31 @@ private:
 
   bufferlist m_data;
   uint64_t m_size;
+  bool m_crc_enabled;
+
+  mutable __u32 m_header_crc;
+  mutable std::vector<__u32> m_data_crcs;
 
   static void compute_index(uint64_t offset, uint64_t *index, uint64_t *shift);
 
 };
 
 template <uint8_t _b>
-BitVector<_b>::BitVector() : m_size(0)
+BitVector<_b>::BitVector() : m_size(0), m_crc_enabled(true)
 {
 }
 
 template <uint8_t _b>
 void BitVector<_b>::clear() {
   m_data.clear();
+  m_data_crcs.clear();
   m_size = 0;
+  m_header_crc = 0;
 }
 
 template <uint8_t _b>
 void BitVector<_b>::resize(uint64_t size) {
-  uint64_t buffer_size = static_cast<uint64_t>(std::ceil(static_cast<double>(size) /
-                                          ELEMENTS_PER_BLOCK));
+  uint64_t buffer_size = (size + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK;
   if (buffer_size > m_data.length()) {
     m_data.append_zero(buffer_size - m_data.length());
   } else if (buffer_size < m_data.length()) {
@@ -121,6 +134,9 @@ void BitVector<_b>::resize(uint64_t size) {
     bl.swap(m_data);
   }
   m_size = size;
+
+  uint64_t block_count = (buffer_size + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE;
+  m_data_crcs.resize(block_count);
 }
 
 template <uint8_t _b>
@@ -141,74 +157,173 @@ void BitVector<_b>::compute_index(uint64_t offset, uint64_t *index, uint64_t *sh
 
 template <uint8_t _b>
 void BitVector<_b>::encode_header(bufferlist& bl) const {
-  ENCODE_START(1, 1, bl);
-  ::encode(m_size, bl);
-  ENCODE_FINISH(bl);
+  bufferlist header_bl;
+  ENCODE_START(1, 1, header_bl);
+  ::encode(m_size, header_bl);
+  ENCODE_FINISH(header_bl);
+  m_header_crc = header_bl.crc32c(0);
+
+  ::encode(header_bl, bl);
 }
 
 template <uint8_t _b>
 void BitVector<_b>::decode_header(bufferlist::iterator& it) {
+  bufferlist header_bl;
+  ::decode(header_bl, it);
+
+  bufferlist::iterator header_it = header_bl.begin();
   uint64_t size;
-  DECODE_START(1, it);
-  ::decode(size, it);
-  DECODE_FINISH(it);
+  DECODE_START(1, header_it);
+  ::decode(size, header_it);
+  DECODE_FINISH(header_it);
 
   resize(size);
+  m_header_crc = header_bl.crc32c(0);
 }
 
 template <uint8_t _b>
 uint64_t BitVector<_b>::get_header_length() const {
-  // 6 byte encoding header, 8 byte size
-  return 14;
+  // 4 byte bl length, 6 byte encoding header, 8 byte size
+  return 18;
 }
 
 template <uint8_t _b>
 void BitVector<_b>::encode_data(bufferlist& bl, uint64_t byte_offset,
                                uint64_t byte_length) const {
-  bufferlist bit;
-  bit.substr_of(m_data, byte_offset, byte_length);
-  bl.append(bit);
+  assert(byte_offset % CEPH_PAGE_SIZE == 0);
+  assert(byte_offset + byte_length == m_data.length() ||
+        byte_length % CEPH_PAGE_SIZE == 0);
+
+  uint64_t end_offset = byte_offset + byte_length;
+  while (byte_offset < end_offset) {
+    uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset);
+
+    bufferlist bit;
+    bit.substr_of(m_data, byte_offset, len);
+    m_data_crcs[byte_offset / CEPH_PAGE_SIZE] =
+      ceph_crc32c(0, reinterpret_cast<unsigned char*>(bit.c_str()),
+                 bit.length());
+
+    bl.claim_append(bit);
+    byte_offset += CEPH_PAGE_SIZE;
+  }
 }
 
 template <uint8_t _b>
 void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset) {
-  if (byte_offset + it.get_remaining() > m_data.length()) {
-    throw buffer::malformed_input("attempting to decode past end of buffer");
+  assert(byte_offset % CEPH_PAGE_SIZE == 0);
+  if (it.end()) {
+    return;
   }
 
-  char* packed_data = m_data.c_str();
-  for (; !it.end(); ++it) {
-    packed_data[byte_offset++] = *it;
+  uint64_t end_offset = byte_offset + it.get_remaining();
+  if (end_offset > m_data.length()) {
+    throw buffer::end_of_buffer();
   }
+
+  bufferlist data;
+  if (byte_offset > 0) {
+    data.substr_of(m_data, 0, byte_offset);
+  }
+
+  while (byte_offset < end_offset) {
+    uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset);
+
+    bufferlist bit;
+    it.copy(len, bit);
+    if (m_crc_enabled &&
+       m_data_crcs[byte_offset / CEPH_PAGE_SIZE] != bit.crc32c(0)) {
+      throw buffer::malformed_input("invalid data block CRC");
+    }
+    data.append(bit);
+    byte_offset += bit.length();
+  }
+
+  if (m_data.length() > end_offset) {
+    bufferlist tail;
+    tail.substr_of(m_data, end_offset, m_data.length() - end_offset);
+    data.append(tail);
+  }
+  assert(data.length() == m_data.length());
+  data.swap(m_data);
 }
 
 template <uint8_t _b>
 void BitVector<_b>::get_data_extents(uint64_t offset, uint64_t length,
                                     uint64_t *byte_offset,
                                     uint64_t *byte_length) const {
-  assert(length > 0);
+  // read CEPH_PAGE_SIZE-aligned chunks
+  assert(length > 0 && offset + length <= m_size);
   uint64_t shift;
   compute_index(offset, byte_offset, &shift);
+  *byte_offset -= (*byte_offset % CEPH_PAGE_SIZE);
 
   uint64_t end_offset;
   compute_index(offset + length - 1, &end_offset, &shift);
+  end_offset += (CEPH_PAGE_SIZE - (end_offset % CEPH_PAGE_SIZE));
   assert(*byte_offset <= end_offset);
 
-  *byte_length = end_offset - *byte_offset + 1;
+  *byte_length = MIN(end_offset - *byte_offset, m_data.length());
+}
+
+template <uint8_t _b>
+void BitVector<_b>::encode_footer(bufferlist& bl) const {
+  bufferlist footer_bl;
+  if (m_crc_enabled) {
+    ::encode(m_header_crc, footer_bl);
+    ::encode(m_data_crcs, footer_bl);
+  }
+  ::encode(footer_bl, bl);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::decode_footer(bufferlist::iterator& it) {
+  bufferlist footer_bl;
+  ::decode(footer_bl, it);
+
+  m_crc_enabled = (footer_bl.length() > 0);
+  if (m_crc_enabled) {
+    bufferlist::iterator footer_it = footer_bl.begin();
+
+    __u32 header_crc;
+    ::decode(header_crc, footer_it);
+    if (m_header_crc != header_crc) {
+      throw buffer::malformed_input("incorrect header CRC");
+    }
+
+    uint64_t block_count = (m_data.length() + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE;
+    ::decode(m_data_crcs, footer_it);
+    if (m_data_crcs.size() != block_count) {
+      throw buffer::malformed_input("invalid data block CRCs");
+    }
+  }
+}
+
+template <uint8_t _b>
+uint64_t BitVector<_b>::get_footer_offset() const {
+  return get_header_length() + m_data.length();
 }
 
 template <uint8_t _b>
 void BitVector<_b>::encode(bufferlist& bl) const {
   encode_header(bl);
-  if (size() > 0) {
-    encode_data(bl, 0, m_data.length()); 
-  }
+  encode_data(bl, 0, m_data.length());
+  encode_footer(bl);
 }
 
 template <uint8_t _b>
 void BitVector<_b>::decode(bufferlist::iterator& it) {
   decode_header(it);
-  decode_data(it, 0);
+
+  bufferlist data_bl;
+  if (m_data.length() > 0) {
+    it.copy(m_data.length(), data_bl);
+  }
+
+  decode_footer(it);
+
+  bufferlist::iterator data_it = data_bl.begin();
+  decode_data(data_it, 0);
 }
 
 template <uint8_t _b>
index ad6c243849e96fc0a8b904dba7bf4a68bd1e2790..679b687252a03bd7296c243d7b619c2701d80c27 100644 (file)
@@ -87,18 +87,20 @@ TYPED_TEST(BitVectorTest, get_set) {
 TYPED_TEST(BitVectorTest, get_buffer_extents) {
   typename TestFixture::bit_vector_t bit_vector;
 
-  uint64_t offset = 5381;
-  uint64_t length = 4111;
+  uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+  bit_vector.resize((2 * CEPH_PAGE_SIZE + 51) * elements_per_byte);
+
+  uint64_t offset = (CEPH_PAGE_SIZE + 11) * elements_per_byte;
+  uint64_t length = (CEPH_PAGE_SIZE + 31) * elements_per_byte;
   uint64_t byte_offset;
   uint64_t byte_length;
   bit_vector.get_data_extents(offset, length, &byte_offset, &byte_length);
+  ASSERT_EQ(CEPH_PAGE_SIZE, byte_offset);
+  ASSERT_EQ(2 * CEPH_PAGE_SIZE, byte_length);
 
-  uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
-  uint64_t start_byte = offset / elements_per_byte;
-  ASSERT_EQ(start_byte, byte_offset);
-
-  uint64_t end_byte = (offset + length - 1) / elements_per_byte;
-  ASSERT_EQ(end_byte - start_byte + 1, byte_length);
+  bit_vector.get_data_extents(1, 1, &byte_offset, &byte_length);
+  ASSERT_EQ(0U, byte_offset);
+  ASSERT_EQ(CEPH_PAGE_SIZE, byte_length);
 }
 
 TYPED_TEST(BitVectorTest, get_header_length) {
@@ -108,3 +110,119 @@ TYPED_TEST(BitVectorTest, get_header_length) {
   bit_vector.encode_header(bl);
   ASSERT_EQ(bl.length(), bit_vector.get_header_length());
 }
+
+TYPED_TEST(BitVectorTest, get_footer_offset) {
+  typename TestFixture::bit_vector_t bit_vector;
+
+  bit_vector.resize(5111);
+
+  uint64_t byte_offset;
+  uint64_t byte_length;
+  bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+
+  ASSERT_EQ(bit_vector.get_header_length() + byte_length,
+           bit_vector.get_footer_offset());
+}
+
+TYPED_TEST(BitVectorTest, partial_decode_encode) {
+  typename TestFixture::bit_vector_t bit_vector;
+
+  uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+  bit_vector.resize(5111 * elements_per_byte);
+  for (uint64_t i = 0; i < bit_vector.size(); ++i) {
+    bit_vector[i] = i % 4;
+  }
+
+  bufferlist bl;
+  ::encode(bit_vector, bl);
+  bit_vector.clear();
+
+  bufferlist header_bl;
+  header_bl.substr_of(bl, 0, bit_vector.get_header_length());
+  bufferlist::iterator header_it = header_bl.begin();
+  bit_vector.decode_header(header_it);
+
+  bufferlist footer_bl;
+  footer_bl.substr_of(bl, bit_vector.get_footer_offset(),
+                     bl.length() - bit_vector.get_footer_offset());
+  bufferlist::iterator footer_it = footer_bl.begin();
+  bit_vector.decode_footer(footer_it);
+
+  uint64_t byte_offset;
+  uint64_t byte_length;
+  bit_vector.get_data_extents(0, 1, &byte_offset, &byte_length); 
+
+  bufferlist data_bl;
+  data_bl.substr_of(bl, bit_vector.get_header_length() + byte_offset,
+                   byte_length);
+  bufferlist::iterator data_it = data_bl.begin();
+  bit_vector.decode_data(data_it, byte_offset);
+
+  bit_vector[0] = 3;
+
+  data_bl.clear();
+  bit_vector.encode_data(data_bl, byte_offset, byte_length);
+
+  footer_bl.clear();
+  bit_vector.encode_footer(footer_bl);
+
+  bufferlist updated_bl;
+  updated_bl.substr_of(bl, 0, bit_vector.get_header_length() + byte_offset);
+  updated_bl.append(data_bl);
+
+  uint64_t tail_data_offset = bit_vector.get_header_length() + byte_offset +
+                             byte_length;
+  data_bl.substr_of(bl, tail_data_offset,
+                   bit_vector.get_footer_offset() - tail_data_offset);
+  updated_bl.append(data_bl);
+  updated_bl.append(footer_bl);
+  ASSERT_EQ(bl.length(), updated_bl.length());
+
+  bufferlist::iterator updated_it = updated_bl.begin();
+  ::decode(bit_vector, updated_it); 
+}
+
+TYPED_TEST(BitVectorTest, header_crc) {
+  typename TestFixture::bit_vector_t bit_vector;
+
+  bufferlist header;
+  bit_vector.encode_header(header);
+
+  bufferlist footer;
+  bit_vector.encode_footer(footer);
+
+  bufferlist::iterator it = footer.begin();
+  bit_vector.decode_footer(it);
+
+  bit_vector.resize(1);
+  bit_vector.encode_header(header);
+
+  it = footer.begin();
+  ASSERT_THROW(bit_vector.decode_footer(it), buffer::malformed_input);
+}
+
+TYPED_TEST(BitVectorTest, data_crc) {
+  typename TestFixture::bit_vector_t bit_vector;
+
+  uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+  bit_vector.resize((CEPH_PAGE_SIZE + 1) * elements_per_byte);
+
+  uint64_t byte_offset;
+  uint64_t byte_length;
+  bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+
+  bufferlist data;
+  bit_vector.encode_data(data, byte_offset, byte_length);
+
+  bufferlist::iterator data_it = data.begin();
+  bit_vector.decode_data(data_it, byte_offset); 
+
+  bit_vector[bit_vector.size() - 1] = 1;
+
+  bufferlist dummy_data;
+  bit_vector.encode_data(dummy_data, byte_offset, byte_length);
+
+  data_it = data.begin();
+  ASSERT_THROW(bit_vector.decode_data(data_it, byte_offset),
+              buffer::malformed_input);
+}