}
template <typename I>
-bool ObjectMap<I>::update_required(uint64_t object_no, uint8_t new_state) {
+bool ObjectMap<I>::update_required(const ceph::BitVector<2>::Iterator& it,
+ uint8_t new_state) {
assert(m_image_ctx.object_map_lock.is_wlocked());
- uint8_t state = (*this)[object_no];
-
+ uint8_t state = *it;
if ((state == new_state) ||
(new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
(new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING)) {
BlockGuardCell *cell;
int r = m_update_guard->detain({op.start_object_no, op.end_object_no},
- &op, &cell);
+ &op, &cell);
if (r < 0) {
lderr(cct) << "failed to detain object map update: " << cpp_strerror(r)
<< dendl;
return;
}
- uint64_t object_no;
- for (object_no = start_object_no; object_no < end_object_no; ++object_no) {
- if (update_required(object_no, new_state)) {
+ auto it = m_object_map.begin() + start_object_no;
+ auto end_it = m_object_map.begin() + end_object_no;
+ for (; it != end_it; ++it) {
+ if (update_required(it, new_state)) {
break;
}
}
- if (object_no == end_object_no) {
+ if (it == end_it) {
ldout(cct, 20) << "object map update not required" << dendl;
m_image_ctx.op_work_queue->queue(on_finish, 0);
return;
const ZTracer::Trace &parent_trace, T *callback_object) {
assert(start_object_no < end_object_no);
if (snap_id == CEPH_NOSNAP) {
- uint64_t object_no;
- for (object_no = start_object_no; object_no < end_object_no;
- ++object_no) {
- if (update_required(object_no, new_state)) {
+ auto it = m_object_map.begin() + start_object_no;
+ auto end_it = m_object_map.begin() + end_object_no;
+ for (; it != end_it; ++it) {
+ if (update_required(it, new_state)) {
break;
}
}
- if (object_no == end_object_no) {
+ if (it == end_it) {
return false;
}
uint64_t end_object_no, uint8_t new_state,
const boost::optional<uint8_t> ¤t_state,
const ZTracer::Trace &parent_trace, Context *on_finish);
- bool update_required(uint64_t object_no, uint8_t new_state);
+ bool update_required(const ceph::BitVector<2>::Iterator &it,
+ uint8_t new_state);
};
#include "common/dout.h"
#include "librbd/ImageCtx.h"
#include "librbd/ObjectMap.h"
+#include "librbd/Utils.h"
#include "cls/lock/cls_lock_client.h"
#include <string>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
-#define dout_prefix *_dout << "librbd::object_map::UpdateRequest: "
+#define dout_prefix *_dout << "librbd::object_map::UpdateRequest: " << this \
+ << " " << __func__ << ": "
namespace librbd {
namespace object_map {
+namespace {
+
+// keep aligned to bit_vector 4K block sizes
+const uint64_t MAX_OBJECTS_PER_UPDATE = 256 * (1 << 10);
+
+}
+
template <typename I>
void UpdateRequest<I>::send() {
+ update_object_map();
+}
+
+template <typename I>
+void UpdateRequest<I>::update_object_map() {
assert(m_image_ctx.snap_lock.is_locked());
assert(m_image_ctx.object_map_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
- // safe to update in-memory state first without handling rollback since any
- // failures will invalidate the object map
+ // break very large requests into manageable batches
+ m_update_end_object_no = MIN(
+ m_end_object_no, m_update_start_object_no + MAX_OBJECTS_PER_UPDATE);
+
std::string oid(ObjectMap<>::object_map_name(m_image_ctx.id, m_snap_id));
- ldout(cct, 20) << this << " updating object map"
- << ": ictx=" << &m_image_ctx << ", oid=" << oid << ", ["
- << m_start_object_no << "," << m_end_object_no << ") = "
+ ldout(cct, 20) << "ictx=" << &m_image_ctx << ", oid=" << oid << ", "
+ << "[" << m_update_start_object_no << ","
+ << m_update_end_object_no << ") = "
<< (m_current_state ?
stringify(static_cast<uint32_t>(*m_current_state)) : "")
<< "->" << static_cast<uint32_t>(m_new_state)
if (m_snap_id == CEPH_NOSNAP) {
rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
}
- cls_client::object_map_update(&op, m_start_object_no, m_end_object_no,
- m_new_state, m_current_state);
+ cls_client::object_map_update(&op, m_update_start_object_no,
+ m_update_end_object_no, m_new_state,
+ m_current_state);
- librados::AioCompletion *rados_completion = create_callback_completion();
+ auto rados_completion = librbd::util::create_rados_callback<
+ UpdateRequest<I>, &UpdateRequest<I>::handle_update_object_map>(this);
std::vector<librados::snap_t> snaps;
int r = m_image_ctx.md_ctx.aio_operate(
oid, rados_completion, &op, 0, snaps,
}
template <typename I>
-void UpdateRequest<I>::finish_request() {
- RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
- RWLock::WLocker object_map_locker(m_image_ctx.object_map_lock);
- ldout(m_image_ctx.cct, 20) << this << " on-disk object map updated"
- << dendl;
+void UpdateRequest<I>::handle_update_object_map(int r) {
+ ldout(m_image_ctx.cct, 20) << "r=" << r << dendl;
+
+ {
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ RWLock::WLocker object_map_locker(m_image_ctx.object_map_lock);
+ update_in_memory_object_map();
+
+ if (m_update_end_object_no < m_end_object_no) {
+ m_update_start_object_no = m_update_end_object_no;
+ update_object_map();
+ return;
+ }
+ }
+
+ // no more batch updates to send
+ complete(r);
+}
+
+template <typename I>
+void UpdateRequest<I>::update_in_memory_object_map() {
+ assert(m_image_ctx.snap_lock.is_locked());
+ assert(m_image_ctx.object_map_lock.is_locked());
// rebuilding the object map might update on-disk only
if (m_snap_id == m_image_ctx.snap_id) {
- for (uint64_t object_no = m_start_object_no;
- object_no < MIN(m_end_object_no, m_object_map.size());
- ++object_no) {
- uint8_t state = m_object_map[object_no];
+ ldout(m_image_ctx.cct, 20) << dendl;
+
+ auto it = m_object_map.begin() +
+ MIN(m_update_start_object_no, m_object_map.size());
+ auto end_it = m_object_map.begin() +
+ MIN(m_update_end_object_no, m_object_map.size());
+ for (; it != end_it; ++it) {
+ auto state_ref = *it;
+ uint8_t state = state_ref;
if (!m_current_state || state == *m_current_state ||
(*m_current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN)) {
- m_object_map[object_no] = m_new_state;
+ state_ref = m_new_state;
}
}
}
}
+template <typename I>
+void UpdateRequest<I>::finish_request() {
+}
+
} // namespace object_map
} // namespace librbd
const ZTracer::Trace &parent_trace, Context *on_finish)
: Request(image_ctx, snap_id, on_finish), m_object_map(*object_map),
m_start_object_no(start_object_no), m_end_object_no(end_object_no),
- m_new_state(new_state), m_current_state(current_state),
+ m_update_start_object_no(start_object_no), m_new_state(new_state),
+ m_current_state(current_state),
m_trace(util::create_trace(image_ctx, "update object map", parent_trace))
{
m_trace.event("start");
void finish_request() override;
private:
+ /**
+ * @verbatim
+ *
+ * <start>
+ * |
+ * |/------------------\
+ * v | (repeat in batches)
+ * UPDATE_OBJECT_MAP -----/
+ * |
+ * v
+ * <finish>
+ *
+ * @endverbatim
+ */
+
ceph::BitVector<2> &m_object_map;
uint64_t m_start_object_no;
uint64_t m_end_object_no;
+ uint64_t m_update_start_object_no;
+ uint64_t m_update_end_object_no = 0;
uint8_t m_new_state;
boost::optional<uint8_t> m_current_state;
ZTracer::Trace m_trace;
+
+ void update_object_map();
+ void handle_update_object_map(int r);
+
+ void update_in_memory_object_map();
+
};
} // namespace object_map
using ::testing::_;
using ::testing::DoDefault;
+using ::testing::InSequence;
using ::testing::Return;
using ::testing::StrEq;
class TestMockObjectMapUpdateRequest : public TestMockFixture {
public:
- void expect_update(librbd::ImageCtx *ictx, uint64_t snap_id, int r) {
+ void expect_update(librbd::ImageCtx *ictx, uint64_t snap_id,
+ uint64_t start_object_no, uint64_t end_object_no,
+ uint8_t new_state,
+ const boost::optional<uint8_t>& current_state, int r) {
+ bufferlist bl;
+ ::encode(start_object_no, bl);
+ ::encode(end_object_no, bl);
+ ::encode(new_state, bl);
+ ::encode(current_state, bl);
+
std::string oid(ObjectMap<>::object_map_name(ictx->id, snap_id));
if (snap_id == CEPH_NOSNAP) {
EXPECT_CALL(get_mock_io_ctx(ictx->md_ctx),
if (r < 0) {
EXPECT_CALL(get_mock_io_ctx(ictx->md_ctx),
- exec(oid, _, StrEq("rbd"), StrEq("object_map_update"), _, _, _))
+ exec(oid, _, StrEq("rbd"), StrEq("object_map_update"),
+ ContentsEqual(bl), _, _))
.WillOnce(Return(r));
} else {
EXPECT_CALL(get_mock_io_ctx(ictx->md_ctx),
- exec(oid, _, StrEq("rbd"), StrEq("object_map_update"), _, _, _))
+ exec(oid, _, StrEq("rbd"), StrEq("object_map_update"),
+ ContentsEqual(bl), _, _))
.WillOnce(DoDefault());
}
}
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, acquire_exclusive_lock(*ictx));
- expect_update(ictx, CEPH_NOSNAP, 0);
+ expect_update(ictx, CEPH_NOSNAP, 0, 1, OBJECT_NONEXISTENT, OBJECT_EXISTS, 0);
ceph::BitVector<2> object_map;
object_map.resize(1);
"snap1"));
uint64_t snap_id = ictx->snap_id;
- expect_update(ictx, snap_id, 0);
+ expect_update(ictx, snap_id, 0, 1, OBJECT_NONEXISTENT, OBJECT_EXISTS, 0);
ceph::BitVector<2> object_map;
object_map.resize(1);
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, acquire_exclusive_lock(*ictx));
- expect_update(ictx, CEPH_NOSNAP, -EINVAL);
+ expect_update(ictx, CEPH_NOSNAP, 0, 1, OBJECT_NONEXISTENT, OBJECT_EXISTS,
+ -EINVAL);
expect_invalidate(ictx);
ceph::BitVector<2> object_map;
ASSERT_EQ(CEPH_NOSNAP, ictx->snap_id);
uint64_t snap_id = ictx->snap_info.rbegin()->first;
- expect_update(ictx, snap_id, 0);
+ expect_update(ictx, snap_id, 0, 1, OBJECT_EXISTS_CLEAN,
+ boost::optional<uint8_t>(), 0);
expect_unlock_exclusive_lock(*ictx);
ceph::BitVector<2> object_map;
ASSERT_NE(OBJECT_EXISTS_CLEAN, object_map[0]);
}
+TEST_F(TestMockObjectMapUpdateRequest, BatchUpdate) {
+ REQUIRE_FEATURE(RBD_FEATURE_OBJECT_MAP);
+
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ librbd::NoOpProgressContext no_progress;
+ ASSERT_EQ(0, ictx->operations->resize(712312 * ictx->get_object_size(), false,
+ no_progress));
+ ASSERT_EQ(0, acquire_exclusive_lock(*ictx));
+
+ InSequence seq;
+ expect_update(ictx, CEPH_NOSNAP, 0, 262144, OBJECT_NONEXISTENT, OBJECT_EXISTS,
+ 0);
+ expect_update(ictx, CEPH_NOSNAP, 262144, 524288, OBJECT_NONEXISTENT,
+ OBJECT_EXISTS, 0);
+ expect_update(ictx, CEPH_NOSNAP, 524288, 712312, OBJECT_NONEXISTENT,
+ OBJECT_EXISTS, 0);
+ expect_unlock_exclusive_lock(*ictx);
+
+ ceph::BitVector<2> object_map;
+ object_map.resize(712312);
+
+ C_SaferCond cond_ctx;
+ AsyncRequest<> *req = new UpdateRequest<>(
+ *ictx, &object_map, CEPH_NOSNAP, 0, object_map.size(), OBJECT_NONEXISTENT,
+ OBJECT_EXISTS, {}, &cond_ctx);
+ {
+ RWLock::RLocker snap_locker(ictx->snap_lock);
+ RWLock::WLocker object_map_locker(ictx->object_map_lock);
+ req->send();
+ }
+ ASSERT_EQ(0, cond_ctx.wait());
+}
+
} // namespace object_map
} // namespace librbd