cancel_flush(fop, false);
}
- // construct a SnapContext appropriate for this clone/head
- SnapContext dsnapc;
- dsnapc.seq = 0;
- SnapContext snapc;
- if (soid.snap == CEPH_NOSNAP) {
- snapc.seq = snapset.seq;
- snapc.snaps = snapset.snaps;
+ /**
+ * In general, we need to send two deletes and a copyfrom.
+ * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
+ * where 4 is marked as clean. To flush 10, we have to:
+ * 1) delete 4:[4,3,2] -- ensure head is created at cloneid 4
+ * 2) delete (8-1):[4,3,2] -- ensure that the object does not exist at 8
+ * 3) copyfrom 8:[8,4,3,2] -- flush object excluding snap 8
+ *
+ * The second delete is required in case at some point in the past
+ * there had been a clone 7(7,6), which we had flushed. Without
+ * the second delete, the object would appear in the base pool to
+ * have existed.
+ */
- if (!snapset.clones.empty() && snapset.clones.back() != snapset.seq) {
- dsnapc.seq = snapset.clones.back();
- vector<snapid_t>::iterator p = snapset.snaps.begin();
- while (p != snapset.snaps.end() && *p > dsnapc.seq)
- ++p;
- dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+ SnapContext snapc, dsnapc, dsnapc2;
+ if (snapset.seq != 0) {
+ if (soid.snap == CEPH_NOSNAP) {
+ snapc.seq = snapset.seq;
+ snapc.snaps = snapset.snaps;
+ } else {
+ snapid_t min_included_snap = oi.snaps.back();
+ snapc = snapset.get_ssc_as_of(min_included_snap - 1);
}
- } else {
- vector<snapid_t>::iterator citer = std::find(
- snapset.clones.begin(),
- snapset.clones.end(),
- soid.snap);
- assert(citer != snapset.clones.end());
- snapid_t prev_snapc = (citer == snapset.clones.begin()) ?
- snapid_t(0) : *(citer - 1);
-
- vector<snapid_t>::iterator p = snapset.snaps.begin();
- while (p != snapset.snaps.end() && *p >= oi.snaps.back())
- ++p;
- snapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
- vector<snapid_t>::iterator dnewest = p;
-
- // we may need to send a delete first
- while (p != snapset.snaps.end() && *p > prev_snapc)
- ++p;
- dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+ snapid_t prev_snapc = 0;
+ for (vector<snapid_t>::reverse_iterator citer = snapset.clones.rbegin();
+ citer != snapset.clones.rend();
+ ++citer) {
+ if (*citer < soid.snap) {
+ prev_snapc = *citer;
+ break;
+ }
+ }
- if (p == dnewest) {
- // no snaps between the oldest in this clone and prev_snapc
- snapc.seq = prev_snapc;
- } else {
- // snaps between oldest in this clone and prev_snapc, send delete
- dsnapc.seq = prev_snapc;
- snapc.seq = oi.snaps.back() - 1;
+ if (prev_snapc != snapc.seq) {
+ dsnapc = snapset.get_ssc_as_of(prev_snapc);
+ snapid_t first_snap_after_prev_snapc =
+ snapset.get_first_snap_after(prev_snapc, snapc.seq);
+ dsnapc2 = snapset.get_ssc_as_of(
+ first_snap_after_prev_snapc - 1);
}
}
object_locator_t base_oloc(soid);
base_oloc.pool = pool.info.tier_of;
- if (dsnapc.seq > 0) {
+ if (dsnapc.seq > 0 && dsnapc.seq < snapc.seq) {
ObjectOperation o;
o.remove();
osd->objecter_lock.Lock();
osd->objecter_lock.Unlock();
}
+ if (dsnapc2.seq > dsnapc.seq && dsnapc2.seq < snapc.seq) {
+ ObjectOperation o;
+ o.remove();
+ osd->objecter->mutate(
+ soid.oid,
+ base_oloc,
+ o,
+ dsnapc2,
+ oi.mtime,
+ (CEPH_OSD_FLAG_IGNORE_OVERLAY |
+ CEPH_OSD_FLAG_ORDERSNAP |
+ CEPH_OSD_FLAG_ENFORCE_SNAPC),
+ NULL,
+ NULL /* no callback, we'll rely on the ordering w.r.t the next op */);
+ }
+
FlushOpRef fop(new FlushOp);
fop->obc = obc;
fop->flushed_version = oi.user_version;