m->get_pg().ps(),
m->get_object_locator().get_pool(),
m->get_object_locator().nspace);
- int r = find_object_context(oid, &obc, can_create, &missing_oid);
+ int r = find_object_context(
+ oid, &obc, can_create,
+ m->get_flags() & CEPH_OSD_FLAG_MAP_SNAP_CLONE,
+ &missing_oid);
if (r == -EAGAIN) {
// If we're not the primary of this OSD, and we have
if (src_oid.is_head() && is_missing_object(src_oid)) {
wait_for_unreadable_object(src_oid, op);
} else if ((r = find_object_context(
- src_oid, &sobc, false, &wait_oid)) == -EAGAIN) {
+ src_oid, &sobc, false, false,
+ &wait_oid)) == -EAGAIN) {
// missing the specific snap we need; requeue and wait.
wait_for_unreadable_object(wait_oid, op);
} else if (r) {
oloc.pool = pool.info.tier_of;
start_copy(cb, obc, obc->obs.oi.soid, oloc, 0,
CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
- CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE,
+ CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE |
+ CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE,
obc->obs.oi.soid.snap == CEPH_NOSNAP);
assert(obc->is_blocked());
int ret = find_object_context(
hobject_t(soid.oid, soid.get_key(), snapid, soid.hash, info.pgid.pool(),
soid.get_namespace()),
- &rollback_to, false, &missing_oid);
+ &rollback_to, false, false, &missing_oid);
if (ret == -EAGAIN) {
/* clone must be missing */
assert(is_missing_object(missing_oid));
flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY)
flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+ if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE)
+ flags |= CEPH_OSD_FLAG_MAP_SNAP_CLONE;
C_GatherBuilder gather(g_ceph_context);
o.copy_from(soid.oid.name, soid.snap, oloc, oi.user_version,
CEPH_OSD_COPY_FROM_FLAG_FLUSH |
CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
- CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE);
+ CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE |
+ CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE);
}
C_Flush *fin = new C_Flush(this, soid, get_last_peering_reset());
object_locator_t base_oloc(soid);
int ReplicatedPG::find_object_context(const hobject_t& oid,
ObjectContextRef *pobc,
bool can_create,
+ bool map_snapid_to_clone,
hobject_t *pmissing)
{
hobject_t head(oid.oid, oid.get_key(), CEPH_NOSNAP, oid.hash,
}
// we want a snap
- if (pool.info.is_removed_snap(oid.snap)) {
+ if (!map_snapid_to_clone && pool.info.is_removed_snap(oid.snap)) {
dout(10) << __func__ << " snap " << oid.snap << " is removed" << dendl;
return -ENOENT;
}
return -ENOENT;
}
+ if (map_snapid_to_clone) {
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " map_snapid_to_clone=true" << dendl;
+ if (oid.snap > ssc->snapset.seq) {
+ // already must be readable
+ ObjectContextRef obc = get_object_context(head, false);
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " maps to head" << dendl;
+ *pobc = obc;
+ put_snapset_context(ssc);
+ return (obc && obc->obs.exists) ? 0 : -ENOENT;
+ } else {
+ vector<snapid_t>::const_iterator citer = std::find(
+ ssc->snapset.clones.begin(),
+ ssc->snapset.clones.end(),
+ oid.snap);
+ if (citer == ssc->snapset.clones.end()) {
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " maps to nothing" << dendl;
+ put_snapset_context(ssc);
+ return -ENOENT;
+ }
+
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " maps to " << oid << dendl;
+
+ if (pg_log.get_missing().is_missing(oid)) {
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " " << oid << " is missing" << dendl;
+ if (pmissing)
+ *pmissing = oid;
+ put_snapset_context(ssc);
+ return -EAGAIN;
+ }
+
+ ObjectContextRef obc = get_object_context(oid, false);
+ if (!obc || !obc->obs.exists) {
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " " << oid << " is not present" << dendl;
+ if (pmissing)
+ *pmissing = oid;
+ put_snapset_context(ssc);
+ return -ENOENT;
+ }
+ dout(10) << "find_object_context " << oid << " @" << oid.snap
+ << " snapset " << ssc->snapset
+ << " " << oid << " HIT" << dendl;
+ *pobc = obc;
+ put_snapset_context(ssc);
+ return 0;
+ }
+ assert(0); //unreachable
+ }
+
dout(10) << "find_object_context " << oid << " @" << oid.snap
<< " snapset " << ssc->snapset << dendl;