#include "librbd/AioCompletion.h"
#include "librbd/AioRequest.h"
+#include "librbd/AsyncObjectThrottle.h"
#include "librbd/CopyupRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
#include "librbd/ObjectMap.h"
#include <boost/bind.hpp>
+#include <boost/lambda/bind.hpp>
+#include <boost/lambda/construct.hpp>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
namespace librbd {
+namespace {
+
+class UpdateObjectMap : public C_AsyncObjectThrottle {
+public:
+ UpdateObjectMap(AsyncObjectThrottle &throttle, ImageCtx *image_ctx,
+ uint64_t object_no, const std::vector<uint64_t> *snap_ids,
+ size_t snap_id_idx)
+ : C_AsyncObjectThrottle(throttle), m_image_ctx(*image_ctx),
+ m_object_no(object_no), m_snap_ids(*snap_ids), m_snap_id_idx(snap_id_idx)
+ {
+ }
+
+ virtual int send() {
+ uint64_t snap_id = m_snap_ids[m_snap_id_idx];
+ if (snap_id == CEPH_NOSNAP) {
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ RWLock::WLocker object_map_locker(m_image_ctx.object_map_lock);
+ assert(m_image_ctx.image_watcher->is_lock_owner());
+ bool sent = m_image_ctx.object_map.aio_update(m_object_no, OBJECT_EXISTS,
+ boost::optional<uint8_t>(),
+ this);
+ return (sent ? 0 : 1);
+ }
+
+ uint8_t state = OBJECT_EXISTS;
+ if (m_image_ctx.test_features(RBD_FEATURE_DEEP_FLATTEN) &&
+ m_snap_id_idx + 1 < m_snap_ids.size()) {
+ state = OBJECT_EXISTS_CLEAN;
+ }
+
+ RWLock::RLocker object_map_locker(m_image_ctx.object_map_lock);
+ m_image_ctx.object_map.aio_update(snap_id, m_object_no, m_object_no + 1,
+ state, boost::optional<uint8_t>(), this);
+ return 0;
+ }
+
+private:
+ ImageCtx &m_image_ctx;
+ uint64_t m_object_no;
+ const std::vector<uint64_t> &m_snap_ids;
+ size_t m_snap_id_idx;
+};
+
+} // anonymous namespace
+
+
CopyupRequest::CopyupRequest(ImageCtx *ictx, const std::string &oid,
uint64_t objectno,
vector<pair<uint64_t,uint64_t> >& image_extents)
void CopyupRequest::complete(int r)
{
if (should_complete(r)) {
+ complete_requests(r);
delete this;
}
}
case STATE_OBJECT_MAP:
ldout(cct, 20) << "OBJECT_MAP" << dendl;
- if (r == 0) {
- return send_copyup();
- }
- break;
+ assert(r == 0);
+ return send_copyup();
case STATE_COPYUP:
// invoked via a finisher in librados, so thread safe
pending_copyups = m_pending_copyups.dec();
ldout(cct, 20) << "COPYUP (" << pending_copyups << " pending)"
<< dendl;
- if (pending_copyups == 0 || r < 0) {
+ if (r < 0) {
complete_requests(r);
- return (pending_copyups == 0);
}
- break;
+ return (pending_copyups == 0);
default:
lderr(cct) << "invalid state: " << m_state << dendl;
assert(false);
break;
}
-
- if (r < 0) {
- complete_requests(r);
- return true;
- }
- return false;
+ return (r < 0);
}
void CopyupRequest::remove_from_list()
}
bool CopyupRequest::send_object_map() {
- bool copyup = false;
{
RWLock::RLocker l(m_ictx->owner_lock);
- if (!m_ictx->object_map.enabled()) {
- copyup = true;
- } else if (!m_ictx->image_watcher->is_lock_owner()) {
- ldout(m_ictx->cct, 20) << "exclusive lock not held for copyup request"
- << dendl;
- assert(m_pending_requests.empty());
- return true;
- } else {
+ if (m_ictx->object_map.enabled()) {
+ if (!m_ictx->image_watcher->is_lock_owner()) {
+ ldout(m_ictx->cct, 20) << "exclusive lock not held for copyup request"
+ << dendl;
+ assert(m_pending_requests.empty());
+ return true;
+ }
+
+ RWLock::RLocker snap_locker(m_ictx->snap_lock);
RWLock::WLocker object_map_locker(m_ictx->object_map_lock);
- if (m_ictx->object_map[m_object_no] != OBJECT_EXISTS) {
- ldout(m_ictx->cct, 20) << __func__ << " " << this
- << ": oid " << m_oid
- << ", extents " << m_image_extents
- << dendl;
- m_state = STATE_OBJECT_MAP;
-
- Context *ctx = create_callback_context();
- bool sent = m_ictx->object_map.aio_update(m_object_no, OBJECT_EXISTS,
- boost::optional<uint8_t>(),
- ctx);
- assert(sent);
- } else {
- copyup = true;
+ if (m_ictx->object_map[m_object_no] != OBJECT_EXISTS ||
+ !m_ictx->snaps.empty()) {
+ m_snap_ids.push_back(CEPH_NOSNAP);
+ m_snap_ids.insert(m_snap_ids.end(), m_ictx->snaps.begin(),
+ m_ictx->snaps.end());
}
}
}
// avoid possible recursive lock attempts
- if (copyup) {
+ if (m_snap_ids.empty()) {
// no object map update required
return send_copyup();
+ } else {
+ // update object maps for HEAD and all existing snapshots
+ ldout(m_ictx->cct, 20) << __func__ << " " << this
+ << ": oid " << m_oid
+ << dendl;
+ m_state = STATE_OBJECT_MAP;
+
+ AsyncObjectThrottle::ContextFactory context_factory(
+ boost::lambda::bind(boost::lambda::new_ptr<UpdateObjectMap>(),
+ boost::lambda::_1, m_ictx, m_object_no, &m_snap_ids,
+ boost::lambda::_2));
+ AsyncObjectThrottle *throttle = new AsyncObjectThrottle(
+ NULL, context_factory, create_callback_context(), NULL, 0,
+ m_snap_ids.size());
+ throttle->start_ops(m_ictx->concurrent_management_ops);
}
return false;
}