// 'flatten' child image by copying all parent's blocks
int flatten(ImageCtx *ictx, ProgressContext &prog_ctx)
{
- ldout(ictx->cct, 20) << "flatten" << dendl;
+ CephContext *cct = ictx->cct;
+ ldout(cct, 20) << "flatten" << dendl;
if (ictx->read_only)
return -EROFS;
int r;
// ictx_check also updates parent data
if ((r = ictx_check(ictx)) < 0) {
- lderr(ictx->cct) << "ictx_check failed" << dendl;
+ lderr(cct) << "ictx_check failed" << dendl;
return r;
}
uint64_t overlap;
uint64_t overlap_periods;
uint64_t overlap_objects;
+ ::SnapContext snapc;
{
RWLock::RLocker l(ictx->md_lock);
// can't flatten a non-clone
if (ictx->parent_md.spec.pool_id == -1) {
- lderr(ictx->cct) << "image has no parent" << dendl;
+ lderr(cct) << "image has no parent" << dendl;
return -EINVAL;
}
if (ictx->snap_id != CEPH_NOSNAP || ictx->read_only) {
- lderr(ictx->cct) << "snapshots cannot be flattened" << dendl;
+ lderr(cct) << "snapshots cannot be flattened" << dendl;
return -EROFS;
}
+ snapc = ictx->snapc;
assert(ictx->parent != NULL);
assert(ictx->parent_md.overlap <= ictx->size);
overlap_objects = overlap_periods * ictx->get_stripe_count();
}
- char *buf = new char[object_size];
+ SimpleThrottle throttle(cct->_conf->rbd_concurrent_management_ops, false);
for (uint64_t ono = 0; ono < overlap_objects; ono++) {
- prog_ctx.update_progress(ono, overlap_objects);
+ {
+ RWLock::RLocker l(ictx->parent_lock);
+ // stop early if the parent went away - it just means
+ // another flatten finished first, so this one is useless.
+ if (!ictx->parent) {
+ r = 0;
+ goto err;
+ }
+ }
// map child object onto the parent
vector<pair<uint64_t,uint64_t> > objectx;
- Striper::extent_to_file(ictx->cct, &ictx->layout,
+ Striper::extent_to_file(cct, &ictx->layout,
ono, 0, object_size,
objectx);
uint64_t object_overlap = ictx->prune_parent_extents(objectx, overlap);
assert(object_overlap <= object_size);
- RWLock::RLocker l(ictx->parent_lock);
- // stop early if the parent went away - it just means
- // another flatten finished first, so this one is useless.
- if (!ictx->parent) {
- r = 0;
- goto err;
- }
- if ((r = read(ictx->parent, objectx, buf, NULL)) < 0) {
- lderr(ictx->cct) << "reading from parent failed" << dendl;
+ bufferlist bl;
+ string oid = ictx->get_object_name(ono);
+ throttle.start_op();
+ Context *comp = new C_SimpleThrottle(&throttle);
+ AioWrite *req = new AioWrite(ictx, oid, ono, 0, objectx, object_overlap,
+ bl, snapc, CEPH_NOSNAP, comp);
+ r = req->send();
+ if (r < 0) {
+ lderr(cct) << "failed to flatten object " << oid << dendl;
goto err;
}
- // for actual amount read, if data is all zero, don't bother with block
- if (!buf_is_zero(buf, r)) {
- bufferlist bl;
- string oid = ictx->get_object_name(ono);
- bl.append(buf, r);
- r = cls_client::copyup(&ictx->data_ctx, oid, bl);
- if (r < 0) {
- lderr(ictx->cct) << "failed to copy block to child" << dendl;
- goto err;
- }
- }
+ prog_ctx.update_progress(ono, overlap_objects);
+ }
+
+ r = throttle.wait_for_ret();
+ if (r < 0) {
+ lderr(cct) << "failed to flatten at least one object: "
+ << cpp_strerror(r) << dendl;
+ goto err;
}
// remove parent from this (base) image
r = cls_client::remove_parent(&ictx->md_ctx, ictx->header_oid);
if (r < 0) {
- lderr(ictx->cct) << "error removing parent" << dendl;
+ lderr(cct) << "error removing parent" << dendl;
return r;
}
// will be removed when the last snap goes away)
ictx->snap_lock.get_read();
if (ictx->snaps.empty()) {
- ldout(ictx->cct, 2) << "removing child from children list..." << dendl;
+ ldout(cct, 2) << "removing child from children list..." << dendl;
int r = cls_client::remove_child(&ictx->md_ctx, RBD_CHILDREN,
ictx->parent_md.spec, ictx->id);
if (r < 0) {
- lderr(ictx->cct) << "error removing child from children list" << dendl;
+ lderr(cct) << "error removing child from children list" << dendl;
ictx->snap_lock.put_read();
return r;
}
ictx->snap_lock.put_read();
notify_change(ictx->md_ctx, ictx->header_oid, NULL, ictx);
- ldout(ictx->cct, 20) << "finished flattening" << dendl;
+ ldout(cct, 20) << "finished flattening" << dendl;
+
+ return 0;
err:
- delete[] buf;
+ throttle.wait_for_ret();
return r;
}