From 0602e555eea50c000e3913c385963cde0421d61c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 May 2009 09:52:24 -0700 Subject: [PATCH] osd: ProjectedObject -> ObjectContext; object access mode state notes --- src/osd/ReplicatedPG.cc | 84 ++++++++++++++++++++--------------------- src/osd/ReplicatedPG.h | 61 ++++++++++++++++++++++++------ 2 files changed, 91 insertions(+), 54 deletions(-) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d969a433f8e16..c2c564755b43a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1441,8 +1441,8 @@ void ReplicatedPG::apply_repop(RepGather *repop) repop->applied = true; - put_projected_object(repop->pinfo); - repop->pinfo = 0; + put_object_context(repop->obc); + repop->obc = 0; update_stats(); @@ -1574,10 +1574,10 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) osd->osdmap->get_epoch(), repop->rep_tid, repop->ctx->at_version); wr->mtime = repop->ctx->mtime; - wr->old_exists = repop->pinfo->exists; - wr->old_size = repop->pinfo->size; - wr->old_version = repop->pinfo->oi.version; - wr->snapset = repop->pinfo->oi.snapset; + wr->old_exists = repop->obc->exists; + wr->old_size = repop->obc->size; + wr->old_version = repop->obc->oi.version; + wr->snapset = repop->obc->oi.snapset; wr->snapc = repop->ctx->snapc; wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist if (osd->osdmap->get_pg_size(info.pgid) == acting.size()) @@ -1586,12 +1586,12 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now) osd->messenger->send_message(wr, osd->osdmap->get_inst(dest)); } -ReplicatedPG::RepGather *ReplicatedPG::new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo, +ReplicatedPG::RepGather *ReplicatedPG::new_repop(WriteOpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid) { dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl; - RepGather *repop = new RepGather(ctx, pinfo, noop, rep_tid, info.last_complete); + RepGather *repop = new RepGather(ctx, obc, noop, rep_tid, info.last_complete); // initialize gather sets for (unsigned i=0; iref++; + ObjectContext *obc = &object_contexts[poid]; + obc->ref++; - if (pinfo->ref > 1) { - dout(10) << "get_projected_object " << poid << " " - << (pinfo->ref-1) << " -> " << pinfo->ref << dendl; - return pinfo; // already had it + if (obc->ref > 1) { + dout(10) << "get_object_context " << poid << " " + << (obc->ref-1) << " -> " << obc->ref << dendl; + return obc; // already had it } // pull info off disk - pinfo->poid = poid; + obc->poid = poid; struct stat st; int r = osd->store->stat(info.pgid.to_coll(), poid, &st); if (r == 0) { - pinfo->exists = true; - pinfo->size = st.st_size; + obc->exists = true; + obc->size = st.st_size; bufferlist bv; r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv); assert(r >= 0); - pinfo->oi.decode(bv); + obc->oi.decode(bv); } else { - pinfo->oi.soid = poid; - pinfo->exists = false; - pinfo->size = 0; + obc->oi.soid = poid; + obc->exists = false; + obc->size = 0; } - dout(10) << "get_projected_object " << poid << " read " << pinfo->oi << dendl; - return pinfo; + dout(10) << "get_object_context " << poid << " read " << obc->oi << dendl; + return obc; } -void ReplicatedPG::put_projected_object(ProjectedObjectInfo *pinfo) +void ReplicatedPG::put_object_context(ObjectContext *obc) { - dout(10) << "put_projected_object " << pinfo->poid << " " - << pinfo->ref << " -> " << (pinfo->ref-1) << dendl; + dout(10) << "put_object_context " << obc->poid << " " + << obc->ref << " -> " << (obc->ref-1) << dendl; - --pinfo->ref; - if (pinfo->ref == 0) { - projected_objects.erase(pinfo->poid); + --obc->ref; + if (obc->ref == 0) { + object_contexts.erase(obc->poid); - if (projected_objects.empty()) + if (object_contexts.empty()) kick(); } } @@ -1742,15 +1742,15 @@ void ReplicatedPG::op_modify(MOSDOp *op) #endif // get existing object info - ProjectedObjectInfo *pinfo = get_projected_object(soid); - ctx->poi = &pinfo->oi; + ObjectContext *obc = get_object_context(soid); + ctx->poi = &obc->oi; // --- locking --- // wrlock? if (!ctx->ops.empty() && // except noop; we just want to flush - block_if_wrlocked(op, pinfo->oi)) { - put_projected_object(pinfo); + block_if_wrlocked(op, obc->oi)) { + put_object_context(obc); delete ctx; return; // op will be handled later, after the object unlocks } @@ -1788,18 +1788,18 @@ void ReplicatedPG::op_modify(MOSDOp *op) dout(10) << "op_modify " << opname << " " << soid - << " ov " << pinfo->oi.version << " av " << ctx->at_version + << " ov " << obc->oi.version << " av " << ctx->at_version << " snapc " << ctx->snapc - << " snapset " << pinfo->oi.snapset + << " snapset " << obc->oi.snapset << dendl; // verify snap ordering if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) && - ctx->snapc.seq < pinfo->oi.snapset.seq) { + ctx->snapc.seq < obc->oi.snapset.seq) { dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq - << " < snapset seq " << pinfo->oi.snapset.seq + << " < snapset seq " << obc->oi.snapset.seq << " on " << soid << dendl; - put_projected_object(pinfo); + put_object_context(obc); delete ctx; osd->reply_op_error(op, -EOLDSNAPC); return; @@ -1827,7 +1827,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) // issue replica writes tid_t rep_tid = osd->get_tid(); - RepGather *repop = new_repop(ctx, pinfo, noop, rep_tid); + RepGather *repop = new_repop(ctx, obc, noop, rep_tid); for (unsigned i=1; iexists, pinfo->size, trim_to); + prepare_transaction(ctx, obc->exists, obc->size, trim_to); } // keep peer_info up to date diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 1814043af8aa6..4a5c03db66c8f 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -39,13 +39,51 @@ public: }; + /* + object access states: + + - idle + - no in-progress or waiting writes. + - read: ok + - write: move to 'delayed' or 'rmw' + - rmw: move to 'rmw' + + - delayed + - delayed write in progress. delay write application on primary. + - when done, move to 'idle' + - read: ok + - write: ok + - rmw: move to 'delayed-flushing' + + - delayed-flushing + - waiting for delayed writes to flush, then move to 'rmw' + - read, write, rmw: wait + + - rmw + - rmw cycles in flight. applied immediately at primary. + - when done, move to 'idle' + - read: same client ok. otherwise, move to 'rmw-flushing' + - write: ok + - rmw: same client ok. otherwise, wait for rmw to flush + + - rmw-flushing + - waiting for rmw to flush, then move to 'idle' + - read, write, rmw: wait + + */ + + /* * keep tabs on object modifications that are in flight. * we need to know the projected existence, size, snapset, * etc., because we don't send writes down to disk until after * replicas ack. */ - struct ProjectedObjectInfo { + struct ObjectContext { + enum { + IDLE, DELAYED, DELAYED_FLUSHING, RMW, RMW_FLUSHING + } state; + int ref; sobject_t poid; @@ -54,7 +92,7 @@ public: object_info_t oi; - ProjectedObjectInfo() : ref(0), exists(false), size(0), oi(poid) {} + ObjectContext() : state(IDLE), ref(0), exists(false), size(0), oi(poid) {} }; /* @@ -71,7 +109,6 @@ public: SnapContext snapc; // writer snap context - //ProjectedObjectInfo *pinfo; // projected object state object_info_t *poi; eversion_t at_version; // pg's current version pointer @@ -92,7 +129,7 @@ public: int nref; WriteOpContext *ctx; - ProjectedObjectInfo *pinfo; + ObjectContext *obc; tid_t rep_tid; bool noop; @@ -109,11 +146,11 @@ public: eversion_t pg_local_last_complete; map pg_complete_thru; - RepGather(WriteOpContext *c, ProjectedObjectInfo *pi, bool noop_, tid_t rt, + RepGather(WriteOpContext *c, ObjectContext *pi, bool noop_, tid_t rt, eversion_t lc) : queue_item(this), nref(1), - ctx(c), pinfo(pi), + ctx(c), obc(pi), rep_tid(rt), noop(noop_), applied(false), aborted(false), @@ -145,7 +182,7 @@ public: void put() { assert(nref > 0); if (--nref == 0) { - assert(!pinfo); + assert(!obc); delete ctx; delete this; //generic_dout(0) << "deleting " << this << dendl; @@ -164,20 +201,20 @@ protected: void apply_repop(RepGather *repop); void eval_repop(RepGather*); void issue_repop(RepGather *repop, int dest, utime_t now); - RepGather *new_repop(WriteOpContext *ctx, ProjectedObjectInfo *pinfo, bool noop, tid_t rep_tid); + RepGather *new_repop(WriteOpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid); void repop_ack(RepGather *repop, int result, int ack_type, int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); // projected object info - map projected_objects; + map object_contexts; - ProjectedObjectInfo *get_projected_object(sobject_t poid); - void put_projected_object(ProjectedObjectInfo *pinfo); + ObjectContext *get_object_context(sobject_t poid); + void put_object_context(ObjectContext *obc); bool is_write_in_progress() { - return !projected_objects.empty(); + return !object_contexts.empty(); } // load balancing -- 2.39.5