}
if (obc.get() && obc->obs.exists && obc->obs.oi.has_manifest()) {
+ if (recover_adjacent_clones(obc, op)) {
+ return;
+ }
if (maybe_handle_manifest(op,
write_ordered,
obc))
return cnt;
}
+bool PrimaryLogPG::recover_adjacent_clones(ObjectContextRef obc, OpRequestRef op)
+{
+ if (!obc->obs.oi.manifest.is_chunked() || !obc->ssc || !obc->ssc->snapset.clones.size()) {
+ return false;
+ }
+
+ const SnapSet& snapset = obc->ssc->snapset;
+ auto s = std::find(snapset.clones.begin(), snapset.clones.end(), obc->obs.oi.soid.snap);
+ auto is_unreadable_snap = [this, obc, &snapset, op](auto iter) -> bool {
+ hobject_t cid = obc->obs.oi.soid;
+ cid.snap = (iter == snapset.clones.end()) ? snapid_t(CEPH_NOSNAP) : *iter;
+ if (is_unreadable_object(cid)) {
+ dout(10) << __func__ << ": clone " << cid
+ << " is unreadable, waiting" << dendl;
+ wait_for_unreadable_object(cid, op);
+ return true;
+ }
+ return false;
+ };
+ if (s != snapset.clones.begin()) {
+ if (is_unreadable_snap(s - 1)) {
+ return true;
+ }
+ }
+ if (s != snapset.clones.end()) {
+ if (is_unreadable_snap(s + 1)) {
+ return true;
+ }
+ }
+ return false;
+}
+
ObjectContextRef PrimaryLogPG::get_prev_clone_obc(ObjectContextRef obc)
{
auto s = std::find(obc->ssc->snapset.clones.begin(), obc->ssc->snapset.clones.end(),
void dec_refcount(const hobject_t& soid, const object_ref_delta_t& refs);
void dec_refcount_by_dirty(OpContext* ctx);
ObjectContextRef get_prev_clone_obc(ObjectContextRef obc);
+ bool recover_adjacent_clones(ObjectContextRef obc, OpRequestRef op);
void get_adjacent_clones(ObjectContextRef src_obc,
ObjectContextRef& _l, ObjectContextRef& _g);
bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,