From 3c72caf1d2ebfa0530b90ea469f8f9fdb24977ce Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 22 May 2009 18:17:46 -0700 Subject: [PATCH] osd: break apart write stages, transactions We break the write preparation into three stages. First we run the ops vector and build the op_t transaction. If it is non-empty, we build a clone_t transaction to run before it, and a local_t that updates the osd's PG log and metadata. Take care to preserve old exists, size, and version values before running the ops vector as those are clobbered but need to be sent to the replica osds. --- src/osd/ReplicatedPG.cc | 79 +++++++++++++++++++++++++---------------- src/osd/ReplicatedPG.h | 9 +++-- 2 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4631f9f9db277..6a0fc96782fb7 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -894,7 +894,7 @@ void ReplicatedPG::_make_clone(ObjectStore::Transaction& t, t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv); } -void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& stats, +void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector& log, osd_reqid_t reqid, pg_stat_t& stats, sobject_t soid, loff_t old_size, object_info_t& oi, eversion_t& at_version, SnapContext& snapc) { @@ -944,9 +944,8 @@ void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, dout(10) << "cloning v " << oi.version << " to " << coid << " v " << at_version << " snaps=" << snaps << dendl; - Log::Entry cloneentry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime); - ::encode(snaps, cloneentry.snaps); - add_log_entry(cloneentry, logbl); + log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime)); + ::encode(snaps, log.back().snaps); at_version.version++; } @@ -1268,7 +1267,6 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, 
osd_reqid_t req void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size, eversion_t trim_to) { - bufferlist log_bl; eversion_t log_version = ctx->at_version; assert(!ctx->ops.empty()); @@ -1278,22 +1276,23 @@ void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size vector& ops = ctx->ops; object_info_t *poi = ctx->poi; - // apply ops - bool did_snap = false; + + // prepare the actual mutation bufferlist::iterator bp = ctx->data.begin(); - for (unsigned i=0; it, log_bl, ctx->reqid, info.stats, soid, size, *poi, - ctx->at_version, ctx->snapc); - did_snap = true; - } - prepare_simple_op(ctx->t, ctx->reqid, info.stats, soid, size, exists, *poi, + for (unsigned i=0; iop_t, ctx->reqid, info.stats, soid, size, exists, *poi, ops, i, bp, ctx->snapc); - } - // finish. + // FIXME FIXME + if (ctx->op_t.empty()) + return; + + // clone? + if (soid.snap) + prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi, + ctx->at_version, ctx->snapc); + + // finish and log the op. 
poi->version = ctx->at_version; if (exists) { poi->version = ctx->at_version; @@ -1308,19 +1307,24 @@ void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size bufferlist bv(sizeof(*poi)); ::encode(*poi, bv); - ctx->t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv); + ctx->op_t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv); } // append to log int logopcode = Log::Entry::MODIFY; if (!exists) logopcode = Log::Entry::DELETE; - Log::Entry logentry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime); - add_log_entry(logentry, log_bl); + ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime)); + + // update the local pg, pg log + write_info(ctx->local_t); - // write pg info, log to disk - write_info(ctx->t); - append_log(ctx->t, log_bl, log_version, trim_to); + bufferlist log_bl; + for (vector::iterator p = ctx->log.begin(); + p != ctx->log.end(); + p++) + add_log_entry(*p, log_bl); + append_log(ctx->local_t, log_bl, log_version, trim_to); } @@ -1377,12 +1381,19 @@ void ReplicatedPG::apply_repop(RepGather *repop) assert(!repop->applied); Context *oncommit = new C_OSD_ModifyCommit(this, repop); - unsigned r = osd->store->apply_transaction(repop->ctx->t, oncommit); + + list tls; + tls.push_back(&repop->ctx->clone_t); + tls.push_back(&repop->ctx->op_t); + tls.push_back(&repop->ctx->local_t); + unsigned r = osd->store->apply_transactions(tls, oncommit); if (r) dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl; // discard my reference to the buffer repop->ctx->op->get_data().clear(); + tls.clear(); + repop->ctx->op_t.clear_data(); repop->applied = true; @@ -1506,7 +1517,8 @@ void ReplicatedPG::eval_repop(RepGather *repop) } -void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) +void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now, + bool old_exists, __u64 old_size, eversion_t old_version) { const sobject_t& soid = 
repop->ctx->poi->soid; dout(7) << " issue_repop rep_tid " << repop->rep_tid @@ -1521,9 +1533,9 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) osd->osdmap->get_epoch(), repop->rep_tid, repop->ctx->at_version); wr->mtime = repop->ctx->mtime; - wr->old_exists = repop->obc->exists; - wr->old_size = repop->obc->size; - wr->old_version = repop->obc->oi.version; + wr->old_exists = old_exists; + wr->old_size = old_size; + wr->old_version = old_version; wr->snapset = repop->obc->oi.snapset; wr->snapc = repop->ctx->snapc; wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist @@ -1657,7 +1669,7 @@ int ReplicatedPG::find_object_context(object_t oid, snapid_t snapid, // head? if (snapid > hobc->oi.snapset.seq) { - dout(10) << "get_object_context " << head + dout(10) << "find_object_context " << head << " want " << snapid << " > snapset seq " << hobc->oi.snapset.seq << " -- HIT" << dendl; *pobc = hobc; @@ -1850,6 +1862,11 @@ void ReplicatedPG::op_modify(MOSDOp *op, ObjectContext *obc) if (log.top.version - log.bottom.version > info.stats.num_objects) trim_to = peers_complete_thru; + // note some basic context for op replication that prepare_transaction may clobber + bool old_exists = obc->exists; + __u64 old_size = obc->size; + eversion_t old_version = obc->oi.version; + // we are acker. if (!noop) { // log and update later. @@ -1857,7 +1874,7 @@ void ReplicatedPG::op_modify(MOSDOp *op, ObjectContext *obc) } for (unsigned i=1; i log; int data_off; // FIXME: we may want to kill this msgr hint off at some point! 
@@ -314,7 +316,8 @@ protected: void apply_repop(RepGather *repop); void eval_repop(RepGather*); - void issue_repop(RepGather *repop, int dest, utime_t now); + void issue_repop(RepGather *repop, int dest, utime_t now, + bool old_exists, __u64 old_size, eversion_t old_version); RepGather *new_repop(OpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid); void repop_ack(RepGather *repop, int result, int ack_type, @@ -363,7 +366,7 @@ protected: void _make_clone(ObjectStore::Transaction& t, sobject_t head, sobject_t coid, eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector& snaps); - void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& st, + void prepare_clone(ObjectStore::Transaction& t, vector& log, osd_reqid_t reqid, pg_stat_t& st, sobject_t poid, loff_t old_size, object_info_t& oi, eversion_t& at_version, SnapContext& snapc); void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st); -- 2.39.5