cancel_flush(fop, false);
}
- // construct a SnapContext appropriate for this clone/head
+ /**
+ * In general, we need to send two deletes and a copyfrom.
+ * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
+ * where 4 is marked as clean. To flush 10, we have to:
+ * 1) delete 4:[4,3,2] -- ensure head is created at cloneid 4
+ * 2) delete (8-1):[4,3,2] -- ensure that the object does not exist at 8
+ * 3) copyfrom 8:[8,4,3,2] -- flush object excluding snap 8
+ *
+ * The second delete is required in case at some point in the past
+ * there had been a clone 7(7,6), which we had flushed. Without
+ * the second delete, the object would appear in the base pool to
+ * have existed.
+ */
SnapContext dsnapc;
dsnapc.seq = 0;
SnapContext snapc;
vector<snapid_t>::iterator dnewest = p;
// we may need to send a delete first
- while (p != snapset.snaps.end() && *p > prev_snapc)
- ++p;
- dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+ if (prev_snapc + 1 < *dnewest) {
+ while (p != snapset.snaps.end() && *p > prev_snapc)
+ ++p;
+ dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
- if (p == dnewest) {
- // no snaps between the oldest in this clone and prev_snapc
- snapc.seq = prev_snapc;
- } else {
- // snaps between oldest in this clone and prev_snapc, send delete
dsnapc.seq = prev_snapc;
snapc.seq = oi.snaps.back() - 1;
+ } else {
+ snapc.seq = prev_snapc;
}
}
NULL,
NULL /* no callback, we'll rely on the ordering w.r.t the next op */);
osd->objecter_lock.Unlock();
+
+ // do we need to send the second delete?
+ SnapContext dsnapc2;
+ vector<snapid_t>::reverse_iterator rp = snapset.snaps.rbegin();
+
+ // advance rp to the smallest snap not contained by the last flushed clone
+ while (rp != snapset.snaps.rend() && *rp <= dsnapc.seq)
+ ++rp;
+
+ // set dnsnapc2.seq to be the snap prior to that snap (the object did not
+ // exist at *rq, so it must have been deleted prior to that).
+ dsnapc2.seq = (rp == snapset.snaps.rend()) ? snapset.seq : *rp;
+ if (dsnapc2.seq > 0)
+ dsnapc2.seq.val -= 1;
+
+ if (dsnapc2.seq != dsnapc.seq) {
+ dsnapc2.snaps = dsnapc.snaps;
+
+ ObjectOperation o2;
+ o2.remove();
+ osd->objecter_lock.Lock();
+ osd->objecter->mutate(
+ soid.oid,
+ base_oloc,
+ o2,
+ dsnapc2,
+ oi.mtime,
+ (CEPH_OSD_FLAG_IGNORE_OVERLAY |
+ CEPH_OSD_FLAG_ORDERSNAP |
+ CEPH_OSD_FLAG_ENFORCE_SNAPC),
+ NULL,
+ NULL /* no callback, we'll rely on the ordering w.r.t the next op */);
+ osd->objecter_lock.Unlock();
+ }
}
FlushOpRef fop(new FlushOp);