From: Jason Dillaman Date: Wed, 4 Feb 2015 07:44:50 +0000 (-0500) Subject: cls_rbd: added CRC validation to object map X-Git-Tag: v0.93~38^2~1^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4638eaf775a67073d4f0a6a205a9650871fe8dc2;p=ceph.git cls_rbd: added CRC validation to object map Added a footer to the object map which stores a header CRC and data CRCs for each 4KB chunk. Updates to the object map only require recomputing the CRC for the affected 4KB chunk. Signed-off-by: Jason Dillaman --- diff --git a/src/cls/rbd/cls_rbd.cc b/src/cls/rbd/cls_rbd.cc index bfde2b847db7..25ce93f49823 100644 --- a/src/cls/rbd/cls_rbd.cc +++ b/src/cls/rbd/cls_rbd.cc @@ -1916,6 +1916,7 @@ int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map) bufferlist::iterator iter = bl.begin(); ::decode(object_map, iter); } catch (const buffer::error &err) { + CLS_ERR("failed to decode object map: %s", err.what()); return -EINVAL; } return 0; @@ -1939,6 +1940,7 @@ int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out) return r; } + object_map.set_crc_enabled(false); ::encode(object_map, *out); return 0; } @@ -1989,9 +1991,8 @@ int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out bufferlist map; ::encode(object_map, map); - CLS_LOG(20, "object_map_resize: object size=%llu, byte size=%llu", - static_cast(object_count), - static_cast(map.length())); + CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u", + object_count, map.length()); return cls_cxx_write_full(hctx, &map); } @@ -2023,9 +2024,15 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out return -EINVAL; } + uint64_t size; + int r = cls_cxx_stat(hctx, &size, NULL); + if (r < 0) { + return r; + } + BitVector<2> object_map; bufferlist header_bl; - int r = cls_cxx_read(hctx, 0, object_map.get_header_length(), &header_bl); + r = cls_cxx_read(hctx, 0, object_map.get_header_length(), 
&header_bl); if (r < 0) { return r; } @@ -2034,9 +2041,20 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out bufferlist::iterator it = header_bl.begin(); object_map.decode_header(it); } catch (const buffer::error &err) { + CLS_ERR("failed to decode object map header: %s", err.what()); return -EINVAL; } + bufferlist footer_bl; + r = cls_cxx_read(hctx, object_map.get_footer_offset(), + size - object_map.get_footer_offset(), &footer_bl); + try { + bufferlist::iterator it = footer_bl.begin(); + object_map.decode_footer(it); + } catch (const buffer::error &err) { + CLS_ERR("failed to decode object map footer: %s", err.what()); + } + if (start_object_no >= end_object_no || end_object_no > object_map.size()) { return -ERANGE; } @@ -2058,6 +2076,8 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out bufferlist::iterator it = data_bl.begin(); object_map.decode_data(it, byte_offset); } catch (const buffer::error &err) { + CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s", + byte_offset, err.what()); return -EINVAL; } @@ -2072,16 +2092,19 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out } if (updated) { - CLS_LOG(20, "object_map_update: %llu~%llu -> %llu", - static_cast(byte_offset), - static_cast(byte_length), - static_cast(object_map.get_header_length() + - byte_offset)); - - bufferlist update; - object_map.encode_data(update, byte_offset, byte_length); + CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64, + byte_offset, byte_length, + object_map.get_header_length() + byte_offset); + + bufferlist data_bl; + object_map.encode_data(data_bl, byte_offset, byte_length); r = cls_cxx_write(hctx, object_map.get_header_length() + byte_offset, - update.length(), &update); + data_bl.length(), &data_bl); + + footer_bl.clear(); + object_map.encode_footer(footer_bl); + r = cls_cxx_write(hctx, object_map.get_footer_offset(), footer_bl.length(), + &footer_bl); } return 
r; } diff --git a/src/common/bit_vector.hpp b/src/common/bit_vector.hpp index 5c8e20f99c20..83d49d3ace77 100644 --- a/src/common/bit_vector.hpp +++ b/src/common/bit_vector.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include namespace ceph { @@ -62,6 +63,9 @@ public: BitVector(); + void set_crc_enabled(bool enabled) { + m_crc_enabled = enabled; + } void clear(); void resize(uint64_t elements); @@ -82,6 +86,10 @@ public: void get_data_extents(uint64_t offset, uint64_t length, uint64_t *byte_offset, uint64_t *byte_length) const; + void encode_footer(bufferlist& bl) const; + void decode_footer(bufferlist::iterator& it); + uint64_t get_footer_offset() const; + void encode(bufferlist& bl) const; void decode(bufferlist::iterator& it); void dump(Formatter *f) const; @@ -93,26 +101,31 @@ private: bufferlist m_data; uint64_t m_size; + bool m_crc_enabled; + + mutable __u32 m_header_crc; + mutable std::vector<__u32> m_data_crcs; static void compute_index(uint64_t offset, uint64_t *index, uint64_t *shift); }; template -BitVector<_b>::BitVector() : m_size(0) +BitVector<_b>::BitVector() : m_size(0), m_crc_enabled(true) { } template void BitVector<_b>::clear() { m_data.clear(); + m_data_crcs.clear(); m_size = 0; + m_header_crc = 0; } template void BitVector<_b>::resize(uint64_t size) { - uint64_t buffer_size = static_cast(std::ceil(static_cast(size) / - ELEMENTS_PER_BLOCK)); + uint64_t buffer_size = (size + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK; if (buffer_size > m_data.length()) { m_data.append_zero(buffer_size - m_data.length()); } else if (buffer_size < m_data.length()) { @@ -121,6 +134,9 @@ void BitVector<_b>::resize(uint64_t size) { bl.swap(m_data); } m_size = size; + + uint64_t block_count = (buffer_size + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE; + m_data_crcs.resize(block_count); } template @@ -141,74 +157,173 @@ void BitVector<_b>::compute_index(uint64_t offset, uint64_t *index, uint64_t *sh template void BitVector<_b>::encode_header(bufferlist& bl) 
const { - ENCODE_START(1, 1, bl); - ::encode(m_size, bl); - ENCODE_FINISH(bl); + bufferlist header_bl; + ENCODE_START(1, 1, header_bl); + ::encode(m_size, header_bl); + ENCODE_FINISH(header_bl); + m_header_crc = header_bl.crc32c(0); + + ::encode(header_bl, bl); } template void BitVector<_b>::decode_header(bufferlist::iterator& it) { + bufferlist header_bl; + ::decode(header_bl, it); + + bufferlist::iterator header_it = header_bl.begin(); uint64_t size; - DECODE_START(1, it); - ::decode(size, it); - DECODE_FINISH(it); + DECODE_START(1, header_it); + ::decode(size, header_it); + DECODE_FINISH(header_it); resize(size); + m_header_crc = header_bl.crc32c(0); } template uint64_t BitVector<_b>::get_header_length() const { - // 6 byte encoding header, 8 byte size - return 14; + // 4 byte bl length, 6 byte encoding header, 8 byte size + return 18; } template void BitVector<_b>::encode_data(bufferlist& bl, uint64_t byte_offset, uint64_t byte_length) const { - bufferlist bit; - bit.substr_of(m_data, byte_offset, byte_length); - bl.append(bit); + assert(byte_offset % CEPH_PAGE_SIZE == 0); + assert(byte_offset + byte_length == m_data.length() || + byte_length % CEPH_PAGE_SIZE == 0); + + uint64_t end_offset = byte_offset + byte_length; + while (byte_offset < end_offset) { + uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset); + + bufferlist bit; + bit.substr_of(m_data, byte_offset, len); + m_data_crcs[byte_offset / CEPH_PAGE_SIZE] = + ceph_crc32c(0, reinterpret_cast(bit.c_str()), + bit.length()); + + bl.claim_append(bit); + byte_offset += CEPH_PAGE_SIZE; + } } template void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset) { - if (byte_offset + it.get_remaining() > m_data.length()) { - throw buffer::malformed_input("attempting to decode past end of buffer"); + assert(byte_offset % CEPH_PAGE_SIZE == 0); + if (it.end()) { + return; } - char* packed_data = m_data.c_str(); - for (; !it.end(); ++it) { - packed_data[byte_offset++] = *it; + uint64_t 
end_offset = byte_offset + it.get_remaining(); + if (end_offset > m_data.length()) { + throw buffer::end_of_buffer(); } + + bufferlist data; + if (byte_offset > 0) { + data.substr_of(m_data, 0, byte_offset); + } + + while (byte_offset < end_offset) { + uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset); + + bufferlist bit; + it.copy(len, bit); + if (m_crc_enabled && + m_data_crcs[byte_offset / CEPH_PAGE_SIZE] != bit.crc32c(0)) { + throw buffer::malformed_input("invalid data block CRC"); + } + data.append(bit); + byte_offset += bit.length(); + } + + if (m_data.length() > end_offset) { + bufferlist tail; + tail.substr_of(m_data, end_offset, m_data.length() - end_offset); + data.append(tail); + } + assert(data.length() == m_data.length()); + data.swap(m_data); } template void BitVector<_b>::get_data_extents(uint64_t offset, uint64_t length, uint64_t *byte_offset, uint64_t *byte_length) const { - assert(length > 0); + // read CEPH_PAGE_SIZE-aligned chunks + assert(length > 0 && offset + length <= m_size); uint64_t shift; compute_index(offset, byte_offset, &shift); + *byte_offset -= (*byte_offset % CEPH_PAGE_SIZE); uint64_t end_offset; compute_index(offset + length - 1, &end_offset, &shift); + end_offset += (CEPH_PAGE_SIZE - (end_offset % CEPH_PAGE_SIZE)); assert(*byte_offset <= end_offset); - *byte_length = end_offset - *byte_offset + 1; + *byte_length = MIN(end_offset - *byte_offset, m_data.length()); +} + +template +void BitVector<_b>::encode_footer(bufferlist& bl) const { + bufferlist footer_bl; + if (m_crc_enabled) { + ::encode(m_header_crc, footer_bl); + ::encode(m_data_crcs, footer_bl); + } + ::encode(footer_bl, bl); +} + +template +void BitVector<_b>::decode_footer(bufferlist::iterator& it) { + bufferlist footer_bl; + ::decode(footer_bl, it); + + m_crc_enabled = (footer_bl.length() > 0); + if (m_crc_enabled) { + bufferlist::iterator footer_it = footer_bl.begin(); + + __u32 header_crc; + ::decode(header_crc, footer_it); + if (m_header_crc != header_crc) 
{ + throw buffer::malformed_input("incorrect header CRC"); + } + + uint64_t block_count = (m_data.length() + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE; + ::decode(m_data_crcs, footer_it); + if (m_data_crcs.size() != block_count) { + throw buffer::malformed_input("invalid data block CRCs"); + } + } +} + +template +uint64_t BitVector<_b>::get_footer_offset() const { + return get_header_length() + m_data.length(); } template void BitVector<_b>::encode(bufferlist& bl) const { encode_header(bl); - if (size() > 0) { - encode_data(bl, 0, m_data.length()); - } + encode_data(bl, 0, m_data.length()); + encode_footer(bl); } template void BitVector<_b>::decode(bufferlist::iterator& it) { decode_header(it); - decode_data(it, 0); + + bufferlist data_bl; + if (m_data.length() > 0) { + it.copy(m_data.length(), data_bl); + } + + decode_footer(it); + + bufferlist::iterator data_it = data_bl.begin(); + decode_data(data_it, 0); } template diff --git a/src/test/common/test_bit_vector.cc b/src/test/common/test_bit_vector.cc index ad6c243849e9..679b687252a0 100644 --- a/src/test/common/test_bit_vector.cc +++ b/src/test/common/test_bit_vector.cc @@ -87,18 +87,20 @@ TYPED_TEST(BitVectorTest, get_set) { TYPED_TEST(BitVectorTest, get_buffer_extents) { typename TestFixture::bit_vector_t bit_vector; - uint64_t offset = 5381; - uint64_t length = 4111; + uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT; + bit_vector.resize((2 * CEPH_PAGE_SIZE + 51) * elements_per_byte); + + uint64_t offset = (CEPH_PAGE_SIZE + 11) * elements_per_byte; + uint64_t length = (CEPH_PAGE_SIZE + 31) * elements_per_byte; uint64_t byte_offset; uint64_t byte_length; bit_vector.get_data_extents(offset, length, &byte_offset, &byte_length); + ASSERT_EQ(CEPH_PAGE_SIZE, byte_offset); + ASSERT_EQ(2 * CEPH_PAGE_SIZE, byte_length); - uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT; - uint64_t start_byte = offset / elements_per_byte; - ASSERT_EQ(start_byte, byte_offset); - - uint64_t end_byte = (offset + length - 1) / 
elements_per_byte; - ASSERT_EQ(end_byte - start_byte + 1, byte_length); + bit_vector.get_data_extents(1, 1, &byte_offset, &byte_length); + ASSERT_EQ(0U, byte_offset); + ASSERT_EQ(CEPH_PAGE_SIZE, byte_length); } TYPED_TEST(BitVectorTest, get_header_length) { @@ -108,3 +110,119 @@ TYPED_TEST(BitVectorTest, get_header_length) { bit_vector.encode_header(bl); ASSERT_EQ(bl.length(), bit_vector.get_header_length()); } + +TYPED_TEST(BitVectorTest, get_footer_offset) { + typename TestFixture::bit_vector_t bit_vector; + + bit_vector.resize(5111); + + uint64_t byte_offset; + uint64_t byte_length; + bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length); + + ASSERT_EQ(bit_vector.get_header_length() + byte_length, + bit_vector.get_footer_offset()); +} + +TYPED_TEST(BitVectorTest, partial_decode_encode) { + typename TestFixture::bit_vector_t bit_vector; + + uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT; + bit_vector.resize(5111 * elements_per_byte); + for (uint64_t i = 0; i < bit_vector.size(); ++i) { + bit_vector[i] = i % 4; + } + + bufferlist bl; + ::encode(bit_vector, bl); + bit_vector.clear(); + + bufferlist header_bl; + header_bl.substr_of(bl, 0, bit_vector.get_header_length()); + bufferlist::iterator header_it = header_bl.begin(); + bit_vector.decode_header(header_it); + + bufferlist footer_bl; + footer_bl.substr_of(bl, bit_vector.get_footer_offset(), + bl.length() - bit_vector.get_footer_offset()); + bufferlist::iterator footer_it = footer_bl.begin(); + bit_vector.decode_footer(footer_it); + + uint64_t byte_offset; + uint64_t byte_length; + bit_vector.get_data_extents(0, 1, &byte_offset, &byte_length); + + bufferlist data_bl; + data_bl.substr_of(bl, bit_vector.get_header_length() + byte_offset, + byte_length); + bufferlist::iterator data_it = data_bl.begin(); + bit_vector.decode_data(data_it, byte_offset); + + bit_vector[0] = 3; + + data_bl.clear(); + bit_vector.encode_data(data_bl, byte_offset, byte_length); + + footer_bl.clear(); + 
bit_vector.encode_footer(footer_bl); + + bufferlist updated_bl; + updated_bl.substr_of(bl, 0, bit_vector.get_header_length() + byte_offset); + updated_bl.append(data_bl); + + uint64_t tail_data_offset = bit_vector.get_header_length() + byte_offset + + byte_length; + data_bl.substr_of(bl, tail_data_offset, + bit_vector.get_footer_offset() - tail_data_offset); + updated_bl.append(data_bl); + updated_bl.append(footer_bl); + ASSERT_EQ(bl.length(), updated_bl.length()); + + bufferlist::iterator updated_it = updated_bl.begin(); + ::decode(bit_vector, updated_it); +} + +TYPED_TEST(BitVectorTest, header_crc) { + typename TestFixture::bit_vector_t bit_vector; + + bufferlist header; + bit_vector.encode_header(header); + + bufferlist footer; + bit_vector.encode_footer(footer); + + bufferlist::iterator it = footer.begin(); + bit_vector.decode_footer(it); + + bit_vector.resize(1); + bit_vector.encode_header(header); + + it = footer.begin(); + ASSERT_THROW(bit_vector.decode_footer(it), buffer::malformed_input); +} + +TYPED_TEST(BitVectorTest, data_crc) { + typename TestFixture::bit_vector_t bit_vector; + + uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT; + bit_vector.resize((CEPH_PAGE_SIZE + 1) * elements_per_byte); + + uint64_t byte_offset; + uint64_t byte_length; + bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length); + + bufferlist data; + bit_vector.encode_data(data, byte_offset, byte_length); + + bufferlist::iterator data_it = data.begin(); + bit_vector.decode_data(data_it, byte_offset); + + bit_vector[bit_vector.size() - 1] = 1; + + bufferlist dummy_data; + bit_vector.encode_data(dummy_data, byte_offset, byte_length); + + data_it = data.begin(); + ASSERT_THROW(bit_vector.decode_data(data_it, byte_offset), + buffer::malformed_input); +}