return oid;
}
+ceph::BitVector<2u>::Reference ObjectMap::operator[](uint64_t object_no)
+{
+ assert(m_image_ctx.object_map_lock.is_wlocked());
+ assert(object_no < m_object_map.size());
+ return m_object_map[object_no];
+}
+
uint8_t ObjectMap::operator[](uint64_t object_no) const
{
assert(m_image_ctx.object_map_lock.is_locked());
}
}
+void ObjectMap::aio_save(Context *on_finish)
+{
+ assert(m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP));
+ assert(m_image_ctx.owner_lock.is_locked());
+ RWLock::RLocker object_map_locker(m_image_ctx.object_map_lock);
+
+ librados::ObjectWriteOperation op;
+ if (m_snap_id == CEPH_NOSNAP) {
+ rados::cls::lock::assert_locked(&op, RBD_LOCK_NAME, LOCK_EXCLUSIVE, "", "");
+ }
+ cls_client::object_map_save(&op, m_object_map);
+
+ std::string oid(object_map_name(m_image_ctx.id, m_snap_id));
+ librados::AioCompletion *comp = librados::Rados::aio_create_completion(
+ on_finish, NULL, rados_ctx_cb);
+
+ int r = m_image_ctx.md_ctx.aio_operate(oid, comp, &op);
+ assert(r == 0);
+}
+
void ObjectMap::aio_resize(uint64_t new_size, uint8_t default_object_state,
Context *on_finish) {
assert(m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP));
class C_VerifyObject : public C_AsyncObjectThrottle {
public:
C_VerifyObject(AsyncObjectThrottle &throttle, ImageCtx *image_ctx,
- uint64_t snap_id, uint64_t object_no)
+ uint64_t object_no)
: C_AsyncObjectThrottle(throttle), m_image_ctx(*image_ctx),
- m_snap_id(snap_id), m_object_no(object_no),
- m_oid(m_image_ctx.get_object_name(m_object_no))
+ m_object_no(object_no), m_oid(m_image_ctx.get_object_name(m_object_no))
{
}
virtual void complete(int r) {
if (should_complete(r)) {
- ldout(m_image_ctx.cct, 5) << " C_VerifyObject completed " << dendl;
+ ldout(m_image_ctx.cct, 20) << m_oid << " C_VerifyObject completed "
+ << dendl;
finish(r);
delete this;
}
}
private:
- /**
- * Verifying the object map for a single object follows the following state
- * machine:
- *
- * <start>
- * |
- * v
- * STATE_ASSERT_EXISTS --------> STATE_UPDATE_OBJECT_MAP
- * . |
- * . v
- * . . . . . . . . . . . . . . . . > <finish>
- *
- * The _UPDATE_OBJECT_MAP state is skipped if the object map does not
- * need to be updated.
- */
- enum State {
- STATE_ASSERT_EXISTS,
- STATE_UPDATE_OBJECT_MAP
- };
-
-
ImageCtx &m_image_ctx;
uint64_t m_snap_id;
uint64_t m_object_no;
std::string m_oid;
- State m_state;
-
bool should_complete(int r) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << m_oid << " C_VerifyObject::should_complete: " << " r=" << r
- << dendl;
-
- bool finished = false;
- switch (m_state) {
- case STATE_ASSERT_EXISTS:
- ldout(cct, 5) << "ASSERT_EXISTS" << dendl;
- if (r == 0 || r == -ENOENT) {
- return send_update_object_map(r == 0);
- }
- break;
-
- case STATE_UPDATE_OBJECT_MAP:
- ldout(cct, 5) << "UPDATE_OBJECT_MAP" << dendl;
- finished = true;
- break;
-
- default:
- assert(false);
- break;
- }
-
- if (r < 0) {
- lderr(cct) << "encountered an error: " << cpp_strerror(r) << dendl;
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << m_oid << " C_VerifyObject::should_complete: "
+ << "encountered an error: " << cpp_strerror(r) << dendl;
return true;
}
- return finished;
+
+ ldout(cct, 20) << m_oid << " C_VerifyObject::should_complete: " << " r="
+ << r << dendl;
+ return update_object_map(r == 0);
}
void send_assert_exists() {
- ldout(m_image_ctx.cct, 5) << m_oid << " C_VerifyObject::assert_exists"
+ ldout(m_image_ctx.cct, 5) << m_oid << " C_VerifyObject::send_assert_exists"
<< dendl;
- m_state = STATE_ASSERT_EXISTS;
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
this, NULL, rados_ctx_cb);
assert(r == 0);
}
- bool send_update_object_map(bool exists) {
+ bool update_object_map(bool exists) {
CephContext *cct = m_image_ctx.cct;
bool lost_exclusive_lock = false;
}
if (new_state != state) {
- ldout(cct, 5) << m_oid << " C_VerifyObject::send_update_object_map"
- << dendl;
- bool updating = m_image_ctx.object_map.aio_update(m_object_no,
- new_state, state,
- this);
- assert(updating);
- return false;
+ ldout(cct, 15) << m_oid << " C_VerifyObject::update_object_map "
+ << static_cast<uint32_t>(state) << "->"
+ << static_cast<uint32_t>(new_state) << dendl;
+ m_image_ctx.object_map[m_object_no] = new_state;
}
}
}
case STATE_VERIFY_OBJECTS:
ldout(cct, 5) << "VERIFY_OBJECTS" << dendl;
if (r == 0) {
- return send_update_header();
+ return send_save_object_map();
}
break;
+ case STATE_SAVE_OBJECT_MAP:
+ ldout(cct, 5) << "SAVE_OBJECT_MAP" << dendl;
+ if (r == 0) {
+ return send_update_header();
+ }
+ break;
case STATE_UPDATE_HEADER:
ldout(cct, 5) << "UPDATE_HEADER" << dendl;
if (r == 0) {
m_state = STATE_VERIFY_OBJECTS;
ldout(cct, 5) << this << " send_verify_objects" << dendl;
- uint64_t snap_id;
uint64_t num_objects;
{
RWLock::RLocker l(m_image_ctx.snap_lock);
- snap_id = m_image_ctx.snap_id;
+ uint64_t snap_id = m_image_ctx.snap_id;
num_objects = Striper::get_num_objects(m_image_ctx.layout,
m_image_ctx.get_image_size(snap_id));
}
AsyncObjectThrottle::ContextFactory context_factory(
boost::lambda::bind(boost::lambda::new_ptr<C_VerifyObject>(),
- boost::lambda::_1, &m_image_ctx, snap_id, boost::lambda::_2));
+ boost::lambda::_1, &m_image_ctx, boost::lambda::_2));
AsyncObjectThrottle *throttle = new AsyncObjectThrottle(
*this, context_factory, create_callback_context(), m_prog_ctx, 0,
num_objects);
throttle->start_ops(cct->_conf->rbd_concurrent_management_ops);
}
+bool RebuildObjectMapRequest::send_save_object_map() {
+ CephContext *cct = m_image_ctx.cct;
+ bool lost_exclusive_lock = false;
+
+ m_state = STATE_SAVE_OBJECT_MAP;
+ {
+ RWLock::RLocker l(m_image_ctx.owner_lock);
+ if (m_image_ctx.image_watcher->is_lock_supported() &&
+ !m_image_ctx.image_watcher->is_lock_owner()) {
+ ldout(cct, 1) << "lost exclusive lock during object map save" << dendl;
+ lost_exclusive_lock = true;
+ } else {
+ ldout(cct, 5) << this << " send_save_object_map" << dendl;
+ m_image_ctx.object_map.aio_save(create_callback_context());
+ return false;
+ }
+ }
+
+ if (lost_exclusive_lock) {
+ complete(-ERESTART);
+ return false;
+ }
+ return true;
+}
+
bool RebuildObjectMapRequest::send_update_header() {
CephContext *cct = m_image_ctx.cct;
bool lost_exclusive_lock = false;