start_copy(cb, obc, obc->obs.oi.soid, oloc, 0,
CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE,
+ obc->obs.oi.soid.snap == CEPH_NOSNAP,
temp_target);
assert(obc->is_blocked());
ctx->copy_cb = cb;
start_copy(cb, ctx->obc, src, src_oloc, src_version,
op.copy_from.flags,
+ false,
temp_target);
result = -EINPROGRESS;
} else {
// Start copying `src` into the object described by `obc`: build a CopyOp,
// register it in copy_ops under the destination oid, and block the
// destination object context.
// mirror_snapset is forwarded to the CopyOp; when it is set, src must be a
// head (CEPH_NOSNAP) or snapdir (CEPH_SNAPDIR) object, as asserted below.
// NOTE(review): patch excerpt -- some lines of this function (and its
// closing brace) are not visible here.
void ReplicatedPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
hobject_t src, object_locator_t oloc,
version_t version, unsigned flags,
+ bool mirror_snapset,
const hobject_t& temp_dest_oid)
{
// the destination is always the object behind obc
const hobject_t& dest = obc->obs.oi.soid;
dout(10) << __func__ << " " << dest
<< " from " << src << " " << oloc << " v" << version
<< " flags " << flags
+ << (mirror_snapset ? " mirror_snapset" : "")
<< dendl;
// mirroring a snapset is only permitted for a head or snapdir source
+ assert(!mirror_snapset || (src.snap == CEPH_NOSNAP ||
+ src.snap == CEPH_SNAPDIR));
+
// cancel a previous in-progress copy?
if (copy_ops.count(dest)) {
// FIXME: if the src etc match, we could avoid restarting from the
cancel_copy(cop, false);
}
// the CopyOp constructor gains mirror_snapset just before temp_dest_oid
- CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, flags, temp_dest_oid));
+ CopyOpRef cop(new CopyOp(cb, obc, src, oloc, version, flags,
+ mirror_snapset, temp_dest_oid));
// one CopyOp per destination oid
copy_ops[dest] = cop;
obc->start_block();
// Issue the objecter read(s) for a copy operation.
// Always sends one read built from `op` against cop->src; on the initial
// cursor position with mirror_snapset set it additionally sends a list_snaps
// read against CEPH_SNAPDIR. Both reads complete through one gather.
// NOTE(review): patch excerpt -- some lines of this function are not visible
// between hunks.
void ReplicatedPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
{
dout(10) << __func__ << " " << obc << " " << cop << dendl;
+
// translate copy-from flags into OSD op flags; computed up front because
// both reads below pass the same flags
+ unsigned flags = 0;
+ if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_FLUSH)
+ flags |= CEPH_OSD_FLAG_FLUSH;
+ if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE)
+ flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
+ if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY)
+ flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+
// each read gets its completion from gather.new_sub(); presumably the
// finisher set below runs once every sub has completed -- TODO confirm
+ C_GatherBuilder gather(g_ceph_context);
+
+ if (cop->cursor.is_initial() && cop->mirror_snapset) {
+ // list snaps too.
// NOTE(review): start_copy's assert also permits CEPH_SNAPDIR as src.snap
// when mirror_snapset is set, but this assert accepts only CEPH_NOSNAP --
// confirm which is intended.
+ assert(cop->src.snap == CEPH_NOSNAP);
+ ObjectOperation op;
// the listing is written into the results' snapset
+ op.list_snaps(&cop->results->snapset, NULL);
+ osd->objecter_lock.Lock();
+ tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
+ CEPH_SNAPDIR, NULL,
+ flags, gather.new_sub(), NULL);
// remembered separately from objecter_tid so cancel_copy can cancel it
+ cop->objecter_tid2 = tid;
+ osd->objecter_lock.Unlock();
+ }
+
ObjectOperation op;
if (cop->results->user_version) {
op.assert_version(cop->results->user_version);
// fin->tid is filled in below, once the read's tid is known
C_Copyfrom *fin = new C_Copyfrom(this, obc->obs.oi.soid,
get_last_peering_reset());
-
- unsigned flags = 0;
- if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_FLUSH)
- flags |= CEPH_OSD_FLAG_FLUSH;
- if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE)
- flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
- if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY)
- flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
// the C_OnFinisher wrapper moves from the read() call to the gather
+ gather.set_finisher(new C_OnFinisher(fin,
+ &osd->objecter_finisher));
osd->objecter_lock.Lock();
tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
cop->src.snap, NULL,
flags,
- new C_OnFinisher(fin,
- &osd->objecter_finisher),
+ gather.new_sub(),
// discover the object version if we don't know it yet
cop->results->user_version ? NULL : &cop->results->user_version);
fin->tid = tid;
cop->objecter_tid = tid;
// activate only after every sub has been created
+ gather.activate();
osd->objecter_lock.Unlock();
}
return;
}
cop->objecter_tid = 0;
+ cop->objecter_tid2 = 0; // assume this ordered before us (if it happened)
ObjectContextRef& cobc = cop->obc;
if (r >= 0) {
if (soid.snap < CEPH_NOSNAP)
++tctx->delta_stats.num_object_clones;
tctx->new_obs.exists = true;
- tctx->new_snapset.head_exists = true;
if (whiteout) {
// create a whiteout
tctx->new_obs.oi.snaps = results->snaps;
}
+ if (results->mirror_snapset) {
+ assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
+ tctx->new_snapset.from_snap_set(results->snapset);
+ }
+ tctx->new_snapset.head_exists = true;
+ dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
+
// take RWWRITE lock for duration of our local write
if (!obc->rwstate.get_write_lock()) {
assert(0 == "problem!");
if (cop->objecter_tid) {
Mutex::Locker l(osd->objecter_lock);
osd->objecter->op_cancel(cop->objecter_tid);
+ if (cop->objecter_tid2) {
+ osd->objecter->op_cancel(cop->objecter_tid2);
+ }
}
copy_ops.erase(cop->obc->obs.oi.soid);
version_t user_version; ///< The copy source's user version
bool should_requeue; ///< op should be requeued on cancel
vector<snapid_t> snaps; ///< src's snaps (if clone)
+ librados::snap_set_t snapset; ///< src snapset (if head)
+ bool mirror_snapset;
CopyResults() : object_size(0), started_temp_obj(false),
- user_version(0), should_requeue(false) {}
+ user_version(0), should_requeue(false),
+ mirror_snapset(false) {}
};
// Per-copy bookkeeping object created by start_copy().
// NOTE(review): patch excerpt -- some members and the beginning of the
// constructor signature are not visible here.
struct CopyOp {
hobject_t src;
object_locator_t oloc;
unsigned flags; // CEPH_OSD_COPY_FROM_FLAG_* bits, translated in _copy_some
+ bool mirror_snapset; ///< also fetch src's snapset (list_snaps) and apply it to the destination
CopyResults *results;
tid_t objecter_tid; ///< tid of the main objecter read (0 when none)
+ tid_t objecter_tid2; ///< tid of the extra list_snaps read (0 when none)
object_copy_cursor_t cursor;
map<string,bufferlist> attrs;
object_locator_t l,
version_t v,
unsigned f,
+ bool ms,
const hobject_t& dest)
: cb(cb_), obc(_obc), src(s), oloc(l), flags(f),
+ mirror_snapset(ms),
results(NULL),
objecter_tid(0),
+ objecter_tid2(0),
rval(-1),
temp_oid(dest)
{
results = new CopyResults();
results->user_version = v;
// the results carry the flag as well; the finish path checks
// results->mirror_snapset
+ results->mirror_snapset = mirror_snapset;
}
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
*/
void start_copy(CopyCallback *cb, ObjectContextRef obc, hobject_t src,
object_locator_t oloc, version_t version, unsigned flags,
+ bool mirror_snapset, // if true, src must be a head or snapdir object and its snapset is copied too
const hobject_t& temp_dest_oid);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);