if (ctx->obs->exists)
filter_snapc(snapc);
- if (ctx->obs->exists && // head exist(ed)
+ if ((ctx->obs->exists && !ctx->obs->oi.is_whiteout()) && // head exist(ed)
snapc.snaps.size() && // there are snaps
snapc.snaps[0] > ctx->new_snapset.seq) { // existing object is old
// clone
cancel_flush(fop, false);
}
+ // construct a SnapContext appropriate for this clone/head
+ SnapContext dsnapc;
+ SnapContext snapc;
+ if (soid.snap == CEPH_NOSNAP) {
+ snapc.seq = snapset.seq;
+ snapc.snaps = snapset.snaps;
+
+ if (!snapset.clones.empty() && snapset.clones.back() != snapset.seq) {
+ dsnapc.seq = snapset.clones.back();
+ vector<snapid_t>::iterator p = snapset.snaps.begin();
+ while (p != snapset.snaps.end() && *p > dsnapc.seq)
+ ++p;
+ dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+ }
+ } else {
+ vector<snapid_t>::iterator citer = std::find(
+ snapset.clones.begin(),
+ snapset.clones.end(),
+ soid.snap);
+ assert(citer != snapset.clones.end());
+ snapid_t prev_snapc = (citer == snapset.clones.begin()) ?
+ snapid_t(0) : *(citer - 1);
+
+ vector<snapid_t>::iterator p = snapset.snaps.begin();
+ while (p != snapset.snaps.end() && *p >= oi.snaps.back())
+ ++p;
+ snapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+
+ // we may need to send a delete first
+ while (p != snapset.snaps.end() && *p > prev_snapc)
+ ++p;
+ dsnapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
+
+ if (dsnapc.snaps.empty()) {
+ snapc.seq = prev_snapc;
+ } else {
+ dsnapc.seq = prev_snapc;
+ snapc.seq = oi.snaps.back() - 1;
+ }
+ }
+
+ object_locator_t base_oloc(soid);
+ base_oloc.pool = pool.info.tier_of;
+
+ if (!dsnapc.snaps.empty()) {
+ ObjectOperation o;
+ o.remove();
+ osd->objecter_lock.Lock();
+ osd->objecter->mutate(
+ soid.oid,
+ base_oloc,
+ o,
+ dsnapc,
+ oi.mtime,
+ CEPH_OSD_FLAG_IGNORE_OVERLAY | CEPH_OSD_FLAG_ORDERSNAP,
+ NULL,
+ NULL /* no callback, we'll rely on the ordering w.r.t the next op */);
+ osd->objecter_lock.Unlock();
+ }
+
FlushOpRef fop(new FlushOp);
fop->ctx = ctx;
fop->flushed_version = oi.user_version;
CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE);
}
C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
- object_locator_t base_oloc(soid);
- base_oloc.pool = pool.info.tier_of;
-
- // construct a SnapContext appropriate for this clone/head
- SnapContext snapc;
- if (soid.snap == CEPH_NOSNAP) {
- snapc.seq = snapset.seq;
- snapc.snaps = snapset.snaps;
- } else {
- // we want to only include snaps that are older than the oldest
- // snap for which we are defined, so that the object appears to
- // have been written before that.
- vector<snapid_t>::iterator p = snapset.snaps.begin();
- while (p != snapset.snaps.end() && *p >= oi.snaps.back())
- ++p;
- snapc.snaps = vector<snapid_t>(p, snapset.snaps.end());
- snapc.seq = oi.snaps.back() - 1;
- }
osd->objecter_lock.Lock();
ceph_tid_t tid = osd->objecter->mutate(soid.oid, base_oloc, o, snapc, oi.mtime,