ldout(image_ctx.cct, 10) << "removing (with copyup) " << oid << dendl;
AioObjectRequest<> *req = new AioObjectTrim(&image_ctx, oid, m_object_no,
- m_snapc, this);
+ m_snapc, this, false);
+ // NOTE(review): new trailing bool — presumably suppresses the per-object
+ // post-remove object-map update here (TrimRequest batches the updates
+ // itself); the clean-boundary path passes true. Confirm against the
+ // AioObjectTrim constructor.
req->send();
return 0;
}
RWLock::RLocker owner_lock(image_ctx.owner_lock);
switch (m_state) {
+ case STATE_PRE_COPYUP:
+ // object map entries flagged OBJECT_PENDING; now issue the copyup writes
+ ldout(cct, 5) << " PRE_COPYUP" << dendl;
+ send_copyup_objects();
+ break;
+
case STATE_COPYUP_OBJECTS:
ldout(cct, 5) << " COPYUP_OBJECTS" << dendl;
+ // copyups complete; transition the object map entries before removal
+ send_post_copyup();
+ break;
+
+ case STATE_POST_COPYUP:
+ ldout(cct, 5) << " POST_COPYUP" << dendl;
send_pre_remove();
break;
template <typename I>
void TrimRequest<I>::send() {
- send_copyup_objects();
+ // state machine now starts with the pre-copyup object-map update step
+ send_pre_copyup();
}
-template <typename I>
+template<typename I>
void TrimRequest<I>::send_copyup_objects() {
I &image_ctx = this->m_image_ctx;
assert(image_ctx.owner_lock.is_locked());
- assert(image_ctx.exclusive_lock == nullptr ||
- image_ctx.exclusive_lock->is_lock_owner());
- if (m_delete_start >= m_num_objects) {
- send_clean_boundary();
- return;
- }
+ // Range and state are now computed by send_pre_copyup(); this method only
+ // issues the copyup writes for [m_copyup_start, m_copyup_end).
+ ldout(image_ctx.cct, 5) << this << " send_copyup_objects: "
+ << " start object=" << m_copyup_start << ", "
+ << " end object=" << m_copyup_end << dendl;
+ m_state = STATE_COPYUP_OBJECTS;
::SnapContext snapc;
- bool has_snapshots;
- uint64_t parent_overlap;
{
RWLock::RLocker snap_locker(image_ctx.snap_lock);
RWLock::RLocker parent_locker(image_ctx.parent_lock);
-
snapc = image_ctx.snapc;
- has_snapshots = !image_ctx.snaps.empty();
- int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &parent_overlap);
- assert(r == 0);
}
- // copyup is only required for portion of image that overlaps parent
- uint64_t copyup_end = Striper::get_num_objects(image_ctx.layout,
- parent_overlap);
- // TODO: protect against concurrent shrink and snap create?
- if (copyup_end <= m_delete_start || !has_snapshots) {
- send_pre_remove();
- return;
- }
-
- uint64_t copyup_start = m_delete_start;
- m_delete_start = copyup_end;
-
- ldout(image_ctx.cct, 5) << this << " send_copyup_objects: "
- << " start object=" << copyup_start << ", "
- << " end object=" << copyup_end << dendl;
- m_state = STATE_COPYUP_OBJECTS;
-
Context *ctx = this->create_callback_context();
typename AsyncObjectThrottle<I>::ContextFactory context_factory(
boost::lambda::bind(boost::lambda::new_ptr<C_CopyupObject<I> >(),
boost::lambda::_1, &image_ctx, snapc, boost::lambda::_2));
AsyncObjectThrottle<I> *throttle = new AsyncObjectThrottle<I>(
- this, image_ctx, context_factory, ctx, &m_prog_ctx, copyup_start,
- copyup_end);
+ this, image_ctx, context_factory, ctx, &m_prog_ctx, m_copyup_start,
+ m_copyup_end);
throttle->start_ops(image_ctx.concurrent_management_ops);
}
+// Compute the copyup range that overlaps the parent image and, when an
+// object map is present, flag those objects OBJECT_PENDING before the
+// copyup writes are issued. Falls through directly to copyup (or remove)
+// when no object map / no overlap / no snapshots.
+template<typename I>
+void TrimRequest<I>::send_pre_copyup() {
+ I &image_ctx = this->m_image_ctx;
+ assert(image_ctx.owner_lock.is_locked());
+
+ if (m_delete_start >= m_num_objects) {
+ send_clean_boundary();
+ return;
+ }
+
+ bool has_snapshots;
+ uint64_t parent_overlap;
+ {
+ RWLock::RLocker snap_locker(image_ctx.snap_lock);
+ RWLock::RLocker parent_locker(image_ctx.parent_lock);
+
+ has_snapshots = !image_ctx.snaps.empty();
+ int r = image_ctx.get_parent_overlap(CEPH_NOSNAP, &parent_overlap);
+ assert(r == 0);
+ }
+
+ // copyup is only required for portion of image that overlaps parent
+ m_copyup_end = Striper::get_num_objects(image_ctx.layout, parent_overlap);
+
+ // TODO: protect against concurrent shrink and snap create?
+ // skip to remove if no copyup is required.
+ if (m_copyup_end <= m_delete_start || !has_snapshots) {
+ send_pre_remove();
+ return;
+ }
+
+ // remove step will later handle [m_copyup_end, ...); copyup owns the rest
+ m_copyup_start = m_delete_start;
+ m_delete_start = m_copyup_end;
+
+ bool copyup_objects = false;
+ {
+ RWLock::RLocker snap_locker(image_ctx.snap_lock);
+ if (image_ctx.object_map == nullptr) {
+ // no object map: skip the PENDING transition and copy up immediately
+ copyup_objects = true;
+ } else {
+ ldout(image_ctx.cct, 5) << this << " send_pre_copyup: "
+ << " copyup_start=" << m_copyup_start
+ << " copyup_end=" << m_copyup_end << dendl;
+ m_state = STATE_PRE_COPYUP;
+
+ assert(image_ctx.exclusive_lock->is_lock_owner());
+
+ Context *ctx = this->create_callback_context();
+ RWLock::WLocker object_map_locker(image_ctx.object_map_lock);
+ if (!image_ctx.object_map->aio_update(m_copyup_start, m_copyup_end,
+ OBJECT_PENDING, OBJECT_EXISTS, ctx)) {
+ // no update was needed; fall through to the copyup writes directly
+ delete ctx;
+ copyup_objects = true;
+ }
+ }
+ }
+
+ if (copyup_objects) {
+ send_copyup_objects();
+ }
+}
+
template <typename I>
void TrimRequest<I>::send_pre_remove() {
I &image_ctx = this->m_image_ctx;
+ // NOTE(review): this hunk appears truncated in the patch — the body of
+ // send_pre_remove is not visible here; verify the full hunk applies.
}
}
+// After the copyup writes finish, transition the copied-up objects from
+// OBJECT_PENDING to OBJECT_NONEXISTENT in the object map, then proceed to
+// the pre-remove step. Skips straight to pre-remove when no object map.
+template<typename I>
+void TrimRequest<I>::send_post_copyup() {
+ I &image_ctx = this->m_image_ctx;
+ assert(image_ctx.owner_lock.is_locked());
+
+ bool pre_remove_objects = false;
+ {
+ RWLock::RLocker snap_locker(image_ctx.snap_lock);
+ if (image_ctx.object_map == nullptr) {
+ pre_remove_objects = true;
+ } else {
+ ldout(image_ctx.cct, 5) << this << " send_post_copyup:"
+ << " copyup_start=" << m_copyup_start
+ << " copyup_end=" << m_copyup_end << dendl;
+ m_state = STATE_POST_COPYUP;
+
+ assert(image_ctx.exclusive_lock->is_lock_owner());
+
+ Context *ctx = this->create_callback_context();
+ RWLock::WLocker object_map_locker(image_ctx.object_map_lock);
+ if (!image_ctx.object_map->aio_update(m_copyup_start, m_copyup_end,
+ OBJECT_NONEXISTENT, OBJECT_PENDING, ctx)) {
+ // nothing to update; continue to pre-remove synchronously
+ delete ctx;
+ pre_remove_objects = true;
+ }
+ }
+ }
+
+ if (pre_remove_objects) {
+ send_pre_remove();
+ }
+}
+
template <typename I>
void TrimRequest<I>::send_post_remove() {
I &image_ctx = this->m_image_ctx;
AioObjectRequest<> *req;
if (p->offset == 0) {
req = new AioObjectTrim(&image_ctx, p->oid.name, p->objectno, snapc,
- req_comp);
+ req_comp, true);
+ // NOTE(review): here the new bool is true — presumably this boundary
+ // trim DOES perform its own object-map update, unlike the copyup-path
+ // trim which passes false. Confirm against the AioObjectTrim ctor.
} else {
req = new AioObjectTruncate(&image_ctx, p->oid.name, p->objectno,
p->offset, snapc, req_comp);