From: Samuel Just Date: Tue, 19 Jan 2016 23:33:02 +0000 (-0800) Subject: ReplicatedPG: remove OpContext from Repop X-Git-Tag: v10.1.0~277^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=257225d301b088e7dfb15a4dc110d1e30727c8b3;p=ceph.git ReplicatedPG: remove OpContext from Repop Signed-off-by: Samuel Just --- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 43d5bae9accd..394a07ce0e7b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -3019,16 +3019,17 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) ctx->op ? ctx->op->get_req()->get_connection() : ConnectionRef()); }); + ctx->register_on_finish( + [ctx, this]() { + delete ctx; + }); // issue replica writes ceph_tid_t rep_tid = osd->get_tid(); - RepGather *repop = new_repop(ctx, obc, rep_tid); // new repop claims our obc, src_obc refs - // note: repop now owns ctx AND ctx->op - - repop->src_obc.swap(src_obc); // and src_obc. - issue_repop(repop); + RepGather *repop = new_repop(ctx, obc, rep_tid); + issue_repop(repop, ctx); eval_repop(repop); repop->put(); } @@ -8262,8 +8263,8 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version) void ReplicatedPG::eval_repop(RepGather *repop) { MOSDOp *m = NULL; - if (repop->ctx->op) - m = static_cast(repop->ctx->op->get_req()); + if (repop->op) + m = static_cast(repop->op->get_req()); if (m) dout(10) << "eval_repop " << *repop @@ -8293,7 +8294,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) waiting_for_ondisk[repop->v].begin(); i != waiting_for_ondisk[repop->v].end(); ++i) { - osd->reply_op_error(i->first, 0, repop->ctx->at_version, + osd->reply_op_error(i->first, 0, repop->v, i->second); } waiting_for_ondisk.erase(repop->v); @@ -8325,7 +8326,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) ++i) { MOSDOp *m = static_cast(i->first->get_req()); MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true); - reply->set_reply_versions(repop->ctx->at_version, + reply->set_reply_versions(repop->v, i->second); reply->add_flags(CEPH_OSD_FLAG_ACK); osd->send_message_osd_client(reply, m->get_connection()); @@ -8359,9 +8360,8 @@ void ReplicatedPG::eval_repop(RepGather *repop) } } -void ReplicatedPG::issue_repop(RepGather *repop) +void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx) { - OpContext *ctx = repop->ctx; const hobject_t& soid = ctx->obs->oi.soid; if (ctx->op && ((static_cast( @@ -8387,22 +8387,22 @@ void ReplicatedPG::issue_repop(RepGather *repop) } } - repop->obc->ondisk_write_lock(); - if (repop->ctx->clone_obc) - repop->ctx->clone_obc->ondisk_write_lock(); + ctx->obc->ondisk_write_lock(); + if (ctx->clone_obc) + ctx->clone_obc->ondisk_write_lock(); bool unlock_snapset_obc = false; - if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid != - repop->obc->obs.oi.soid) { - repop->ctx->snapset_obc->ondisk_write_lock(); + if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid != + ctx->obc->obs.oi.soid) { + ctx->snapset_obc->ondisk_write_lock(); unlock_snapset_obc = true; } - repop->ctx->apply_pending_attrs(); + ctx->apply_pending_attrs(); if (pool.info.require_rollback()) { - for (vector::iterator i = repop->ctx->log.begin(); - i != repop->ctx->log.end(); + for (vector::iterator i = ctx->log.begin(); + i != ctx->log.end(); ++i) { assert(i->mod_desc.can_rollback()); assert(!i->mod_desc.empty()); @@ -8412,23 +8412,23 @@ void ReplicatedPG::issue_repop(RepGather *repop) Context *on_all_commit = new C_OSD_RepopCommit(this, repop); Context *on_all_applied = new C_OSD_RepopApplied(this, repop); Context *onapplied_sync = new C_OSD_OndiskWriteUnlock( - repop->obc, - repop->ctx->clone_obc, - unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef()); + ctx->obc, + ctx->clone_obc, + unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef()); pgbackend->submit_transaction( soid, - repop->ctx->at_version, - std::move(repop->ctx->op_t), + ctx->at_version, + std::move(ctx->op_t), pg_trim_to, min_last_complete_ondisk, - repop->ctx->log, - repop->ctx->updated_hset_history, + ctx->log, + ctx->updated_hset_history, onapplied_sync, on_all_applied, on_all_commit, repop->rep_tid, - repop->ctx->reqid, - repop->ctx->op); + ctx->reqid, + ctx->op); } ReplicatedPG::RepGather *ReplicatedPG::new_repop( @@ -8440,7 +8440,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop( else dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl; - RepGather *repop = new RepGather(ctx, obc, rep_tid, info.last_complete); + RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete); repop->start = ceph_clock_now(cct); @@ -8455,12 +8455,6 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop( void ReplicatedPG::remove_repop(RepGather *repop) { dout(20) << __func__ << " " << *repop << dendl; - assert(repop->ctx->obc); - dout(20) << " obc " << *repop->ctx->obc << dendl; - if (repop->ctx->clone_obc) - dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl; - if (repop->ctx->snapset_obc) - dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl; for (auto p = repop->on_finish.begin(); p != repop->on_finish.end(); @@ -8469,8 +8463,7 @@ void ReplicatedPG::remove_repop(RepGather *repop) } release_object_locks( - repop->ctx->obc->obs.oi.soid.get_head(), - repop->ctx->lock_manager); + repop->lock_manager); repop->put(); osd->logger->dec(l_osd_op_wip); @@ -8492,7 +8485,7 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx) { RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid); dout(20) << __func__ << " " << repop << dendl; - issue_repop(repop); + issue_repop(repop, ctx.get()); eval_repop(repop); repop->put(); } @@ -9619,10 +9612,10 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) repop->on_success.clear(); if (requeue) { - if (repop->ctx->op) { - dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl; - rq.push_back(repop->ctx->op); - repop->ctx->op = OpRequestRef(); + if (repop->op) { + dout(10) << " requeuing " << *repop->op->get_req() << dendl; + rq.push_back(repop->op); + repop->op = OpRequestRef(); } // also requeue any dups, interleaved into position diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index d2b77f3f7f78..b04ec26262f4 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -696,15 +696,13 @@ public: */ class RepGather { public: + hobject_t hoid; + OpRequestRef op; xlist::item queue_item; int nref; eversion_t v; - OpContext *ctx; - ObjectContextRef obc; - map src_obc; - ceph_tid_t rep_tid; bool rep_aborted, rep_done; @@ -716,20 +714,24 @@ public: eversion_t pg_local_last_complete; + ObcLockManager lock_manager; + list> on_applied; list> on_committed; list> on_success; list> on_finish; - RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt, + RepGather(OpContext *c, ceph_tid_t rt, eversion_t lc) : + hoid(c->obc->obs.oi.soid), + op(c->op), queue_item(this), nref(1), - ctx(c), obc(pi), rep_tid(rt), rep_aborted(false), rep_done(false), all_applied(false), all_committed(false), pg_local_last_complete(lc), + lock_manager(std::move(c->lock_manager)), on_applied(std::move(c->on_applied)), on_committed(std::move(c->on_committed)), on_success(std::move(c->on_success)), @@ -742,7 +744,6 @@ public: void put() { assert(nref > 0); if (--nref == 0) { - delete ctx; // must already be unlocked assert(on_applied.empty()); delete this; //generic_dout(0) << "deleting " << this << dendl; @@ -860,7 +861,7 @@ protected: void repop_all_applied(RepGather *repop); void repop_all_committed(RepGather *repop); void eval_repop(RepGather*); - void issue_repop(RepGather *repop); + void issue_repop(RepGather *repop, OpContext *ctx); RepGather *new_repop( OpContext *ctx, ObjectContextRef obc, @@ -1636,10 +1637,6 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) << " rep_tid=" << repop.rep_tid << " committed?=" << repop.all_committed << " applied?=" << repop.all_applied; - if (repop.ctx->lock_type != ObjectContext::RWState::RWNONE) - out << " lock=" << (int)repop.ctx->lock_type; - if (repop.ctx->op) - out << " op=" << *(repop.ctx->op->get_req()); out << ")"; return out; }