From b82ba820a5d90b49b4c09bcc1d421531a2bdf46a Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Fri, 21 May 2010 14:30:03 -0700 Subject: [PATCH] osd: implement rollback functionality --- src/osd/ReplicatedPG.cc | 62 +++++++++++++++++++++++++++++++++++++++++ src/osd/ReplicatedPG.h | 1 + 2 files changed, 63 insertions(+) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ee81199b3819f..27edc3e800d3c 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1282,6 +1282,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, } break; + case CEPH_OSD_OP_ROLLBACK : + _rollback_to(ctx, op); + break; + case CEPH_OSD_OP_ZERO: { // zero assert(op.extent.length); @@ -1702,7 +1706,65 @@ inline void ReplicatedPG::_delete_head(OpContext *ctx) info.stats.num_wr++; } +void ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) +{ + SnapSetContext *ssc = ctx->obs->ssc; + const sobject_t& soid = ctx->obs->oi.soid; + ObjectStore::Transaction& t = ctx->op_t; + ObjectContext *rollback_to; + int ret=find_object_context(soid.oid, op.snap.snapid, &rollback_to, false); + sobject_t& rollback_to_sobject = rollback_to->obs.oi.soid; + if (ret) { + if (-ENOENT == ret) { + // there's no snapshot here, or there's no object. + // if there's no snapshot, we delete the object; otherwise, do nothing. + _delete_head(ctx); + } else if (-EAGAIN == ret) { + /* a different problem, like degraded pool + * with not-yet-restored object. We shouldn't have been able + * to get here; recovery should have completed first! */ + assert(0); + } else { + // ummm....huh? It *can't* return anything else at time of writing. + assert(0); + } + } else { //we got our context, let's use it to do the rollback! + if (ctx->clone_obc && + (ctx->clone_obc->obs.oi.prior_version == ctx->obs->oi.prior_version)) + ; //just cloned the rollback target, we don't need to do anything! + + else { + /* 1) Delete current head + * 2) Clone correct snapshot into head + * 3) Calculate clone_overlaps by following overlaps + * forward from rollback snapshot */ + sobject_t new_head = get_object_context(ctx->obs->oi.soid) + ->obs.oi.soid; + + _delete_head(ctx); + ctx->obs->exists = true; //we're about to recreate it + + map attrs; + t.clone(coll_t::build_pg_coll(info.pgid), + rollback_to_sobject, new_head); + osd->store->getattrs(coll_t::build_pg_coll(info.pgid), + rollback_to_sobject, attrs, false); + t.setattrs(coll_t::build_pg_coll(info.pgid), new_head, attrs); + ssc->snapset.head_exists = true; + + map >::iterator iter = + ssc->snapset.clone_overlap.lower_bound(op.snap.snapid); + interval_set overlaps = iter->second; + for ( ; + iter != ssc->snapset.clone_overlap.end(); + ++iter) { + overlaps.intersection_of(iter->second); + } + ssc->snapset.clone_overlap[new_head.snap] = overlaps; + } + } +} void ReplicatedPG::_make_clone(ObjectStore::Transaction& t, const sobject_t& head, const sobject_t& coid, diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 91c7e7e3f3028..5e9b094dbd96c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -585,6 +585,7 @@ public: bufferlist& odata); private: void _delete_head(OpContext *ctx); + void _rollback_to(OpContext *ctx, ceph_osd_op& op); public: bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); -- 2.39.5