From e6cc012c0f045020fc9ed754421392bcf6e81447 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 25 May 2009 13:18:14 -0700 Subject: [PATCH] osd: track ObjectContext for cloned objects This orders access to a newly cloned object. This is really only important when you have a racing clone creation and a clone read. The read will look at the head's context and expect the clone to be there, but we may not have applied the write to disk yet. So, we set up an obc for the cloned object too (with the same mode as the head). --- src/osd/ReplicatedPG.cc | 92 ++++++++++++++++++++++++++++------------- src/osd/ReplicatedPG.h | 42 ++++++++++++------- 2 files changed, 89 insertions(+), 45 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 446d50f73e754..9ca127bceaf70 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -409,7 +409,8 @@ void ReplicatedPG::do_op(MOSDOp *op) } const sobject_t& soid = obc->soid; - OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(), &obc->oi); + OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(), + obc->state, &obc->oi); bool noop = false; if (ctx->ops.empty()) { @@ -1207,25 +1208,23 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, void ReplicatedPG::_make_clone(ObjectStore::Transaction& t, sobject_t head, pobject_t coid, - eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector& snaps) + object_info_t *poi) { - object_info_t pi(coid); - pi.version = v; - pi.prior_version = ov; - pi.last_reqid = reqid; - pi.mtime = mtime; - pi.snaps.swap(snaps); bufferlist bv; - ::encode(pi, bv); + ::encode(*poi, bv); t.clone(info.pgid.to_coll(), head, coid); t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv); } -void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector& log, osd_reqid_t reqid, pg_stat_t& stats, - sobject_t soid, loff_t old_size, object_info_t& oi, - eversion_t& at_version, SnapContext& snapc) +void ReplicatedPG::prepare_clone(OpContext *ctx, loff_t old_size, eversion_t old_version, + utime_t old_mtime, osd_reqid_t old_last_reqid) { + object_info_t& oi = *ctx->poi; + const sobject_t& soid = oi.soid; + SnapContext& snapc = ctx->snapc; + ObjectStore::Transaction& t = ctx->clone_t; + // clone? assert(soid.snap == CEPH_NOSNAP); dout(20) << "snapset=" << oi.snapset << " snapc=" << snapc << dendl;; @@ -1252,7 +1251,21 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector snaps[i] = snapc.snaps[i]; // prepare clone - _make_clone(t, soid, coid, oi.version, at_version, reqid, oi.mtime, snaps); + ctx->clone_obc = new ObjectContext; + ctx->clone_obc->state = ctx->mode; // take state from head obc's + ctx->clone_obc->soid = coid; + ctx->clone_obc->oi.soid = coid; + ctx->clone_obc->oi.version = ctx->at_version; + ctx->clone_obc->oi.prior_version = old_version; + ctx->clone_obc->oi.last_reqid = old_last_reqid; + ctx->clone_obc->oi.mtime = old_mtime; + ctx->clone_obc->oi.snaps = snaps; + + ctx->clone_obc->force_start_write(); + if (is_primary()) + register_object_context(ctx->clone_obc); + + _make_clone(t, soid, coid, &ctx->clone_obc->oi); // add to snap bound collections coll_t fc = make_snap_collection(t, snaps[0]); @@ -1262,20 +1275,20 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector t.collection_add(lc, info.pgid.to_coll(), coid); } - stats.num_objects++; - stats.num_object_clones++; + info.stats.num_objects++; + info.stats.num_object_clones++; oi.snapset.clones.push_back(coid.snap); oi.snapset.clone_size[coid.snap] = old_size; oi.snapset.clone_overlap[coid.snap].insert(0, old_size); // log clone dout(10) << "cloning v " << oi.version - << " to " << coid << " v " << at_version + << " to " << coid << " v " << ctx->at_version << " snaps=" << snaps << dendl; - log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime)); - ::encode(snaps, log.back().snaps); + ctx->log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, ctx->at_version, old_version, ctx->reqid, oi.mtime)); + ::encode(snaps, ctx->log.back().snaps); - at_version.version++; + ctx->at_version.version++; } // update snapset with latest snap context @@ -1599,22 +1612,27 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size) { assert(!ctx->ops.empty()); - eversion_t old_version = ctx->poi->version; - - const sobject_t& soid = ctx->poi->soid; object_info_t *poi = ctx->poi; + const sobject_t& soid = poi->soid; + + // we'll need this to log + eversion_t old_version = poi->version; + + // set these values aside, in case we need to clone + __u64 old_size = size; + utime_t old_mtime = poi->mtime; + osd_reqid_t old_last_reqid = poi->last_reqid; // prepare the actual mutation bufferlist::iterator bp = ctx->indata.begin(); int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata, exists, size); if (result < 0 || ctx->op_t.empty()) - return result; + return result; // error, or read op. // clone? if (soid.snap) - prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi, - ctx->at_version, ctx->snapc); + prepare_clone(ctx, old_size, old_version, old_mtime, old_last_reqid); // finish and log the op. poi->version = ctx->at_version; @@ -1732,6 +1750,12 @@ void ReplicatedPG::apply_repop(RepGather *repop) repop->applied = true; + if (repop->ctx->clone_obc) { + repop->ctx->clone_obc->finish_write(); + put_object_context(repop->ctx->clone_obc); + repop->ctx->clone_obc = 0; + } + repop->obc->finish_write(); put_object_context(repop->obc); @@ -2278,7 +2302,7 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) oi.version = op->old_version; oi.snapset = op->snapset; - OpContext ctx(op, op->reqid, op->ops, op->get_data(), &oi); + OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &oi); if (!op->noop) { ctx.mtime = op->mtime; @@ -3097,10 +3121,20 @@ int ReplicatedPG::recover_primary(int max) dout(10) << "recover_primary cloning " << head << " v" << latest->prior_version << " to " << soid << " v" << latest->version << " snaps " << latest->snaps << dendl; - vector snaps; - ::decode(snaps, latest->snaps); ObjectStore::Transaction t; - _make_clone(t, head, soid, latest->prior_version, latest->version, latest->reqid, latest->mtime, snaps); + + ObjectContext *headobc = get_object_context(head); + + object_info_t oi(soid); + oi.version = latest->version; + oi.prior_version = latest->prior_version; + oi.last_reqid = headobc->oi.last_reqid; + oi.mtime = headobc->oi.mtime; + ::decode(oi.snaps, latest->snaps); + _make_clone(t, head, soid, &oi); + + put_object_context(headobc); + osd->store->apply_transaction(t); missing.got(latest->soid, latest->version); missing_loc.erase(latest->soid); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index e9d2c38a30f13..05b4ffe4658f4 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -65,20 +65,13 @@ public: * replicas ack. */ struct ObjectContext { - sobject_t soid; - int ref; - bool registered; - - void get() { ++ref; } - - enum { + typedef enum { IDLE, DELAYED, RMW, DELAYED_FLUSHING, RMW_FLUSHING - } state; - + } state_t; static const char *get_state_name(int s) { switch (s) { case IDLE: return "idle"; @@ -90,6 +83,12 @@ public: } } + sobject_t soid; + int ref; + bool registered; + + state_t state; + int num_wr, num_rmw; entity_inst_t client; list waiting; @@ -99,6 +98,9 @@ public: __u64 size; object_info_t oi; + + + void get() { ++ref; } bool is_delayed_mode() { return state == DELAYED || state == DELAYED_FLUSHING; @@ -168,6 +170,9 @@ public: num_wr++; assert(state == DELAYED || state == RMW); } + void force_start_write() { + num_wr++; + } void finish_write() { assert(num_wr > 0); --num_wr; @@ -228,6 +233,7 @@ public: bufferlist& indata; bufferlist outdata; + ObjectContext::state_t mode; // DELAYED or RMW (or _FLUSHING variant?) object_info_t *poi; utime_t mtime; @@ -237,12 +243,17 @@ public: ObjectStore::Transaction op_t, clone_t, local_t; vector log; + ObjectContext *clone_obc; // if we created a clone + int data_off; // FIXME: we may want to kill this msgr hint off at some point! OpContext(Message *_op, osd_reqid_t _reqid, vector& _ops, bufferlist& _data, - object_info_t *_poi) : - op(_op), reqid(_reqid), ops(_ops), indata(_data), poi(_poi), - data_off(0) {} + ObjectContext::state_t _mode, object_info_t *_poi) : + op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), poi(_poi), + clone_obc(0), data_off(0) {} + ~OpContext() { + assert(!clone_obc); + } }; /* @@ -380,10 +391,9 @@ protected: void _make_clone(ObjectStore::Transaction& t, sobject_t head, sobject_t coid, - eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector& snaps); - void prepare_clone(ObjectStore::Transaction& t, vector& log, osd_reqid_t reqid, pg_stat_t& st, - sobject_t poid, loff_t old_size, object_info_t& oi, - eversion_t& at_version, SnapContext& snapc); + object_info_t *poi); + void prepare_clone(OpContext *ctx, loff_t old_size, + eversion_t old_version, utime_t old_mtime, osd_reqid_t old_last_reqid); void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st); int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st, sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi, -- 2.39.5