return -EINVAL;
}
+ uint64_t object_byte_offset;
+ uint64_t byte_length;
+ object_map.get_header_crc_extents(&object_byte_offset, &byte_length);
+
bufferlist footer_bl;
- r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
- size - object_map.get_footer_offset(), &footer_bl,
+ r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (r < 0) {
- CLS_ERR("object map footer read failed");
+ CLS_ERR("object map footer read header CRC failed");
return r;
}
try {
bufferlist::iterator it = footer_bl.begin();
- object_map.decode_footer(it);
+ object_map.decode_header_crc(it);
} catch (const buffer::error &err) {
- CLS_ERR("failed to decode object map footer: %s", err.what());
+ CLS_ERR("failed to decode object map header CRC: %s", err.what());
}
if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
return -ERANGE;
}
- uint64_t byte_offset;
- uint64_t byte_length;
- object_map.get_data_extents(start_object_no,
- end_object_no - start_object_no,
- &byte_offset, &byte_length);
+ uint64_t object_count = end_object_no - start_object_no;
+ object_map.get_data_crcs_extents(start_object_no, object_count,
+ &object_byte_offset, &byte_length);
+ const auto footer_object_offset = object_byte_offset;
+
+ footer_bl.clear();
+ r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ if (r < 0) {
+ CLS_ERR("object map footer read data CRCs failed");
+ return r;
+ }
+
+ try {
+ bufferlist::iterator it = footer_bl.begin();
+ object_map.decode_data_crcs(it, start_object_no);
+ } catch (const buffer::error &err) {
+ CLS_ERR("failed to decode object map data CRCs: %s", err.what());
+ }
+
+ uint64_t data_byte_offset;
+ object_map.get_data_extents(start_object_no, object_count,
+ &data_byte_offset, &object_byte_offset,
+ &byte_length);
bufferlist data_bl;
- r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
- byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+ r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &data_bl,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (r < 0) {
CLS_ERR("object map data read failed");
return r;
try {
bufferlist::iterator it = data_bl.begin();
- object_map.decode_data(it, byte_offset);
+ object_map.decode_data(it, data_byte_offset);
} catch (const buffer::error &err) {
CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
- byte_offset, err.what());
+ data_byte_offset, err.what());
return -EINVAL;
}
if (updated) {
CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
- byte_offset, byte_length,
- object_map.get_header_length() + byte_offset);
+ data_byte_offset, byte_length, object_byte_offset);
bufferlist data_bl;
- object_map.encode_data(data_bl, byte_offset, byte_length);
- r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
- data_bl.length(), &data_bl,
+ object_map.encode_data(data_bl, data_byte_offset, byte_length);
+ r = cls_cxx_write2(hctx, object_byte_offset, data_bl.length(), &data_bl,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (r < 0) {
CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
}
footer_bl.clear();
- object_map.encode_footer(footer_bl);
- r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
+ object_map.encode_data_crcs(footer_bl, start_object_no, object_count);
+ r = cls_cxx_write2(hctx, footer_object_offset, footer_bl.length(),
&footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
if (r < 0) {
CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
#include "include/encoding.h"
#include "include/rbd_types.h"
#include "include/rados/librados.hpp"
+#include "common/bit_vector.hpp"
#include <errno.h>
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_LIBRBD_CLS_RBD_CLIENT_H
#include "cls/lock/cls_lock_types.h"
#include "cls/rbd/cls_rbd_types.h"
-#include "common/bit_vector.hpp"
#include "common/snap_types.h"
#include "include/types.h"
#include "librbd/Types.h"
class Context;
+namespace ceph { template <uint8_t> class BitVector; }
namespace librados {
class ObjectReadOperation;
class IoCtx;
#include "common/Formatter.h"
#include "include/assert.h"
#include "include/encoding.h"
+#include <memory>
#include <utility>
+#include <vector>
namespace ceph {
void decode_header(bufferlist::iterator& it);
uint64_t get_header_length() const;
- void encode_data(bufferlist& bl, uint64_t byte_offset,
+ void encode_data(bufferlist& bl, uint64_t data_byte_offset,
uint64_t byte_length) const;
void decode_data(bufferlist::iterator& it, uint64_t byte_offset);
void get_data_extents(uint64_t offset, uint64_t length,
- uint64_t *byte_offset, uint64_t *byte_length) const;
+ uint64_t *data_byte_offset,
+ uint64_t *object_byte_offset,
+ uint64_t *byte_length) const;
void encode_footer(bufferlist& bl) const;
void decode_footer(bufferlist::iterator& it);
uint64_t get_footer_offset() const;
+ void decode_header_crc(bufferlist::iterator& it);
+ void get_header_crc_extents(uint64_t *byte_offset,
+ uint64_t *byte_length) const;
+
+ void encode_data_crcs(bufferlist& bl, uint64_t offset,
+ uint64_t length) const;
+ void decode_data_crcs(bufferlist::iterator& it, uint64_t offset);
+ void get_data_crcs_extents(uint64_t offset, uint64_t length,
+ uint64_t *byte_offset,
+ uint64_t *byte_length) const;
+
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& it);
void dump(Formatter *f) const;
static void generate_test_instances(std::list<BitVector *> &o);
private:
+ struct NoInitAllocator : public std::allocator<__u32> {
+ NoInitAllocator(const std::allocator<__u32>& alloc)
+ : std::allocator<__u32>(alloc) {
+ }
+
+ template <class U, class... Args>
+ void construct(U* p, Args&&... args) const {
+ }
+ };
bufferlist m_data;
uint64_t m_size;
bool m_crc_enabled;
mutable __u32 m_header_crc;
- mutable std::vector<__u32> m_data_crcs;
+ mutable std::vector<__u32, NoInitAllocator> m_data_crcs;
+
+ void resize(uint64_t elements, bool zero);
static void compute_index(uint64_t offset, uint64_t *index, uint64_t *shift);
template <uint8_t _b>
void BitVector<_b>::resize(uint64_t size) {
+ resize(size, true);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::resize(uint64_t size, bool zero) {
uint64_t buffer_size = (size + ELEMENTS_PER_BLOCK - 1) / ELEMENTS_PER_BLOCK;
if (buffer_size > m_data.length()) {
- m_data.append_zero(buffer_size - m_data.length());
+ if (zero) {
+ m_data.append_zero(buffer_size - m_data.length());
+ } else {
+ m_data.append(std::move(buffer::ptr(buffer_size - m_data.length())));
+ }
} else if (buffer_size < m_data.length()) {
bufferlist bl;
bl.substr_of(m_data, 0, buffer_size);
decode(size, header_it);
DECODE_FINISH(header_it);
- resize(size);
+ resize(size, false);
m_header_crc = header_bl.crc32c(0);
}
}
template <uint8_t _b>
-void BitVector<_b>::encode_data(bufferlist& bl, uint64_t byte_offset,
+void BitVector<_b>::encode_data(bufferlist& bl, uint64_t data_byte_offset,
uint64_t byte_length) const {
- assert(byte_offset % BLOCK_SIZE == 0);
- assert(byte_offset + byte_length == m_data.length() ||
- byte_length % BLOCK_SIZE == 0);
+ assert(data_byte_offset % BLOCK_SIZE == 0);
+ assert(data_byte_offset + byte_length == m_data.length() ||
+ byte_length % BLOCK_SIZE == 0);
- uint64_t end_offset = byte_offset + byte_length;
- while (byte_offset < end_offset) {
- uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - byte_offset);
+ uint64_t end_offset = data_byte_offset + byte_length;
+ while (data_byte_offset < end_offset) {
+ uint64_t len = std::min<uint64_t>(BLOCK_SIZE,
+ end_offset - data_byte_offset);
bufferlist bit;
- bit.substr_of(m_data, byte_offset, len);
- m_data_crcs[byte_offset / BLOCK_SIZE] = bit.crc32c(0);
+ bit.substr_of(m_data, data_byte_offset, len);
+ m_data_crcs[data_byte_offset / BLOCK_SIZE] = bit.crc32c(0);
bl.claim_append(bit);
- byte_offset += BLOCK_SIZE;
+ data_byte_offset += BLOCK_SIZE;
}
}
template <uint8_t _b>
-void BitVector<_b>::decode_data(bufferlist::iterator& it, uint64_t byte_offset) {
- assert(byte_offset % BLOCK_SIZE == 0);
+void BitVector<_b>::decode_data(bufferlist::iterator& it,
+ uint64_t data_byte_offset) {
+ assert(data_byte_offset % BLOCK_SIZE == 0);
if (it.end()) {
return;
}
- uint64_t end_offset = byte_offset + it.get_remaining();
+ uint64_t end_offset = data_byte_offset + it.get_remaining();
if (end_offset > m_data.length()) {
throw buffer::end_of_buffer();
}
bufferlist data;
- if (byte_offset > 0) {
- data.substr_of(m_data, 0, byte_offset);
+ if (data_byte_offset > 0) {
+ data.substr_of(m_data, 0, data_byte_offset);
}
- while (byte_offset < end_offset) {
- uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - byte_offset);
+ while (data_byte_offset < end_offset) {
+ uint64_t len = std::min<uint64_t>(BLOCK_SIZE, end_offset - data_byte_offset);
bufferptr ptr;
it.copy_deep(len, ptr);
bufferlist bit;
bit.append(ptr);
if (m_crc_enabled &&
- m_data_crcs[byte_offset / BLOCK_SIZE] != bit.crc32c(0)) {
+ m_data_crcs[data_byte_offset / BLOCK_SIZE] != bit.crc32c(0)) {
throw buffer::malformed_input("invalid data block CRC");
}
data.append(bit);
- byte_offset += bit.length();
+ data_byte_offset += bit.length();
}
if (m_data.length() > end_offset) {
template <uint8_t _b>
void BitVector<_b>::get_data_extents(uint64_t offset, uint64_t length,
- uint64_t *byte_offset,
- uint64_t *byte_length) const {
+ uint64_t *data_byte_offset,
+ uint64_t *object_byte_offset,
+ uint64_t *byte_length) const {
// read BLOCK_SIZE-aligned chunks
assert(length > 0 && offset + length <= m_size);
uint64_t shift;
- compute_index(offset, byte_offset, &shift);
- *byte_offset -= (*byte_offset % BLOCK_SIZE);
+ compute_index(offset, data_byte_offset, &shift);
+ *data_byte_offset -= (*data_byte_offset % BLOCK_SIZE);
uint64_t end_offset;
compute_index(offset + length - 1, &end_offset, &shift);
end_offset += (BLOCK_SIZE - (end_offset % BLOCK_SIZE));
- assert(*byte_offset <= end_offset);
+ assert(*data_byte_offset <= end_offset);
- *byte_length = end_offset - *byte_offset;
- if (*byte_offset + *byte_length > m_data.length()) {
- *byte_length = m_data.length() - *byte_offset;
+ *object_byte_offset = get_header_length() + *data_byte_offset;
+ *byte_length = end_offset - *data_byte_offset;
+ if (*data_byte_offset + *byte_length > m_data.length()) {
+ *byte_length = m_data.length() - *data_byte_offset;
}
}
bufferlist footer_bl;
if (m_crc_enabled) {
encode(m_header_crc, footer_bl);
- encode(m_data_crcs, footer_bl);
+
+ __u32 size = m_data_crcs.size();
+ encode(size, footer_bl);
+ encode_data_crcs(footer_bl, 0, m_size);
}
encode(footer_bl, bl);
}
m_crc_enabled = (footer_bl.length() > 0);
if (m_crc_enabled) {
bufferlist::iterator footer_it = footer_bl.begin();
+ decode_header_crc(footer_it);
- __u32 header_crc;
- decode(header_crc, footer_it);
- if (m_header_crc != header_crc) {
- throw buffer::malformed_input("incorrect header CRC");
- }
+ __u32 data_src_size;
+ decode(data_src_size, footer_it);
+ decode_data_crcs(footer_it, 0);
uint64_t block_count = (m_data.length() + BLOCK_SIZE - 1) / BLOCK_SIZE;
- decode(m_data_crcs, footer_it);
if (m_data_crcs.size() != block_count) {
throw buffer::malformed_input("invalid data block CRCs");
}
return get_header_length() + m_data.length();
}
+template <uint8_t _b>
+void BitVector<_b>::decode_header_crc(bufferlist::iterator& it) {
+ if (it.get_remaining() > 0) {
+ __u32 header_crc;
+ ceph::decode(header_crc, it);
+ if (m_header_crc != header_crc) {
+ throw buffer::malformed_input("incorrect header CRC");
+ }
+ }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::get_header_crc_extents(uint64_t *byte_offset,
+ uint64_t *byte_length) const {
+ // footer is prefixed with a bufferlist length
+ *byte_offset = get_footer_offset() + sizeof(__u32);
+ *byte_length = sizeof(__u32);
+}
+
+template <uint8_t _b>
+void BitVector<_b>::encode_data_crcs(bufferlist& bl, uint64_t offset,
+ uint64_t length) const {
+ if (length == 0) {
+ return;
+ }
+
+ uint64_t index;
+ uint64_t shift;
+ compute_index(offset, &index, &shift);
+ uint64_t crc_index = index / BLOCK_SIZE;
+
+ compute_index(offset + length - 1, &index, &shift);
+ uint64_t end_crc_index = index / BLOCK_SIZE;
+ while (crc_index <= end_crc_index) {
+ __u32 crc = m_data_crcs[crc_index++];
+ ceph::encode(crc, bl);
+ }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::decode_data_crcs(bufferlist::iterator& it,
+ uint64_t offset) {
+ if (it.end()) {
+ return;
+ }
+
+ uint64_t index;
+ uint64_t shift;
+ compute_index(offset, &index, &shift);
+
+ uint64_t crc_index = index / BLOCK_SIZE;
+ uint64_t remaining = it.get_remaining() / sizeof(__u32);
+ while (remaining > 0) {
+ __u32 crc;
+ ceph::decode(crc, it);
+ m_data_crcs[crc_index++] = crc;
+ --remaining;
+ }
+}
+
+template <uint8_t _b>
+void BitVector<_b>::get_data_crcs_extents(uint64_t offset, uint64_t length,
+ uint64_t *byte_offset,
+ uint64_t *byte_length) const {
+ // data CRCs immediately follow the header CRC
+ get_header_crc_extents(byte_offset, byte_length);
+ *byte_offset += *byte_length;
+
+ // skip past data CRC vector size
+ *byte_offset += sizeof(__u32);
+
+ // CRCs are computed over BLOCK_SIZE chunks
+ ceph_assert(length > 0 && offset + length <= m_size);
+ uint64_t index;
+ uint64_t shift;
+ compute_index(offset, &index, &shift);
+ uint64_t start_byte_offset =
+ *byte_offset + ((index / BLOCK_SIZE) * sizeof(__u32));
+
+ compute_index(offset + length, &index, &shift);
+ uint64_t end_byte_offset =
+ *byte_offset + (((index / BLOCK_SIZE) + 1) * sizeof(__u32));
+ ceph_assert(start_byte_offset < end_byte_offset);
+
+ *byte_offset = start_byte_offset;
+ *byte_length = end_byte_offset - start_byte_offset;
+}
+
template <uint8_t _b>
void BitVector<_b>::encode(bufferlist& bl) const {
encode_header(bl);
const uint64_t radix = 1 << b->BIT_COUNT;
const uint64_t size = 1024;
- b->resize(size);
+ b->resize(size, false);
for (uint64_t i = 0; i < size; ++i) {
(*b)[i] = rand() % radix;
}
#include "common/config.h"
#include "common/snap_types.h"
#include "common/Clock.h"
+#include "common/bit_vector.hpp"
#include "include/encoding.h"
#include "include/types.h"
#include "include/rados/librados.h"
uint64_t offset = (bit_vector.BLOCK_SIZE + 11) * elements_per_byte;
uint64_t length = (bit_vector.BLOCK_SIZE + 31) * elements_per_byte;
- uint64_t byte_offset;
+ uint64_t data_byte_offset;
+ uint64_t object_byte_offset;
uint64_t byte_length;
- bit_vector.get_data_extents(offset, length, &byte_offset, &byte_length);
- ASSERT_EQ(bit_vector.BLOCK_SIZE, byte_offset);
+ bit_vector.get_data_extents(offset, length, &data_byte_offset,
+ &object_byte_offset, &byte_length);
+ ASSERT_EQ(bit_vector.BLOCK_SIZE, data_byte_offset);
ASSERT_EQ(bit_vector.BLOCK_SIZE + (element_count % bit_vector.BLOCK_SIZE),
byte_length);
- bit_vector.get_data_extents(1, 1, &byte_offset, &byte_length);
- ASSERT_EQ(0U, byte_offset);
+ bit_vector.get_data_extents(1, 1, &data_byte_offset, &object_byte_offset,
+ &byte_length);
+ ASSERT_EQ(0U, data_byte_offset);
+ ASSERT_EQ(bit_vector.get_header_length(), object_byte_offset);
ASSERT_EQ(bit_vector.BLOCK_SIZE, byte_length);
}
bit_vector.resize(5111);
- uint64_t byte_offset;
+ uint64_t data_byte_offset;
+ uint64_t object_byte_offset;
uint64_t byte_length;
- bit_vector.get_data_extents(0, bit_vector.size(), &byte_offset, &byte_length);
+ bit_vector.get_data_extents(0, bit_vector.size(), &data_byte_offset,
+ &object_byte_offset, &byte_length);
ASSERT_EQ(bit_vector.get_header_length() + byte_length,
bit_vector.get_footer_offset());
bufferlist::iterator header_it = header_bl.begin();
bit_vector.decode_header(header_it);
- bufferlist footer_bl;
- footer_bl.substr_of(bl, bit_vector.get_footer_offset(),
- bl.length() - bit_vector.get_footer_offset());
- bufferlist::iterator footer_it = footer_bl.begin();
- bit_vector.decode_footer(footer_it);
+ uint64_t object_byte_offset;
+ uint64_t byte_length;
+ bit_vector.get_header_crc_extents(&object_byte_offset, &byte_length);
+ ASSERT_EQ(bit_vector.get_footer_offset() + 4, object_byte_offset);
+ ASSERT_EQ(4ULL, byte_length);
typedef std::pair<uint64_t, uint64_t> Extent;
typedef std::list<Extent> Extents;
std::make_pair((2 * bit_vector.BLOCK_SIZE * elements_per_byte) + 2, 2))(
std::make_pair(2, 2 * bit_vector.BLOCK_SIZE));
for (Extents::iterator it = extents.begin(); it != extents.end(); ++it) {
+ bufferlist footer_bl;
+ uint64_t footer_byte_offset;
+ uint64_t footer_byte_length;
+ bit_vector.get_data_crcs_extents(it->first, it->second, &footer_byte_offset,
+ &footer_byte_length);
+ ASSERT_TRUE(footer_byte_offset + footer_byte_length <= bl.length());
+ footer_bl.substr_of(bl, footer_byte_offset, footer_byte_length);
+ bufferlist::iterator footer_it = footer_bl.begin();
+ bit_vector.decode_data_crcs(footer_it, it->first);
+
uint64_t element_offset = it->first;
uint64_t element_length = it->second;
- uint64_t byte_offset;
- uint64_t byte_length;
- bit_vector.get_data_extents(element_offset, element_length, &byte_offset,
+ uint64_t data_byte_offset;
+ bit_vector.get_data_extents(element_offset, element_length,
+ &data_byte_offset, &object_byte_offset,
&byte_length);
bufferlist data_bl;
- data_bl.substr_of(bl, bit_vector.get_header_length() + byte_offset,
+ data_bl.substr_of(bl, bit_vector.get_header_length() + data_byte_offset,
byte_length);
bufferlist::iterator data_it = data_bl.begin();
- bit_vector.decode_data(data_it, byte_offset);
+ bit_vector.decode_data(data_it, data_byte_offset);
data_bl.clear();
- bit_vector.encode_data(data_bl, byte_offset, byte_length);
+ bit_vector.encode_data(data_bl, data_byte_offset, byte_length);
footer_bl.clear();
- bit_vector.encode_footer(footer_bl);
+ bit_vector.encode_data_crcs(footer_bl, it->first, it->second);
bufferlist updated_bl;
- updated_bl.substr_of(bl, 0, bit_vector.get_header_length() + byte_offset);
+ updated_bl.substr_of(bl, 0,
+ bit_vector.get_header_length() + data_byte_offset);
updated_bl.append(data_bl);
- if (byte_offset + byte_length < bit_vector.get_footer_offset()) {
- uint64_t tail_data_offset = bit_vector.get_header_length() + byte_offset +
- byte_length;
+ if (data_byte_offset + byte_length < bit_vector.get_footer_offset()) {
+ uint64_t tail_data_offset = bit_vector.get_header_length() +
+ data_byte_offset + byte_length;
data_bl.substr_of(bl, tail_data_offset,
bit_vector.get_footer_offset() - tail_data_offset);
updated_bl.append(data_bl);
}
- updated_bl.append(footer_bl);
+ bufferlist full_footer;
+ full_footer.substr_of(bl, bit_vector.get_footer_offset(),
+ footer_byte_offset - bit_vector.get_footer_offset());
+ full_footer.append(footer_bl);
+
+ if (footer_byte_offset + footer_byte_length < bl.length()) {
+ bufferlist footer_bit;
+ auto footer_offset = footer_byte_offset + footer_byte_length;
+ footer_bit.substr_of(bl, footer_offset, bl.length() - footer_offset);
+ full_footer.append(footer_bit);
+ }
+
+ updated_bl.append(full_footer);
ASSERT_EQ(bl, updated_bl);
bufferlist::iterator updated_it = updated_bl.begin();
bit_vector1.resize((bit_vector1.BLOCK_SIZE + 1) * elements_per_byte);
bit_vector2.resize((bit_vector2.BLOCK_SIZE + 1) * elements_per_byte);
- uint64_t byte_offset;
+ uint64_t data_byte_offset;
+ uint64_t object_byte_offset;
uint64_t byte_length;
- bit_vector1.get_data_extents(0, bit_vector1.size(), &byte_offset,
- &byte_length);
+ bit_vector1.get_data_extents(0, bit_vector1.size(), &data_byte_offset,
+ &object_byte_offset, &byte_length);
bufferlist data;
- bit_vector1.encode_data(data, byte_offset, byte_length);
+ bit_vector1.encode_data(data, data_byte_offset, byte_length);
bufferlist::iterator data_it = data.begin();
- bit_vector1.decode_data(data_it, byte_offset);
+ bit_vector1.decode_data(data_it, data_byte_offset);
bit_vector2[bit_vector2.size() - 1] = 1;
bufferlist dummy_data;
- bit_vector2.encode_data(dummy_data, byte_offset, byte_length);
+ bit_vector2.encode_data(dummy_data, data_byte_offset, byte_length);
data_it = data.begin();
- ASSERT_THROW(bit_vector2.decode_data(data_it, byte_offset),
+ ASSERT_THROW(bit_vector2.decode_data(data_it, data_byte_offset),
buffer::malformed_input);
}
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
+#include "common/Cond.h"
+#include "common/Throttle.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "cls/rbd/cls_rbd_types.h"
#include <list>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/rolling_sum.hpp>
void register_test_object_map() {
}
&flags_set));
ASSERT_TRUE(flags_set);
}
+
+TEST_F(TestObjectMap, DISABLED_StressTest) {
+ REQUIRE_FEATURE(RBD_FEATURE_OBJECT_MAP);
+
+ uint64_t object_count = cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT;
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+ ASSERT_EQ(0, resize(ictx, ictx->layout.object_size * object_count));
+
+ bool flags_set;
+ ASSERT_EQ(0, ictx->test_flags(CEPH_NOSNAP, RBD_FLAG_OBJECT_MAP_INVALID,
+ &flags_set));
+ ASSERT_FALSE(flags_set);
+
+ srand(time(NULL) % (unsigned long) -1);
+
+ coarse_mono_time start = coarse_mono_clock::now();
+ chrono::duration<double> last = chrono::duration<double>::zero();
+
+ const int WINDOW_SIZE = 5;
+ typedef boost::accumulators::accumulator_set<
+ double, boost::accumulators::stats<
+ boost::accumulators::tag::rolling_sum> > RollingSum;
+
+ RollingSum time_acc(
+ boost::accumulators::tag::rolling_window::window_size = WINDOW_SIZE);
+ RollingSum ios_acc(
+ boost::accumulators::tag::rolling_window::window_size = WINDOW_SIZE);
+
+ uint32_t io_threads = 16;
+ uint64_t cur_ios = 0;
+ SimpleThrottle throttle(io_threads, false);
+ for (uint64_t ios = 0; ios < 100000;) {
+ if (throttle.pending_error()) {
+ break;
+ }
+
+ throttle.start_op();
+ uint64_t object_no = (rand() % object_count);
+ auto ctx = new FunctionContext([&throttle, object_no](int r) {
+ ASSERT_EQ(0, r) << "object_no=" << object_no;
+ throttle.end_op(r);
+ });
+
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ RWLock::RLocker snap_locker(ictx->snap_lock);
+ RWLock::WLocker object_map_locker(ictx->object_map_lock);
+ ASSERT_TRUE(ictx->object_map != nullptr);
+
+ if (!ictx->object_map->aio_update<
+ Context, &Context::complete>(CEPH_NOSNAP, object_no,
+ OBJECT_EXISTS, {}, {}, true,
+ ctx)) {
+ ctx->complete(0);
+ } else {
+ ++cur_ios;
+ ++ios;
+ }
+
+ coarse_mono_time now = coarse_mono_clock::now();
+ chrono::duration<double> elapsed = now - start;
+ if (last == chrono::duration<double>::zero()) {
+ last = elapsed;
+ } else if ((int)elapsed.count() != (int)last.count()) {
+ time_acc((elapsed - last).count());
+ ios_acc(static_cast<double>(cur_ios));
+ cur_ios = 0;
+
+ double time_sum = boost::accumulators::rolling_sum(time_acc);
+ std::cerr << std::setw(5) << (int)elapsed.count() << "\t"
+ << std::setw(8) << (int)ios << "\t"
+ << std::fixed << std::setw(8) << std::setprecision(2)
+ << boost::accumulators::rolling_sum(ios_acc) / time_sum
+ << std::endl;
+ last = elapsed;
+ }
+ }
+
+ ASSERT_EQ(0, throttle.wait_for_ret());
+}