]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: remove OpContext from Repop
authorSamuel Just <sjust@redhat.com>
Tue, 19 Jan 2016 23:33:02 +0000 (15:33 -0800)
committerSamuel Just <sjust@redhat.com>
Thu, 25 Feb 2016 18:56:41 +0000 (10:56 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 43d5bae9accd524683458e582e6a8892b6546d19..394a07ce0e7bfd25fe3cd4a63b486c0153f994f8 100644 (file)
@@ -3019,16 +3019,17 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
        ctx->op ? ctx->op->get_req()->get_connection() :
        ConnectionRef());
     });
+  ctx->register_on_finish(
+    [ctx, this]() {
+      delete ctx;
+    });
 
   // issue replica writes
   ceph_tid_t rep_tid = osd->get_tid();
-  RepGather *repop = new_repop(ctx, obc, rep_tid);  // new repop claims our obc, src_obc refs
-  // note: repop now owns ctx AND ctx->op
-
-  repop->src_obc.swap(src_obc); // and src_obc.
 
-  issue_repop(repop);
+  RepGather *repop = new_repop(ctx, obc, rep_tid);
 
+  issue_repop(repop, ctx);
   eval_repop(repop);
   repop->put();
 }
@@ -8262,8 +8263,8 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version)
 void ReplicatedPG::eval_repop(RepGather *repop)
 {
   MOSDOp *m = NULL;
-  if (repop->ctx->op)
-    m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
+  if (repop->op)
+    m = static_cast<MOSDOp *>(repop->op->get_req());
 
   if (m)
     dout(10) << "eval_repop " << *repop
@@ -8293,7 +8294,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
             waiting_for_ondisk[repop->v].begin();
           i != waiting_for_ondisk[repop->v].end();
           ++i) {
-       osd->reply_op_error(i->first, 0, repop->ctx->at_version,
+       osd->reply_op_error(i->first, 0, repop->v,
                            i->second);
       }
       waiting_for_ondisk.erase(repop->v);
@@ -8325,7 +8326,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
           ++i) {
        MOSDOp *m = static_cast<MOSDOp*>(i->first->get_req());
        MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
-       reply->set_reply_versions(repop->ctx->at_version,
+       reply->set_reply_versions(repop->v,
                                  i->second);
        reply->add_flags(CEPH_OSD_FLAG_ACK);
        osd->send_message_osd_client(reply, m->get_connection());
@@ -8359,9 +8360,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   }
 }
 
-void ReplicatedPG::issue_repop(RepGather *repop)
+void ReplicatedPG::issue_repop(RepGather *repop, OpContext *ctx)
 {
-  OpContext *ctx = repop->ctx;
   const hobject_t& soid = ctx->obs->oi.soid;
   if (ctx->op &&
     ((static_cast<MOSDOp *>(
@@ -8387,22 +8387,22 @@ void ReplicatedPG::issue_repop(RepGather *repop)
     }
   }
 
-  repop->obc->ondisk_write_lock();
-  if (repop->ctx->clone_obc)
-    repop->ctx->clone_obc->ondisk_write_lock();
+  ctx->obc->ondisk_write_lock();
+  if (ctx->clone_obc)
+    ctx->clone_obc->ondisk_write_lock();
 
   bool unlock_snapset_obc = false;
-  if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
-      repop->obc->obs.oi.soid) {
-    repop->ctx->snapset_obc->ondisk_write_lock();
+  if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid !=
+      ctx->obc->obs.oi.soid) {
+    ctx->snapset_obc->ondisk_write_lock();
     unlock_snapset_obc = true;
   }
 
-  repop->ctx->apply_pending_attrs();
+  ctx->apply_pending_attrs();
 
   if (pool.info.require_rollback()) {
-    for (vector<pg_log_entry_t>::iterator i = repop->ctx->log.begin();
-        i != repop->ctx->log.end();
+    for (vector<pg_log_entry_t>::iterator i = ctx->log.begin();
+        i != ctx->log.end();
         ++i) {
       assert(i->mod_desc.can_rollback());
       assert(!i->mod_desc.empty());
@@ -8412,23 +8412,23 @@ void ReplicatedPG::issue_repop(RepGather *repop)
   Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
   Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
   Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
-    repop->obc,
-    repop->ctx->clone_obc,
-    unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
+    ctx->obc,
+    ctx->clone_obc,
+    unlock_snapset_obc ? ctx->snapset_obc : ObjectContextRef());
   pgbackend->submit_transaction(
     soid,
-    repop->ctx->at_version,
-    std::move(repop->ctx->op_t),
+    ctx->at_version,
+    std::move(ctx->op_t),
     pg_trim_to,
     min_last_complete_ondisk,
-    repop->ctx->log,
-    repop->ctx->updated_hset_history,
+    ctx->log,
+    ctx->updated_hset_history,
     onapplied_sync,
     on_all_applied,
     on_all_commit,
     repop->rep_tid,
-    repop->ctx->reqid,
-    repop->ctx->op);
+    ctx->reqid,
+    ctx->op);
 }
 
 ReplicatedPG::RepGather *ReplicatedPG::new_repop(
@@ -8440,7 +8440,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(
   else
     dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
 
-  RepGather *repop = new RepGather(ctx, obc, rep_tid, info.last_complete);
+  RepGather *repop = new RepGather(ctx, rep_tid, info.last_complete);
 
   repop->start = ceph_clock_now(cct);
 
@@ -8455,12 +8455,6 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(
 void ReplicatedPG::remove_repop(RepGather *repop)
 {
   dout(20) << __func__ << " " << *repop << dendl;
-  assert(repop->ctx->obc);
-  dout(20) << " obc " << *repop->ctx->obc << dendl;
-  if (repop->ctx->clone_obc)
-    dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl;
-  if (repop->ctx->snapset_obc)
-    dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl;
 
   for (auto p = repop->on_finish.begin();
        p != repop->on_finish.end();
@@ -8469,8 +8463,7 @@ void ReplicatedPG::remove_repop(RepGather *repop)
   }
 
   release_object_locks(
-    repop->ctx->obc->obs.oi.soid.get_head(),
-    repop->ctx->lock_manager);
+    repop->lock_manager);
   repop->put();
 
   osd->logger->dec(l_osd_op_wip);
@@ -8492,7 +8485,7 @@ void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
 {
   RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid);
   dout(20) << __func__ << " " << repop << dendl;
-  issue_repop(repop);
+  issue_repop(repop, ctx.get());
   eval_repop(repop);
   repop->put();
 }
@@ -9619,10 +9612,10 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     repop->on_success.clear();
 
     if (requeue) {
-      if (repop->ctx->op) {
-       dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl;
-       rq.push_back(repop->ctx->op);
-       repop->ctx->op = OpRequestRef();
+      if (repop->op) {
+       dout(10) << " requeuing " << *repop->op->get_req() << dendl;
+       rq.push_back(repop->op);
+       repop->op = OpRequestRef();
       }
 
       // also requeue any dups, interleaved into position
index d2b77f3f7f7855f6e6505ec74cf72afb0e37bb3c..b04ec26262f4facfd1bfbc1d7c713214b65b9859 100644 (file)
@@ -696,15 +696,13 @@ public:
    */
   class RepGather {
   public:
+    hobject_t hoid;
+    OpRequestRef op;
     xlist<RepGather*>::item queue_item;
     int nref;
 
     eversion_t v;
 
-    OpContext *ctx;
-    ObjectContextRef obc;
-    map<hobject_t,ObjectContextRef, hobject_t::BitwiseComparator> src_obc;
-
     ceph_tid_t rep_tid;
 
     bool rep_aborted, rep_done;
@@ -716,20 +714,24 @@ public:
     
     eversion_t          pg_local_last_complete;
 
+    ObcLockManager lock_manager;
+
     list<std::function<void()>> on_applied;
     list<std::function<void()>> on_committed;
     list<std::function<void()>> on_success;
     list<std::function<void()>> on_finish;
     
-    RepGather(OpContext *c, ObjectContextRef pi, ceph_tid_t rt,
+    RepGather(OpContext *c, ceph_tid_t rt,
              eversion_t lc) :
+      hoid(c->obc->obs.oi.soid),
+      op(c->op),
       queue_item(this),
       nref(1),
-      ctx(c), obc(pi),
       rep_tid(rt), 
       rep_aborted(false), rep_done(false),
       all_applied(false), all_committed(false),
       pg_local_last_complete(lc),
+      lock_manager(std::move(c->lock_manager)),
       on_applied(std::move(c->on_applied)),
       on_committed(std::move(c->on_committed)),
       on_success(std::move(c->on_success)),
@@ -742,7 +744,6 @@ public:
     void put() {
       assert(nref > 0);
       if (--nref == 0) {
-       delete ctx; // must already be unlocked
        assert(on_applied.empty());
        delete this;
        //generic_dout(0) << "deleting " << this << dendl;
@@ -860,7 +861,7 @@ protected:
   void repop_all_applied(RepGather *repop);
   void repop_all_committed(RepGather *repop);
   void eval_repop(RepGather*);
-  void issue_repop(RepGather *repop);
+  void issue_repop(RepGather *repop, OpContext *ctx);
   RepGather *new_repop(
     OpContext *ctx,
     ObjectContextRef obc,
@@ -1636,10 +1637,6 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
       << " rep_tid=" << repop.rep_tid 
       << " committed?=" << repop.all_committed
       << " applied?=" << repop.all_applied;
-  if (repop.ctx->lock_type != ObjectContext::RWState::RWNONE)
-    out << " lock=" << (int)repop.ctx->lock_type;
-  if (repop.ctx->op)
-    out << " op=" << *(repop.ctx->op->get_req());
   out << ")";
   return out;
 }