From 1f87d007e2f86eb3aaf494257c84e42006f84f75 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 28 May 2009 11:23:43 -0700 Subject: [PATCH] osd: ship transaction (not op) to replicas This simplifies the code path on the replica. It avoids duplicating computation, which can be a win even for trivial ops when snaps are involved. For active objects, it will avoid the computation as well. Currently parallel execution will still happen if you specify a flag. However, an object method should probably also be able to specify || execution is better in cases where the operation is deterministic and || exec will mean less data over the network. Disable unused vector template, since it confuses the vector encoder. --- src/include/encoding.h | 4 +- src/include/rados.h | 1 + src/messages/MOSDSubOp.h | 23 ++++--- src/os/ObjectStore.h | 2 + src/osd/PG.cc | 3 +- src/osd/ReplicatedPG.cc | 126 ++++++++++++++++++++++++--------------- src/osd/ReplicatedPG.h | 2 +- 7 files changed, 100 insertions(+), 61 deletions(-) diff --git a/src/include/encoding.h b/src/include/encoding.h index 59dc460fff956..4115da802940f 100644 --- a/src/include/encoding.h +++ b/src/include/encoding.h @@ -253,7 +253,7 @@ inline void decode(std::set& s, bufferlist::iterator& p) } // vector (pointers) -template +/*template inline void encode(const std::vector& v, bufferlist& bl) { __u32 n = v.size(); @@ -270,7 +270,7 @@ inline void decode(std::vector& v, bufferlist::iterator& p) for (__u32 i=0; i inline void encode(const std::vector& v, bufferlist& bl) diff --git a/src/include/rados.h b/src/include/rados.h index b7ca1423fe020..019fe608981c7 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -320,6 +320,7 @@ enum { CEPH_OSD_FLAG_ORDERSNAP = 64, /* EOLDSNAP if snapc is out of order */ CEPH_OSD_FLAG_PEERSTAT = 128, /* msg includes osd_peer_stat */ CEPH_OSD_FLAG_BALANCE_READS = 256, + CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */ }; #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index a18b3b7d7a040..320ae202556ab 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -35,21 +35,26 @@ public: pobject_t poid; __u8 acks_wanted; + + // op to exec vector ops; utime_t mtime; bool noop; - // subop metadata - tid_t rep_tid; - eversion_t version; - bool old_exists; __u64 old_size; eversion_t old_version; SnapSet snapset; SnapContext snapc; + + // transaction to exec + bufferlist logbl; + // subop metadata + tid_t rep_tid; + eversion_t version; + // piggybacked osd/og state eversion_t pg_trim_to; // primary->replica: trim to here osd_peer_stat_t peer_stat; @@ -76,6 +81,7 @@ public: ::decode(old_version, p); ::decode(snapset, p); ::decode(snapc, p); + ::decode(logbl, p); ::decode(pg_trim_to, p); ::decode(peer_stat, p); ::decode(attrset, p); @@ -99,6 +105,7 @@ public: ::encode(old_version, payload); ::encode(snapset, payload); ::encode(snapc, payload); + ::encode(logbl, payload); ::encode(pg_trim_to, payload); ::encode(peer_stat, payload); ::encode(attrset, payload); @@ -111,7 +118,7 @@ public: } - MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, vector& o, bool noop_, int aw, + MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, bool noop_, int aw, epoch_t mape, tid_t rtid, eversion_t v) : Message(MSG_OSD_SUBOP), map_epoch(mape), @@ -119,10 +126,10 @@ public: pgid(p), poid(po), acks_wanted(aw), - ops(o), noop(noop_), + noop(noop_), + old_exists(false), old_size(0), rep_tid(rtid), - version(v), - old_exists(false), old_size(0) + version(v) { memset(&peer_stat, 0, sizeof(peer_stat)); } diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index fc9999dab6bf5..f62ee66c8d29e 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -485,4 +485,6 @@ public: }; +WRITE_CLASS_ENCODER(ObjectStore::Transaction) + #endif diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 57e0e1b5784ff..826b10f1a55c6 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2078,8 +2078,9 @@ void PG::scrub() pobject_t poid; eversion_t v; osd_reqid_t reqid; - MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, scrub, false, 0, + MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0, osd->osdmap->get_epoch(), osd->get_tid(), v); + subop->ops = scrub; osd->messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()), osd->osdmap->get_inst(acting[i])); } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index ce3524b0057cb..3380c1881b29c 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -495,7 +495,7 @@ void ReplicatedPG::do_op(MOSDOp *op) assert(op->may_write()); - log_op(ctx); + log_op(ctx->log, ctx->local_t); } // continuing on to write path, make sure object context is registered @@ -1349,12 +1349,12 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) return result; } -void ReplicatedPG::log_op(OpContext *ctx) +void ReplicatedPG::log_op(vector& logv, ObjectStore::Transaction& t) { - dout(10) << "log_op " << ctx->reqid << " " << ctx->log << dendl; + dout(10) << "log_op " << log << dendl; // update the local pg, pg log - write_info(ctx->local_t); + write_info(t); // trim log? eversion_t trim_to = is_clean() ? peers_complete_thru : eversion_t(); @@ -1362,11 +1362,11 @@ void ReplicatedPG::log_op(OpContext *ctx) trim_to = peers_complete_thru; bufferlist log_bl; - for (vector::iterator p = ctx->log.begin(); - p != ctx->log.end(); + for (vector::iterator p = logv.begin(); + p != logv.end(); p++) add_log_entry(*p, log_bl); - append_log(ctx->local_t, log_bl, ctx->log[0].version, trim_to); + append_log(t, log_bl, logv[0].version, trim_to); } @@ -1576,19 +1576,32 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now, << " to osd" << dest << dendl; + MOSDOp *op = (MOSDOp *)repop->ctx->op; + // forward the write/update/whatever int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid, - repop->ctx->ops, repop->noop, acks_wanted, + repop->noop, acks_wanted, osd->osdmap->get_epoch(), repop->rep_tid, repop->ctx->at_version); - wr->mtime = repop->ctx->mtime; - wr->old_exists = old_exists; - wr->old_size = old_size; - wr->old_version = old_version; - wr->snapset = repop->obc->obs.oi.snapset; - wr->snapc = repop->ctx->snapc; - wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist + + if (op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) { + // replicate original op for parallel execution on replica + wr->ops = repop->ctx->ops; + wr->mtime = repop->ctx->mtime; + wr->old_exists = old_exists; + wr->old_size = old_size; + wr->old_version = old_version; + wr->snapset = repop->obc->obs.oi.snapset; + wr->snapc = repop->ctx->snapc; + wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist + } else { + // ship resulting transaction and log entries + wr->ops = repop->ctx->ops; // just fyi + ::encode(repop->ctx->op_t, wr->get_data()); + ::encode(repop->ctx->log, wr->logbl); + } + if (osd->osdmap->get_pg_size(info.pgid) == acting.size()) wr->pg_trim_to = peers_complete_thru; wr->peer_stat = osd->get_my_stat_for(now, dest); @@ -1974,49 +1987,64 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) // we better not be missing this. assert(!missing.is_missing(soid)); - // prepare our transaction - ObjectStore::Transaction t; - - // do op int ackerosd = acting[0]; osd->logger->inc(l_osd_r_wr); osd->logger->inc(l_osd_r_wrb, op->get_data().length()); - ObjectState obs(op->poid); - obs.oi.version = op->old_version; - obs.oi.snapset = op->snapset; - obs.exists = op->old_exists; - obs.size = op->old_size; - - OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &obs); + list tls; + OpContext *ctx = 0; if (!op->noop) { - ctx.mtime = op->mtime; - ctx.at_version = op->version; - ctx.snapc = op->snapc; + if (op->logbl.length()) { + // shipped transaction and log entries + ObjectStore::Transaction opt, localt; + vector log; + + bufferlist::iterator p = op->get_data().begin(); + ::decode(opt, p); + p = op->logbl.begin(); + ::decode(log, p); + + log_op(log, localt); + } else { + // do op + ObjectState obs(op->poid); + obs.oi.version = op->old_version; + obs.oi.snapset = op->snapset; + obs.exists = op->old_exists; + obs.size = op->old_size; + + ctx = new OpContext(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &obs); + + ctx->mtime = op->mtime; + ctx->at_version = op->version; + ctx->snapc = op->snapc; + + prepare_transaction(ctx); + log_op(ctx->log, ctx->local_t); - prepare_transaction(&ctx); - log_op(&ctx); + tls.push_back(&ctx->op_t); + tls.push_back(&ctx->local_t); + } } - - C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete); - - // apply log update. and possibly update itself. - list tls; - tls.push_back(&ctx.op_t); - tls.push_back(&ctx.local_t); + + C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, + info.last_complete); unsigned r = osd->store->apply_transactions(tls, oncommit); if (r) { derr(0) << "error applying transaction: r = " << r << dendl; } - + + // ack myself. + oncommit->ack(); + + delete ctx; + // send ack to acker MOSDSubOpReply *ack = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK); ack->set_peer_stat(osd->get_my_stat_for(g_clock.now(), ackerosd)); osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd)); - // ack myself. - oncommit->ack(); } void ReplicatedPG::sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete) @@ -2235,10 +2263,10 @@ bool ReplicatedPG::pull(sobject_t soid) // send op osd_reqid_t rid; tid_t tid = osd->get_tid(); - vector pull(1); - pull[0].op = CEPH_OSD_OP_PULL; - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, pull, false, CEPH_OSD_FLAG_ACK, + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, CEPH_OSD_FLAG_ACK, osd->osdmap->get_epoch(), tid, v); + subop->ops = vector(1); + subop->ops[0].op = CEPH_OSD_OP_PULL; subop->data_subset.swap(data_subset); // do not include clone_subsets in pull request; we will recalculate this // when the object is pushed back. @@ -2373,12 +2401,12 @@ void ReplicatedPG::push(sobject_t soid, int peer, // send osd_reqid_t rid; // useless? - vector push(1); - push[0].op = CEPH_OSD_OP_PUSH; - push[0].offset = 0; - push[0].length = size; - MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, push, false, 0, + MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, osd->osdmap->get_epoch(), osd->get_tid(), oi.version); + subop->ops = vector(1); + subop->ops[0].op = CEPH_OSD_OP_PUSH; + subop->ops[0].offset = 0; + subop->ops[0].length = size; subop->data_subset.swap(data_subset); subop->clone_subsets.swap(clone_subsets); subop->set_data(bl); // note: claims bl, set length above here! diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 07b48363a85b7..da3855359354c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -404,7 +404,7 @@ protected: void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st); int prepare_transaction(OpContext *ctx); - void log_op(OpContext *ctx); + void log_op(vector& log, ObjectStore::Transaction& t); friend class C_OSD_OpCommit; friend class C_OSD_RepModifyCommit; -- 2.39.5