bufferlist::iterator iter = bl.begin();
::decode(object_map, iter);
} catch (const buffer::error &err) {
+ CLS_ERR("failed to decode object map: %s", err.what());
return -EINVAL;
}
return 0;
return r;
}
+ object_map.set_crc_enabled(false);
::encode(object_map, *out);
return 0;
}
bufferlist map;
::encode(object_map, map);
- CLS_LOG(20, "object_map_resize: object size=%llu, byte size=%llu",
- static_cast<unsigned long long>(object_count),
- static_cast<unsigned long long>(map.length()));
+ CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
+ object_count, map.length());
return cls_cxx_write_full(hctx, &map);
}
return -EINVAL;
}
+ uint64_t size;
+ int r = cls_cxx_stat(hctx, &size, NULL);
+ if (r < 0) {
+ return r;
+ }
+
BitVector<2> object_map;
bufferlist header_bl;
- int r = cls_cxx_read(hctx, 0, object_map.get_header_length(), &header_bl);
+ r = cls_cxx_read(hctx, 0, object_map.get_header_length(), &header_bl);
if (r < 0) {
return r;
}
bufferlist::iterator it = header_bl.begin();
object_map.decode_header(it);
} catch (const buffer::error &err) {
+ CLS_ERR("failed to decode object map header: %s", err.what());
return -EINVAL;
}
+ bufferlist footer_bl;
+ r = cls_cxx_read(hctx, object_map.get_footer_offset(),
+ size - object_map.get_footer_offset(), &footer_bl);
+ try {
+ bufferlist::iterator it = footer_bl.begin();
+ object_map.decode_footer(it);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode object map footer: %s", err.what());
+ }
+
if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
return -ERANGE;
}
bufferlist::iterator it = data_bl.begin();
object_map.decode_data(it, byte_offset);
} catch (const buffer::error &err) {
+ CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
+ byte_offset, err.what());
return -EINVAL;
}
}
if (updated) {
- CLS_LOG(20, "object_map_update: %llu~%llu -> %llu",
- static_cast<unsigned long long>(byte_offset),
- static_cast<unsigned long long>(byte_length),
- static_cast<unsigned long long>(object_map.get_header_length() +
- byte_offset));
-
- bufferlist update;
- object_map.encode_data(update, byte_offset, byte_length);
+ CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
+ byte_offset, byte_length,
+ object_map.get_header_length() + byte_offset);
+
+ bufferlist data_bl;
+ object_map.encode_data(data_bl, byte_offset, byte_length);
r = cls_cxx_write(hctx, object_map.get_header_length() + byte_offset,
- update.length(), &update);
+ data_bl.length(), &data_bl);
+
+ footer_bl.clear();
+ object_map.encode_footer(footer_bl);
+ r = cls_cxx_write(hctx, object_map.get_footer_offset(), footer_bl.length(),
+ &footer_bl);
}
return r;
}
#include <stdint.h>
#include <cmath>
#include <list>
+#include <vector>
#include <boost/static_assert.hpp>
namespace ceph {
BitVector();
+ void set_crc_enabled(bool enabled) {
+ m_crc_enabled = enabled;
+ }
void clear();
void resize(uint64_t elements);
void get_data_extents(uint64_t offset, uint64_t length,
uint64_t *byte_offset, uint64_t *byte_length) const;
+ void encode_footer(bufferlist& bl) const;
+ void decode_footer(bufferlist::iterator& it);
+ uint64_t get_footer_offset() const;
+
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& it);
void dump(Formatter *f) const;
bufferlist m_data;
uint64_t m_size;
+ bool m_crc_enabled;
+
+ mutable __u32 m_header_crc;
+ mutable std::vector<__u32> m_data_crcs;
static void compute_index(uint64_t offset, uint64_t *index, uint64_t *shift);
};
template <uint8_t _b>
-BitVector<_b>::BitVector() : m_size(0)
+BitVector<_b>::BitVector() : m_size(0), m_crc_enabled(true)
{
}
template <uint8_t _b>
void BitVector<_b>::clear() {
m_data.clear();
+ m_data_crcs.clear();
m_size = 0;
+ m_header_crc = 0;
}
template <uint8_t _b>
void BitVector<_b>::resize(uint64_t size) {
- uint64_t buffer_size = static_cast<uint64_t>(std::ceil(static_cast<double>(size) /
- ELEMENTS_PER_BLOCK));
+ uint64_t buffer_size = (size + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK;
if (buffer_size > m_data.length()) {
m_data.append_zero(buffer_size - m_data.length());
} else if (buffer_size < m_data.length()) {
bl.swap(m_data);
}
m_size = size;
+
+ uint64_t block_count = (buffer_size + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE;
+ m_data_crcs.resize(block_count);
}
template <uint8_t _b>
template <uint8_t _b>
void BitVector<_b>::encode_header(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(m_size, bl);
- ENCODE_FINISH(bl);
+ bufferlist header_bl;
+ ENCODE_START(1, 1, header_bl);
+ ::encode(m_size, header_bl);
+ ENCODE_FINISH(header_bl);
+ m_header_crc = header_bl.crc32c(0);
+
+ ::encode(header_bl, bl);
}
template <uint8_t _b>
void BitVector<_b>::decode_header(bufferlist::iterator& it) {
+ bufferlist header_bl;
+ ::decode(header_bl, it);
+
+ bufferlist::iterator header_it = header_bl.begin();
uint64_t size;
- DECODE_START(1, it);
- ::decode(size, it);
- DECODE_FINISH(it);
+ DECODE_START(1, header_it);
+ ::decode(size, header_it);
+ DECODE_FINISH(header_it);
resize(size);
+ m_header_crc = header_bl.crc32c(0);
}
template <uint8_t _b>
uint64_t BitVector<_b>::get_header_length() const {
- // 6 byte encoding header, 8 byte size
- return 14;
+ // 4 byte bl length, 6 byte encoding header, 8 byte size
+ return 18;
}
template <uint8_t _b>
void BitVector<_b>::encode_data(bufferlist& bl, uint64_t byte_offset,
uint64_t byte_length) const {
- bufferlist bit;
- bit.substr_of(m_data, byte_offset, byte_length);
- bl.append(bit);
+ assert(byte_offset % CEPH_PAGE_SIZE == 0);
+ assert(byte_offset + byte_length == m_data.length() ||
+ byte_length % CEPH_PAGE_SIZE == 0);
+
+ uint64_t end_offset = byte_offset + byte_length;
+ while (byte_offset < end_offset) {
+ uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset);
+
+ bufferlist bit;
+ bit.substr_of(m_data, byte_offset, len);
+ m_data_crcs[byte_offset / CEPH_PAGE_SIZE] =
+ ceph_crc32c(0, reinterpret_cast<unsigned char*>(bit.c_str()),
+ bit.length());
+
+ bl.claim_append(bit);
+ byte_offset += CEPH_PAGE_SIZE;
+ }
}
template <uint8_t _b>
void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset) {
- if (byte_offset + it.get_remaining() > m_data.length()) {
- throw buffer::malformed_input("attempting to decode past end of buffer");
+ assert(byte_offset % CEPH_PAGE_SIZE == 0);
+ if (it.end()) {
+ return;
}
- char* packed_data = m_data.c_str();
- for (; !it.end(); ++it) {
- packed_data[byte_offset++] = *it;
+ uint64_t end_offset = byte_offset + it.get_remaining();
+ if (end_offset > m_data.length()) {
+ throw buffer::end_of_buffer();
}
+
+ bufferlist data;
+ if (byte_offset > 0) {
+ data.substr_of(m_data, 0, byte_offset);
+ }
+
+ while (byte_offset < end_offset) {
+ uint64_t len = MIN(CEPH_PAGE_SIZE, end_offset - byte_offset);
+
+ bufferlist bit;
+ it.copy(len, bit);
+ if (m_crc_enabled &&
+ m_data_crcs[byte_offset / CEPH_PAGE_SIZE] != bit.crc32c(0)) {
+ throw buffer::malformed_input("invalid data block CRC");
+ }
+ data.append(bit);
+ byte_offset += bit.length();
+ }
+
+ if (m_data.length() > end_offset) {
+ bufferlist tail;
+ tail.substr_of(m_data, end_offset, m_data.length() - end_offset);
+ data.append(tail);
+ }
+ assert(data.length() == m_data.length());
+ data.swap(m_data);
}
template <uint8_t _b>
void BitVector<_b>::get_data_extents(uint64_t offset, uint64_t length,
uint64_t *byte_offset,
uint64_t *byte_length) const {
- assert(length > 0);
+ // read CEPH_PAGE_SIZE-aligned chunks
+ assert(length > 0 && offset + length <= m_size);
uint64_t shift;
compute_index(offset, byte_offset, &shift);
+ *byte_offset -= (*byte_offset % CEPH_PAGE_SIZE);
uint64_t end_offset;
compute_index(offset + length - 1, &end_offset, &shift);
+ end_offset += (CEPH_PAGE_SIZE - (end_offset % CEPH_PAGE_SIZE));
assert(*byte_offset <= end_offset);
- *byte_length = end_offset - *byte_offset + 1;
+ *byte_length = MIN(end_offset - *byte_offset, m_data.length());
+}
+
+template <uint8_t _b>
+void BitVector<_b>::encode_footer(bufferlist& bl) const {
+ bufferlist footer_bl;
+ if (m_crc_enabled) {
+ ::encode(m_header_crc, footer_bl);
+ ::encode(m_data_crcs, footer_bl);
+ }
+ ::encode(footer_bl, bl);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::decode_footer(bufferlist::iterator& it) {
+ bufferlist footer_bl;
+ ::decode(footer_bl, it);
+
+ m_crc_enabled = (footer_bl.length() > 0);
+ if (m_crc_enabled) {
+ bufferlist::iterator footer_it = footer_bl.begin();
+
+ __u32 header_crc;
+ ::decode(header_crc, footer_it);
+ if (m_header_crc != header_crc) {
+ throw buffer::malformed_input("incorrect header CRC");
+ }
+
+ uint64_t block_count = (m_data.length() + CEPH_PAGE_SIZE - 1) / CEPH_PAGE_SIZE;
+ ::decode(m_data_crcs, footer_it);
+ if (m_data_crcs.size() != block_count) {
+ throw buffer::malformed_input("invalid data block CRCs");
+ }
+ }
+}
+
+template <uint8_t _b>
+uint64_t BitVector<_b>::get_footer_offset() const {
+ return get_header_length() + m_data.length();
}
template <uint8_t _b>
void BitVector<_b>::encode(bufferlist& bl) const {
encode_header(bl);
- if (size() > 0) {
- encode_data(bl, 0, m_data.length());
- }
+ encode_data(bl, 0, m_data.length());
+ encode_footer(bl);
}
template <uint8_t _b>
void BitVector<_b>::decode(bufferlist::iterator& it) {
decode_header(it);
- decode_data(it, 0);
+
+ bufferlist data_bl;
+ if (m_data.length() > 0) {
+ it.copy(m_data.length(), data_bl);
+ }
+
+ decode_footer(it);
+
+ bufferlist::iterator data_it = data_bl.begin();
+ decode_data(data_it, 0);
}
template <uint8_t _b>
TYPED_TEST(BitVectorTest, get_buffer_extents) {
typename TestFixture::bit_vector_t bit_vector;
- uint64_t offset = 5381;
- uint64_t length = 4111;
+ uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+ bit_vector.resize((2 * CEPH_PAGE_SIZE + 51) * elements_per_byte);
+
+ uint64_t offset = (CEPH_PAGE_SIZE + 11) * elements_per_byte;
+ uint64_t length = (CEPH_PAGE_SIZE + 31) * elements_per_byte;
uint64_t byte_offset;
uint64_t byte_length;
bit_vector.get_data_extents(offset, length, &byte_offset, &byte_length);
+ ASSERT_EQ(CEPH_PAGE_SIZE, byte_offset);
+ ASSERT_EQ(2 * CEPH_PAGE_SIZE, byte_length);
- uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
- uint64_t start_byte = offset / elements_per_byte;
- ASSERT_EQ(start_byte, byte_offset);
-
- uint64_t end_byte = (offset + length - 1) / elements_per_byte;
- ASSERT_EQ(end_byte - start_byte + 1, byte_length);
+ bit_vector.get_data_extents(1, 1, &byte_offset, &byte_length);
+ ASSERT_EQ(0U, byte_offset);
+ ASSERT_EQ(CEPH_PAGE_SIZE, byte_length);
}
TYPED_TEST(BitVectorTest, get_header_length) {
bit_vector.encode_header(bl);
ASSERT_EQ(bl.length(), bit_vector.get_header_length());
}
+
+TYPED_TEST(BitVectorTest, get_footer_offset) {
+ typename TestFixture::bit_vector_t bit_vector;
+
+ bit_vector.resize(5111);
+
+ uint64_t byte_offset;
+ uint64_t byte_length;
+ bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+
+ ASSERT_EQ(bit_vector.get_header_length() + byte_length,
+ bit_vector.get_footer_offset());
+}
+
+TYPED_TEST(BitVectorTest, partial_decode_encode) {
+ typename TestFixture::bit_vector_t bit_vector;
+
+ uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+ bit_vector.resize(5111 * elements_per_byte);
+ for (uint64_t i = 0; i < bit_vector.size(); ++i) {
+ bit_vector[i] = i % 4;
+ }
+
+ bufferlist bl;
+ ::encode(bit_vector, bl);
+ bit_vector.clear();
+
+ bufferlist header_bl;
+ header_bl.substr_of(bl, 0, bit_vector.get_header_length());
+ bufferlist::iterator header_it = header_bl.begin();
+ bit_vector.decode_header(header_it);
+
+ bufferlist footer_bl;
+ footer_bl.substr_of(bl, bit_vector.get_footer_offset(),
+ bl.length() - bit_vector.get_footer_offset());
+ bufferlist::iterator footer_it = footer_bl.begin();
+ bit_vector.decode_footer(footer_it);
+
+ uint64_t byte_offset;
+ uint64_t byte_length;
+ bit_vector.get_data_extents(0, 1, &byte_offset, &byte_length);
+
+ bufferlist data_bl;
+ data_bl.substr_of(bl, bit_vector.get_header_length() + byte_offset,
+ byte_length);
+ bufferlist::iterator data_it = data_bl.begin();
+ bit_vector.decode_data(data_it, byte_offset);
+
+ bit_vector[0] = 3;
+
+ data_bl.clear();
+ bit_vector.encode_data(data_bl, byte_offset, byte_length);
+
+ footer_bl.clear();
+ bit_vector.encode_footer(footer_bl);
+
+ bufferlist updated_bl;
+ updated_bl.substr_of(bl, 0, bit_vector.get_header_length() + byte_offset);
+ updated_bl.append(data_bl);
+
+ uint64_t tail_data_offset = bit_vector.get_header_length() + byte_offset +
+ byte_length;
+ data_bl.substr_of(bl, tail_data_offset,
+ bit_vector.get_footer_offset() - tail_data_offset);
+ updated_bl.append(data_bl);
+ updated_bl.append(footer_bl);
+ ASSERT_EQ(bl.length(), updated_bl.length());
+
+ bufferlist::iterator updated_it = updated_bl.begin();
+ ::decode(bit_vector, updated_it);
+}
+
+TYPED_TEST(BitVectorTest, header_crc) {
+ typename TestFixture::bit_vector_t bit_vector;
+
+ bufferlist header;
+ bit_vector.encode_header(header);
+
+ bufferlist footer;
+ bit_vector.encode_footer(footer);
+
+ bufferlist::iterator it = footer.begin();
+ bit_vector.decode_footer(it);
+
+ bit_vector.resize(1);
+ bit_vector.encode_header(header);
+
+ it = footer.begin();
+ ASSERT_THROW(bit_vector.decode_footer(it), buffer::malformed_input);
+}
+
+TYPED_TEST(BitVectorTest, data_crc) {
+ typename TestFixture::bit_vector_t bit_vector;
+
+ uint64_t elements_per_byte = 8 / bit_vector.BIT_COUNT;
+ bit_vector.resize((CEPH_PAGE_SIZE + 1) * elements_per_byte);
+
+ uint64_t byte_offset;
+ uint64_t byte_length;
+ bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+
+ bufferlist data;
+ bit_vector.encode_data(data, byte_offset, byte_length);
+
+ bufferlist::iterator data_it = data.begin();
+ bit_vector.decode_data(data_it, byte_offset);
+
+ bit_vector[bit_vector.size() - 1] = 1;
+
+ bufferlist dummy_data;
+ bit_vector.encode_data(dummy_data, byte_offset, byte_length);
+
+ data_it = data.begin();
+ ASSERT_THROW(bit_vector.decode_data(data_it, byte_offset),
+ buffer::malformed_input);
+}