]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: move client op handling into ReplicatedBackend
authorSamuel Just <sam.just@inktank.com>
Thu, 21 Nov 2013 00:17:36 +0000 (16:17 -0800)
committerSamuel Just <sam.just@inktank.com>
Wed, 22 Jan 2014 22:39:15 +0000 (14:39 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 30804fdbc41dbbdc3e4dedda0d27dd3b5e9a474f..1d7a682fdac9d3be703a756e8f4abd422d67868b 100644 (file)
        const hobject_t &hoid,
        map<string, bufferptr> &attrs) = 0;
 
-     virtual void op_applied_replica(
+     virtual void op_applied(
        const eversion_t &applied_version) = 0;
 
      virtual bool should_send_op(
        int peer,
        const hobject_t &hoid) = 0;
 
+     virtual void log_operation(
+       vector<pg_log_entry_t> &logv,
+       const eversion_t &trim_to,
+       bool update_snaps,
+       ObjectStore::Transaction *t) = 0;
+
+     virtual void update_peer_last_complete_ondisk(
+       int fromosd,
+       eversion_t lcod) = 0;
+
+     virtual void update_last_complete_ondisk(
+       eversion_t lcod) = 0;
+
+     virtual void update_stats(
+       const pg_stat_t &stat) = 0;
+
      virtual ~Listener() {}
    };
    Listener *parent;
index 6d893b4e3e8f558231dc4b2e14afd47450c7a96b..a7fdeb0e5556b19f63dc356c4f6bb4f8275f76b0 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 #include "ReplicatedBackend.h"
+#include "messages/MOSDOp.h"
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
 #include "messages/MOSDPGPush.h"
@@ -148,6 +149,8 @@ bool ReplicatedBackend::handle_message(
       default:
        break;
       }
+    } else {
+      sub_op_modify(op);
     }
     break;
   }
@@ -162,6 +165,8 @@ bool ReplicatedBackend::handle_message(
        sub_op_push_reply(op);
        return true;
       }
+    } else {
+      sub_op_modify_reply(op);
     }
     break;
   }
@@ -192,6 +197,14 @@ void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
     t->remove(get_temp_coll(t), *i);
   }
   temp_contents.clear();
+  for (map<tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
+       i != in_progress_ops.end();
+       in_progress_ops.erase(i++)) {
+    if (i->second.on_commit)
+      delete i->second.on_commit;
+    if (i->second.on_applied)
+      delete i->second.on_applied;
+  }
   clear_state();
 }
 
@@ -480,6 +493,9 @@ public:
   bool empty() const {
     return t->empty();
   }
+  uint64_t get_bytes_written() const {
+    return t->get_encoded_bytes();
+  }
   ~RPGTransaction() { delete t; }
 };
 
@@ -488,13 +504,202 @@ PGBackend::PGTransaction *ReplicatedBackend::get_transaction()
   return new RPGTransaction(coll, get_temp_coll());
 }
 
+class C_OSD_OnOpCommit : public Context {
+  ReplicatedBackend *pg;
+  ReplicatedBackend::InProgressOp *op;
+public:
+  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) 
+    : pg(pg), op(op) {}
+  void finish(int) {
+    pg->op_commit(op);
+  }
+};
+
+class C_OSD_OnOpApplied : public Context {
+  ReplicatedBackend *pg;
+  ReplicatedBackend::InProgressOp *op;
+public:
+  C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) 
+    : pg(pg), op(op) {}
+  void finish(int) {
+    pg->op_applied(op);
+  }
+};
+
 void ReplicatedBackend::submit_transaction(
+  const hobject_t &soid,
+  const eversion_t &at_version,
   PGTransaction *_t,
+  const eversion_t &trim_to,
   vector<pg_log_entry_t> &log_entries,
+  Context *on_local_applied_sync,
   Context *on_all_acked,
   Context *on_all_commit,
-  tid_t tid)
+  tid_t tid,
+  osd_reqid_t reqid,
+  OpRequestRef orig_op)
+{
+  RPGTransaction *t = dynamic_cast<RPGTransaction*>(_t);
+  ObjectStore::Transaction *op_t = t->get_transaction();
+
+  assert(t->get_temp_added().size() <= 1);
+  assert(t->get_temp_cleared().size() <= 1);
+
+  assert(!in_progress_ops.count(tid));
+  InProgressOp &op = in_progress_ops.insert(
+    make_pair(
+      tid,
+      InProgressOp(
+       tid, on_all_commit, on_all_acked,
+       orig_op, at_version)
+      )
+    ).first->second;
+
+  issue_op(
+    soid,
+    at_version,
+    tid,
+    reqid,
+    trim_to,
+    t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(),
+    t->get_temp_cleared().size() ?
+      *(t->get_temp_cleared().begin()) :hobject_t(),
+    log_entries,
+    &op,
+    op_t);
+
+  // add myself to gather set
+  op.waiting_for_applied.insert(osd->whoami);
+  op.waiting_for_commit.insert(osd->whoami);
+
+  ObjectStore::Transaction local_t;
+  if (t->get_temp_added().size()) {
+    get_temp_coll(&local_t);
+    temp_contents.insert(t->get_temp_added().begin(), t->get_temp_added().end());
+  }
+  for (set<hobject_t>::const_iterator i = t->get_temp_cleared().begin();
+       i != t->get_temp_cleared().end();
+       ++i) {
+    temp_contents.erase(*i);
+  }
+  parent->log_operation(log_entries, trim_to, true, &local_t);
+  local_t.append(*op_t);
+  local_t.swap(*op_t);
+  
+  op_t->register_on_applied_sync(on_local_applied_sync);
+  op_t->register_on_applied(
+    parent->bless_context(
+      new C_OSD_OnOpApplied(this, &op)));
+  op_t->register_on_applied(
+    new ObjectStore::C_DeleteTransaction(op_t));
+  op_t->register_on_commit(
+    parent->bless_context(
+      new C_OSD_OnOpCommit(this, &op)));
+      
+  parent->queue_transaction(op_t, op.op);
+  delete t;
+}
+
+void ReplicatedBackend::op_applied(
+  InProgressOp *op)
 {
-  //RPGTransaction *t = dynamic_cast<RPGTransaction*>(_t);
-  return;
+  dout(10) << __func__ << ": " << op->tid << dendl;
+  if (op->op)
+    op->op->mark_event("op_applied");
+
+  op->waiting_for_applied.erase(osd->whoami);
+  parent->op_applied(op->v);
+
+  if (op->waiting_for_applied.empty()) {
+    op->on_applied->complete(0);
+    op->on_applied = 0;
+  }
+  if (op->done()) {
+    assert(!op->on_commit && !op->on_applied);
+    in_progress_ops.erase(op->tid);
+  }
+}
+
+void ReplicatedBackend::op_commit(
+  InProgressOp *op)
+{
+  dout(10) << __func__ << ": " << op->tid << dendl;
+  if (op->op)
+    op->op->mark_event("op_commit");
+
+  op->waiting_for_commit.erase(osd->whoami);
+
+  if (op->waiting_for_commit.empty()) {
+    op->on_commit->complete(0);
+    op->on_commit = 0;
+  }
+  if (op->done()) {
+    assert(!op->on_commit && !op->on_applied);
+    in_progress_ops.erase(op->tid);
+  }
+}
+
+void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
+{
+  MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
+  assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
+
+  op->mark_started();
+
+  // must be replication.
+  tid_t rep_tid = r->get_tid();
+  int fromosd = r->get_source().num();
+
+  if (in_progress_ops.count(rep_tid)) {
+    map<tid_t, InProgressOp>::iterator iter =
+      in_progress_ops.find(rep_tid);
+    InProgressOp &ip_op = iter->second;
+    MOSDOp *m;
+    if (ip_op.op)
+      m = static_cast<MOSDOp *>(ip_op.op->get_req());
+
+    if (m)
+      dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
+             << " ack_type " << r->ack_type
+             << " from osd." << fromosd
+             << dendl;
+    else
+      dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
+             << " ack_type " << r->ack_type
+             << " from osd." << fromosd
+             << dendl;
+
+    // oh, good.
+
+    if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
+      assert(ip_op.waiting_for_commit.count(fromosd));
+      ip_op.waiting_for_commit.erase(fromosd);
+      if (ip_op.op)
+       ip_op.op->mark_event("sub_op_commit_rec");
+    } else {
+      assert(ip_op.waiting_for_applied.count(fromosd));
+      if (ip_op.op)
+       ip_op.op->mark_event("sub_op_applied_rec");
+    }
+    ip_op.waiting_for_applied.erase(fromosd);
+
+    parent->update_peer_last_complete_ondisk(
+      fromosd,
+      r->get_last_complete_ondisk());
+
+    if (ip_op.waiting_for_applied.empty() &&
+        ip_op.on_applied) {
+      ip_op.on_applied->complete(0);
+      ip_op.on_applied = 0;
+    }
+    if (ip_op.waiting_for_commit.empty() &&
+        ip_op.on_commit) {
+      ip_op.on_commit->complete(0);
+      ip_op.on_commit= 0;
+    }
+    if (ip_op.done()) {
+      assert(!ip_op.on_commit && !ip_op.on_applied);
+      in_progress_ops.erase(iter);
+    }
+  }
 }
index 6960d223e2a92c72e6f63b6eb53c217948fd61be..166826251e3445e2dfd5d28029c526b098e838d0 100644 (file)
@@ -172,18 +172,6 @@ public:
     const string &attr,
     bufferlist *out);
 
-  /**
-   * Client IO
-   */
-  PGTransaction *get_transaction();
-  void submit_transaction(
-    PGTransaction *t,
-    vector<pg_log_entry_t> &log_entries,
-    Context *on_all_acked,
-    Context *on_all_commit,
-    tid_t tid
-    );
-
 private:
   // push
   struct PushInfo {
@@ -340,6 +328,99 @@ private:
     const ObjectRecoveryInfo& recovery_info,
     SnapSetContext *ssc
     );
+
+  /**
+   * Client IO
+   */
+  struct InProgressOp {
+    tid_t tid;
+    set<int> waiting_for_commit;
+    set<int> waiting_for_applied;
+    Context *on_commit;
+    Context *on_applied;
+    OpRequestRef op;
+    eversion_t v;
+    InProgressOp(
+      tid_t tid, Context *on_commit, Context *on_applied,
+      OpRequestRef op, eversion_t v)
+      : tid(tid), on_commit(on_commit), on_applied(on_applied),
+       op(op), v(v) {}
+    bool done() const {
+      return waiting_for_commit.empty() &&
+       waiting_for_applied.empty();
+    }
+  };
+  map<tid_t, InProgressOp> in_progress_ops;
+public:
+  PGTransaction *get_transaction();
+  friend class C_OSD_OnOpCommit;
+  friend class C_OSD_OnOpApplied;
+  void submit_transaction(
+    const hobject_t &hoid,
+    const eversion_t &at_version,
+    PGTransaction *t,
+    const eversion_t &trim_to,
+    vector<pg_log_entry_t> &log_entries,
+    Context *on_local_applied_sync,
+    Context *on_all_applied,
+    Context *on_all_commit,
+    tid_t tid,
+    osd_reqid_t reqid,
+    OpRequestRef op
+    );
+private:
+  void issue_op(
+    const hobject_t &soid,
+    const eversion_t &at_version,
+    tid_t tid,
+    osd_reqid_t reqid,
+    eversion_t pg_trim_to,
+    hobject_t new_temp_oid,
+    hobject_t discard_temp_oid,
+    vector<pg_log_entry_t> &log_entries,
+    InProgressOp *op,
+    ObjectStore::Transaction *op_t);
+  void op_applied(InProgressOp *op);
+  void op_commit(InProgressOp *op);
+  void sub_op_modify_reply(OpRequestRef op);
+  void sub_op_modify(OpRequestRef op);
+
+  struct RepModify {
+    OpRequestRef op;
+    bool applied, committed;
+    int ackerosd;
+    eversion_t last_complete;
+    epoch_t epoch_started;
+
+    uint64_t bytes_written;
+
+    ObjectStore::Transaction opt, localt;
+    
+    RepModify() : applied(false), committed(false), ackerosd(-1),
+                 epoch_started(0), bytes_written(0) {}
+  };
+  typedef std::tr1::shared_ptr<RepModify> RepModifyRef;
+
+  struct C_OSD_RepModifyApply : public Context {
+    ReplicatedBackend *pg;
+    RepModifyRef rm;
+    C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef r)
+      : pg(pg), rm(r) {}
+    void finish(int r) {
+      pg->sub_op_modify_applied(rm);
+    }
+  };
+  struct C_OSD_RepModifyCommit : public Context {
+    ReplicatedBackend *pg;
+    RepModifyRef rm;
+    C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef r)
+      : pg(pg), rm(r) {}
+    void finish(int r) {
+      pg->sub_op_modify_commit(rm);
+    }
+  };
+  void sub_op_modify_applied(RepModifyRef rm);
+  void sub_op_modify_commit(RepModifyRef rm);
 };
 
 #endif
index 332fbdcd067c2e1bc1cbca3d71d84f8812d3de8e..52667289de0033af42ed52154b9eab51c7f174bd 100644 (file)
@@ -1301,6 +1301,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
   OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
                                 &obc->obs, obc->ssc, 
                                 this);
+  ctx->op_t = pgbackend->get_transaction();
   ctx->obc = obc;
   if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) {
     dout(20) << __func__ << ": skipping rw locks" << dendl;
@@ -1496,8 +1497,8 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
 
   // this method must be idempotent since we may call it several times
   // before we finally apply the resulting transaction.
-  ctx->op_t = ObjectStore::Transaction();
-  ctx->local_t = ObjectStore::Transaction();
+  delete ctx->op_t;
+  ctx->op_t = pgbackend->get_transaction();
 
   if (op->may_write() || op->may_cache()) {
     // dup/replay?
@@ -1618,7 +1619,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   // possible to construct an operation that does a read, does a guard
   // check (e.g., CMPXATTR), and then a write.  Then we either succeed
   // with the write, or return a CMPXATTR and the read value.
-  if ((ctx->op_t.empty() && !ctx->modify) || result < 0) {
+  if ((ctx->op_t->empty() && !ctx->modify) || result < 0) {
     // read.
     ctx->reply->claim_op_out_data(ctx->ops);
     ctx->reply->get_header().data_off = ctx->data_off;
@@ -1632,7 +1633,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   ctx->reply->set_result(result);
 
   // read or error?
-  if (ctx->op_t.empty() || result < 0) {
+  if (ctx->op_t->empty() || result < 0) {
     MOSDOpReply *reply = ctx->reply;
     ctx->reply = NULL;
 
@@ -1660,8 +1661,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
   // trim log?
   calc_trim_to();
 
-  append_log(ctx->log, pg_trim_to, ctx->local_t);
-  
   // verify that we are doing this in order?
   if (cct->_conf->osd_debug_op_order && m->get_source().is_client()) {
     map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid];
@@ -1798,8 +1797,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
       return;
     }
   }
-
-  sub_op_modify(op);
 }
 
 void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
@@ -1814,8 +1811,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
       return;
     }
   }
-
-  sub_op_modify_reply(op);
 }
 
 void ReplicatedPG::do_scan(
@@ -2134,7 +2129,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
   OpContext *ctx = repop->ctx;
   ctx->at_version = get_next_version();
 
-  ObjectStore::Transaction *t = &ctx->op_t;
+  PGBackend::PGTransaction *t = ctx->op_t;
   set<snapid_t> new_snaps;
   for (set<snapid_t>::iterator i = old_snaps.begin();
        i != old_snaps.end();
@@ -2147,7 +2142,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     // remove clone
     dout(10) << coid << " snaps " << old_snaps << " -> "
             << new_snaps << " ... deleting" << dendl;
-    t->remove(coll, coid);
+    t->remove(coid);
 
     // ...from snapset
     snapid_t last = coid.snap;
@@ -2207,7 +2202,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
     coi.version = ctx->at_version;
     bl.clear();
     ::encode(coi, bl);
-    t->setattr(coll, coid, OI_ATTR, bl);
+    t->setattr(coid, OI_ATTR, bl);
 
     ctx->log.push_back(
       pg_log_entry_t(
@@ -2246,7 +2241,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
       );
     ctx->snapset_obc->obs.exists = false;
 
-    t->remove(coll, snapoid);
+    t->remove(snapoid);
   } else {
     dout(10) << coid << " updating snapset on " << snapoid << dendl;
     ctx->log.push_back(
@@ -2266,11 +2261,11 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
 
     bl.clear();
     ::encode(snapset, bl);
-    t->setattr(coll, snapoid, SS_ATTR, bl);
+    t->setattr(snapoid, SS_ATTR, bl);
 
     bl.clear();
     ::encode(ctx->snapset_obc->obs.oi, bl);
-    t->setattr(coll, snapoid, OI_ATTR, bl);
+    t->setattr(snapoid, OI_ATTR, bl);
   }
 
   return repop;
@@ -2687,7 +2682,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
   bool first_read = true;
 
-  ObjectStore::Transaction& t = ctx->op_t;
+  PGBackend::PGTransaction* t = ctx->op_t;
 
   dout(10) << "do_osd_op " << soid << " " << ops << dendl;
 
@@ -3393,7 +3388,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          if (obs.exists && !oi.is_whiteout()) {
            dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
                     << ", truncating to " << op.extent.truncate_size << dendl;
-           t.truncate(coll, soid, op.extent.truncate_size);
+           t->truncate(soid, op.extent.truncate_size);
            oi.truncate_seq = op.extent.truncate_seq;
            oi.truncate_size = op.extent.truncate_size;
            if (op.extent.truncate_size != oi.size) {
@@ -3411,7 +3406,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
        if (result < 0)
          break;
-       t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata);
+       t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
        write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges,
                                    op.extent.offset, op.extent.length, true);
        if (!obs.exists) {
@@ -3432,12 +3427,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        if (result < 0)
          break;
        if (obs.exists) {
-         t.truncate(coll, soid, 0);
+         t->truncate(soid, 0);
        } else {
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
-       t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata);
+       t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
        interval_set<uint64_t> ch;
        if (oi.size > 0)
          ch.insert(0, oi.size);
@@ -3465,7 +3460,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        assert(op.extent.length);
        if (obs.exists && !oi.is_whiteout()) {
-         t.zero(coll, soid, op.extent.offset, op.extent.length);
+         t->zero(soid, op.extent.offset, op.extent.length);
          interval_set<uint64_t> ch;
          ch.insert(op.extent.offset, op.extent.length);
          ctx->modified_ranges.union_of(ch);
@@ -3503,7 +3498,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            }
          }
          if (result >= 0 && !obs.exists) {
-           t.touch(coll, soid);
+           t->touch(soid);
            ctx->delta_stats.num_objects++;
            obs.exists = true;
          }
@@ -3542,7 +3537,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          oi.truncate_size = op.extent.truncate_size;
        }
 
-       t.truncate(coll, soid, op.extent.offset);
+       t->truncate(soid, op.extent.offset);
        if (oi.size > op.extent.offset) {
          interval_set<uint64_t> trim;
          trim.insert(op.extent.offset, oi.size-op.extent.offset);
@@ -3573,7 +3568,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       ++ctx->num_write;
       {
        if (!obs.exists) {
-         t.touch(coll, obs.oi.soid);
+         t->touch(obs.oi.soid);
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
@@ -3584,7 +3579,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EINVAL;
          break;
        }
-       t.clone_range(coll, src_obc->obs.oi.soid,
+       t->clone_range(src_obc->obs.oi.soid,
                      obs.oi.soid, op.clonerange.src_offset,
                      op.clonerange.length, op.clonerange.offset);
                      
@@ -3616,7 +3611,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          } else {
            dout(10) << " registered new watch " << w << " by " << entity << dendl;
            oi.watchers[make_pair(cookie, entity)] = w;
-           t.nop();  // make sure update the object_info on disk!
+           t->nop();  // make sure update the object_info on disk!
          }
          ctx->watch_connects.push_back(w);
         } else {
@@ -3626,7 +3621,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            dout(10) << " removed watch " << oi_iter->second << " by "
                     << entity << dendl;
             oi.watchers.erase(oi_iter);
-           t.nop();  // update oi on disk
+           t->nop();  // update oi on disk
            ctx->watch_disconnects.push_back(w);
          } else {
            dout(10) << " can't remove: no watch by " << entity << dendl;
@@ -3647,7 +3642,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        if (!obs.exists) {
-         t.touch(coll, soid);
+         t->touch(soid);
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
@@ -3656,7 +3651,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        string name = "_" + aname;
        bufferlist bl;
        bp.copy(op.xattr.value_len, bl);
-       t.setattr(coll, soid, name, bl);
+       t->setattr(soid, name, bl);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -3667,7 +3662,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        string aname;
        bp.copy(op.xattr.name_len, aname);
        string name = "_" + aname;
-       t.rmattr(coll, soid, name);
+       t->rmattr(soid, name);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -3692,7 +3687,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       break;
 
     case CEPH_OSD_OP_STARTSYNC:
-      t.start_sync();
+      // TODOSAM: either nop this or fix it
+      //t.start_sync();
       break;
 
 
@@ -3951,7 +3947,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
-       t.touch(coll, soid);
+       t->touch(soid);
        map<string, bufferlist> to_set;
        try {
          ::decode(to_set, bp);
@@ -3966,7 +3962,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
             ++i) {
          dout(20) << "\t" << i->first << dendl;
        }
-       t.omap_setkeys(coll, soid, to_set);
+       t->omap_setkeys(soid, to_set);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -3978,8 +3974,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          ctx->delta_stats.num_objects++;
          obs.exists = true;
        }
-       t.touch(coll, soid);
-       t.omap_setheader(coll, soid, osd_op.indata);
+       t->touch(soid);
+       t->omap_setheader(soid, osd_op.indata);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -3991,8 +3987,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
-       t.touch(coll, soid);
-       t.omap_clear(coll, soid);
+       t->touch(soid);
+       t->omap_clear(soid);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -4004,7 +4000,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -ENOENT;
          break;
        }
-       t.touch(coll, soid);
+       t->touch(soid);
        set<string> to_rm;
        try {
          ::decode(to_rm, bp);
@@ -4013,7 +4009,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EINVAL;
          goto fail;
        }
-       t.omap_rmkeys(coll, soid, to_rm);
+       t->omap_rmkeys(soid, to_rm);
        ctx->delta_stats.num_wr++;
       }
       break;
@@ -4123,11 +4119,13 @@ inline int ReplicatedPG::_delete_head(OpContext *ctx, bool no_whiteout)
   ObjectState& obs = ctx->new_obs;
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
-  ObjectStore::Transaction& t = ctx->op_t;
+  PGBackend::PGTransaction* t = ctx->op_t;
 
   if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout))
     return -ENOENT;
   
+  t->remove(soid);
+
   if (oi.size > 0) {
     interval_set<uint64_t> ch;
     ch.insert(0, oi.size);
@@ -4143,13 +4141,10 @@ inline int ReplicatedPG::_delete_head(OpContext *ctx, bool no_whiteout)
     dout(20) << __func__ << " setting whiteout on " << soid << dendl;
     oi.set_flag(object_info_t::FLAG_WHITEOUT);
     ctx->delta_stats.num_whiteouts++;
-    t.truncate(coll, soid, 0);
-    t.omap_clear(coll, soid);
-    t.rmattrs(coll, soid);
+    t->touch(soid);
     return 0;
   }
 
-  t.remove(coll, soid);
   ctx->delta_stats.num_objects--;
   if (oi.is_dirty())
     ctx->delta_stats.num_objects_dirty--;
@@ -4166,7 +4161,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   ObjectState& obs = ctx->new_obs;
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
-  ObjectStore::Transaction& t = ctx->op_t;
+  PGBackend::PGTransaction* t = ctx->op_t;
   snapid_t snapid = (uint64_t)op.snap.snapid;
   hobject_t missing_oid;
 
@@ -4225,10 +4220,9 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
               << " and rolling back to old snap" << dendl;
 
       if (obs.exists)
-       t.remove(coll, soid);
+       t->remove(soid);
       
-      t.clone(coll,
-             rollback_to_sobject, soid);
+      t->clone(rollback_to_sobject, soid);
       snapset.head_exists = true;
 
       map<snapid_t, interval_set<uint64_t> >::iterator iter =
@@ -4263,23 +4257,23 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   return ret;
 }
 
-void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
+void ReplicatedPG::_make_clone(PGBackend::PGTransaction* t,
                               const hobject_t& head, const hobject_t& coid,
                               object_info_t *poi)
 {
   bufferlist bv;
   ::encode(*poi, bv);
 
-  t.clone(coll, head, coid);
-  t.setattr(coll, coid, OI_ATTR, bv);
-  t.rmattr(coll, coid, SS_ATTR);
+  t->clone(head, coid);
+  t->setattr(coid, OI_ATTR, bv);
+  t->rmattr(coid, SS_ATTR);
 }
 
 void ReplicatedPG::make_writeable(OpContext *ctx)
 {
   const hobject_t& soid = ctx->obs->oi.soid;
   SnapContext& snapc = ctx->snapc;
-  ObjectStore::Transaction t;
+  PGBackend::PGTransaction *t = pgbackend->get_transaction();
 
   // clone?
   assert(soid.snap == CEPH_NOSNAP);
@@ -4384,8 +4378,9 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
   }
   
   // prepend transaction to op_t
-  t.append(ctx->op_t);
-  t.swap(ctx->op_t);
+  t->append(ctx->op_t);
+  delete ctx->op_t;
+  ctx->op_t = t;
 
   // update snapset with latest snap context
   ctx->new_snapset.seq = snapc.seq;
@@ -4524,8 +4519,6 @@ hobject_t ReplicatedPG::generate_temp_object()
   ostringstream ss;
   ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq);
   hobject_t hoid = hobject_t::make_temp(ss.str());
-  // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
-  pgbackend->add_temp_obj(hoid);
   dout(20) << __func__ << " " << hoid << dendl;
   return hoid;
 }
@@ -4552,7 +4545,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     do_osd_op_effects(ctx);
 
   // read-op?  done?
-  if (ctx->op_t.empty() && !ctx->modify) {
+  if (ctx->op_t->empty() && !ctx->modify) {
     unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
     return result;
   }
@@ -4600,7 +4593,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
 
        ctx->snapset_obc = get_object_context(snapoid, false);
        if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
-         ctx->op_t.remove(coll, snapoid);
+         ctx->op_t->remove(snapoid);
          dout(10) << " removing old " << snapoid << dendl;
 
          ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid,
@@ -4631,9 +4624,9 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
 
       bufferlist bv(sizeof(ctx->new_obs.oi));
       ::encode(ctx->snapset_obc->obs.oi, bv);
-      ctx->op_t.touch(coll, snapoid);
-      ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv);
-      ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss);
+      ctx->op_t->touch(snapoid);
+      ctx->op_t->setattr(snapoid, OI_ATTR, bv);
+      ctx->op_t->setattr(snapoid, SS_ATTR, bss);
       ctx->at_version.version++;
     }
   }
@@ -4650,7 +4643,7 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
       ctx->user_at_version = ctx->at_version.version;
     ctx->new_obs.oi.user_version = ctx->user_at_version;
   }
-  ctx->bytes_written = ctx->op_t.get_encoded_bytes();
+  ctx->bytes_written = ctx->op_t->get_bytes_written();
  
   if (ctx->new_obs.exists) {
     // on the head object
@@ -4666,12 +4659,12 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
 
     bufferlist bv(sizeof(ctx->new_obs.oi));
     ::encode(ctx->new_obs.oi, bv);
-    ctx->op_t.setattr(coll, soid, OI_ATTR, bv);
+    ctx->op_t->setattr(soid, OI_ATTR, bv);
 
     if (soid.snap == CEPH_NOSNAP) {
       dout(10) << " final snapset " << ctx->new_snapset
               << " in " << soid << dendl;
-      ctx->op_t.setattr(coll, soid, SS_ATTR, bss);
+      ctx->op_t->setattr(soid, SS_ATTR, bss);   
     } else {
       dout(10) << " no snapset (this is a clone)" << dendl;
     }
@@ -4989,6 +4982,8 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
     }
   }
 
+  assert(cop->rval >= 0);
+
   if (!cop->cursor.is_complete()) {
     // write out what we have so far
     if (cop->temp_cursor.is_initial()) {
@@ -5002,7 +4997,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
     if (cop->temp_cursor.is_initial()) {
       repop->ctx->new_temp_oid = cop->results.temp_oid;
     }
-    _write_copy_chunk(cop, &repop->ctx->op_t);
+    _write_copy_chunk(cop, repop->ctx->op_t);
     simple_repop_submit(repop);
     dout(10) << __func__ << " fetching more" << dendl;
     _copy_some(cobc, cop);
@@ -5010,6 +5005,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
   }
 
   dout(20) << __func__ << " success; committing" << dendl;
+  cop->results.final_tx = pgbackend->get_transaction();
   _build_finish_copy_transaction(cop, cop->results.final_tx);
 
  out:
@@ -5022,7 +5018,7 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
   kick_object_context_blocked(cobc);
 }
 
-void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
           << " " << cop->attrs.size() << " attrs"
@@ -5030,51 +5026,51 @@ void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
           << " " << cop->omap.size() << " keys"
           << dendl;
   if (!cop->temp_cursor.attr_complete) {
-    t->touch(cop->results.temp_coll, cop->results.temp_oid);
+    t->touch(cop->results.temp_oid);
     for (map<string,bufferlist>::iterator p = cop->attrs.begin();
-        p != cop->attrs.end(); ++p)
-      t->setattr(cop->results.temp_coll, cop->results.temp_oid,
-                string("_") + p->first, p->second);
+        p != cop->attrs.end();
+        ++p)
+      t->setattr(
+       cop->results.temp_oid,
+       string("_") + p->first, p->second);
     cop->attrs.clear();
   }
   if (!cop->temp_cursor.data_complete) {
-    t->write(cop->results.temp_coll, cop->results.temp_oid,
-            cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+    t->write(
+      cop->results.temp_oid,
+      cop->temp_cursor.data_offset, cop->data.length(), cop->data);
     cop->data.clear();
   }
   if (!cop->temp_cursor.omap_complete) {
     if (cop->omap_header.length()) {
-      t->omap_setheader(cop->results.temp_coll, cop->results.temp_oid,
-                       cop->omap_header);
+      t->omap_setheader(
+       cop->results.temp_oid,
+       cop->omap_header);
       cop->omap_header.clear();
     }
-    t->omap_setkeys(cop->results.temp_coll, cop->results.temp_oid, cop->omap);
+    t->omap_setkeys(cop->results.temp_oid, cop->omap);
     cop->omap.clear();
   }
   cop->temp_cursor = cop->cursor;
 }
 
 void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
-                                                  ObjectStore::Transaction& t)
+                                                  PGBackend::PGTransaction* t)
 {
   ObjectState& obs = cop->obc->obs;
 
   if (obs.exists) {
-    t.remove(coll, obs.oi.soid);
+    t->remove(obs.oi.soid);
   }
 
   if (cop->temp_cursor.is_initial()) {
     // write directly to final object
     cop->results.temp_oid = obs.oi.soid;
-    _write_copy_chunk(cop, &t);
+    _write_copy_chunk(cop, t);
   } else {
     // finish writing to temp object, then move into place
-    _write_copy_chunk(cop, &t);
-    t.collection_move_rename(
-      cop->results.temp_coll, cop->results.temp_oid, coll, obs.oi.soid);
-
-    // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
-    pgbackend->clear_temp_obj(cop->results.temp_oid);
+    _write_copy_chunk(cop, t);
+    t->rename(cop->results.temp_oid, obs.oi.soid);
   }
 }
 
@@ -5091,8 +5087,9 @@ void ReplicatedPG::finish_copyfrom(OpContext *ctx)
   if (cb->is_temp_obj_used()) {
     ctx->discard_temp_oid = cb->results->temp_oid;
   }
-  ctx->op_t.swap(cb->results->final_tx);
-  ctx->op_t.append(cb->results->final_tx);
+  ctx->op_t->append(cb->results->final_tx);
+  delete cb->results->final_tx;
+  cb->results->final_tx = NULL;
 
   // CopyFromCallback fills this in for us
   obs.oi.user_version = ctx->user_at_version;
@@ -5172,12 +5169,14 @@ void ReplicatedPG::finish_promote(int r, OpRequestRef op,
 
   if (whiteout) {
     // create a whiteout
-    tctx->op_t.touch(coll, soid);
+    tctx->op_t->touch(soid);
     tctx->new_obs.oi.set_flag(object_info_t::FLAG_WHITEOUT);
     ++tctx->delta_stats.num_whiteouts;
     dout(20) << __func__ << " creating whiteout" << dendl;
   } else {
-    tctx->op_t.swap(results->final_tx);
+    tctx->op_t->append(results->final_tx);
+    delete results->final_tx;
+    results->final_tx = NULL;
     if (results->started_temp_obj) {
       tctx->discard_temp_oid = results->temp_oid;
     }
@@ -5577,100 +5576,67 @@ void ReplicatedPG::cancel_flush_ops(bool requeue)
 // ========================================================================
 // rep op gather
 
-class C_OSD_OpApplied : public Context {
-public:
+class C_OSD_RepopApplied : public Context {
   ReplicatedPGRef pg;
-  ReplicatedPG::RepGather *repop;
-
-  C_OSD_OpApplied(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
-    pg(p), repop(rg) {
-    repop->get();
-  }
-  void finish(int r) {
-    pg->op_applied(repop);
-  }
-};
-
-class C_OSD_OpCommit : public Context {
+  boost::intrusive_ptr<ReplicatedPG::RepGather> repop;
 public:
-  ReplicatedPGRef pg;
-  ReplicatedPG::RepGather *repop;
-
-  C_OSD_OpCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
-    pg(p), repop(rg) {
-    repop->get();
-  }
-  void finish(int r) {
-    pg->op_commit(repop);
+  C_OSD_RepopApplied(ReplicatedPG *pg, ReplicatedPG::RepGather *repop)
+  : pg(pg), repop(repop) {}
+  void finish(int) {
+    pg->repop_all_applied(repop.get());
   }
 };
 
-void ReplicatedPG::apply_repop(RepGather *repop)
-{
-  dout(10) << "apply_repop  applying update on " << *repop << dendl;
-  assert(!repop->applying);
-  assert(!repop->applied);
-
-  repop->applying = true;
-
-  repop->tls.push_back(&repop->ctx->local_t);
-  repop->tls.push_back(&repop->ctx->op_t);
-
-  repop->obc->ondisk_write_lock();
-  if (repop->ctx->clone_obc)
-    repop->ctx->clone_obc->ondisk_write_lock();
 
-  bool unlock_snapset_obc = false;
-  if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
-      repop->obc->obs.oi.soid) {
-    repop->ctx->snapset_obc->ondisk_write_lock();
-    unlock_snapset_obc = true;
+void ReplicatedPG::repop_all_applied(RepGather *repop)
+{
+  dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied "
+          << dendl;
+  repop->all_applied = true;
+  if (!repop->rep_aborted) {
+    eval_repop(repop);
   }
+}
 
-  Context *oncommit = new C_OSD_OpCommit(this, repop);
-  Context *onapplied = new C_OSD_OpApplied(this, repop);
-  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
-    repop->obc,
-    repop->ctx->clone_obc,
-    unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
-  int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
-  if (r) {
-    derr << "apply_repop  queue_transactions returned " << r << " on " << *repop << dendl;
-    assert(0);
+class C_OSD_RepopCommit : public Context {
+  ReplicatedPGRef pg;
+  boost::intrusive_ptr<ReplicatedPG::RepGather> repop;
+public:
+  C_OSD_RepopCommit(ReplicatedPG *pg, ReplicatedPG::RepGather *repop)
+    : pg(pg), repop(repop) {}
+  void finish(int) {
+    pg->repop_all_committed(repop.get());
   }
-}
+};
 
-void ReplicatedPG::op_applied(RepGather *repop)
+void ReplicatedPG::repop_all_committed(RepGather *repop)
 {
-  lock();
-  dout(10) << "op_applied " << *repop << dendl;
-  if (repop->ctx->op)
-    repop->ctx->op->mark_event("op_applied");
-  
-  repop->applying = false;
-  repop->applied = true;
-
-  // (logical) local ack.
-  int whoami = osd->get_nodeid();
-
-  if (!repop->aborted) {
-    assert(repop->waitfor_ack.count(whoami) ||
-          repop->waitfor_disk.count(whoami) == 0);  // commit before ondisk
-    repop->waitfor_ack.erase(whoami);
+  dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed "
+          << dendl;
+  repop->all_committed = true;
 
+  if (!repop->rep_aborted) {
     if (repop->v != eversion_t()) {
-      assert(info.last_update >= repop->v);
-      assert(last_update_applied < repop->v);
-      last_update_applied = repop->v;
+      last_update_ondisk = repop->v;
+      last_complete_ondisk = repop->pg_local_last_complete;
     }
+    eval_repop(repop);
+  }
+}
 
-    // chunky scrub
+void ReplicatedPG::op_applied(const eversion_t &applied_version)
+{
+  dout(10) << "op_applied on primary on version " << applied_version << dendl;
+  if (applied_version == eversion_t())
+    return;
+  assert(applied_version > last_update_applied);
+  assert(applied_version <= info.last_update);
+  last_update_applied = applied_version;
+  if (is_primary()) {
     if (scrubber.active && scrubber.is_chunky) {
       if (last_update_applied == scrubber.subset_last_update) {
         osd->scrub_wq.queue(this);
       }
-
-    // classic scrub
     } else if (last_update_applied == info.last_update && scrubber.block_writes) {
       dout(10) << "requeueing scrub for cleanup" << dendl;
       scrubber.finalizing = true;
@@ -5679,49 +5645,17 @@ void ReplicatedPG::op_applied(RepGather *repop)
       scrubber.waiting_on_whom.insert(osd->whoami);
       osd->scrub_wq.queue(this);
     }
-  }
-
-  if (!repop->aborted)
-    eval_repop(repop);
-
-  repop->put();
-  unlock();
-}
-
-void ReplicatedPG::op_commit(RepGather *repop)
-{
-  lock();
-  if (repop->ctx->op)
-    repop->ctx->op->mark_event("op_commit");
-
-  if (repop->aborted) {
-    dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
-  } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
-    dout(10) << "op_commit " << *repop << " -- already marked ondisk" << dendl;
   } else {
-    dout(10) << "op_commit " << *repop << dendl;
-    int whoami = osd->get_nodeid();
-
-    repop->waitfor_disk.erase(whoami);
-
-    // remove from ack waitfor list too.  sub_op_modify_commit()
-    // behaves the same in that the COMMIT implies and ACK and there
-    // is no separate reply sent.
-    repop->waitfor_ack.erase(whoami);
-    
-    if (repop->v != eversion_t()) {
-      last_update_ondisk = repop->v;
-      last_complete_ondisk = repop->pg_local_last_complete;
+    dout(10) << "op_applied on replica on version " << applied_version << dendl;
+    if (scrubber.active_rep_scrub) {
+      if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
+       osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
+       scrubber.active_rep_scrub = 0;
+      }
     }
-    eval_repop(repop);
   }
-
-  repop->put();
-  unlock();
 }
 
-
-
 void ReplicatedPG::eval_repop(RepGather *repop)
 {
   MOSDOp *m = NULL;
@@ -5731,27 +5665,23 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   if (m)
     dout(10) << "eval_repop " << *repop
             << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"")
-            << (repop->done() ? " DONE" : "")
+            << (repop->rep_done ? " DONE" : "")
             << dendl;
   else
     dout(10) << "eval_repop " << *repop << " (no op)"
-            << (repop->done() ? " DONE" : "")
+            << (repop->rep_done ? " DONE" : "")
             << dendl;
 
-  if (repop->done())
+  if (repop->rep_done)
     return;
 
-  // apply?
-  if (!repop->applied && !repop->applying)
-    apply_repop(repop);
-  
   if (m) {
 
     // an 'ondisk' reply implies 'ack'. so, prefer to send just one
     // ondisk instead of ack followed by ondisk.
 
     // ondisk?
-    if (repop->waitfor_disk.empty()) {
+    if (repop->all_committed) {
 
       release_op_ctx_locks(repop->ctx);
 
@@ -5795,7 +5725,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
     }
 
     // applied?
-    if (repop->waitfor_ack.empty()) {
+    if (repop->all_applied) {
 
       // send dup acks, in order
       if (waiting_for_ack.count(repop->v)) {
@@ -5840,9 +5770,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
   }
 
   // done.
-  if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty() &&
-      repop->applied) {
-    repop->mark_done();
+  if (repop->all_applied && repop->all_committed) {
+    repop->rep_done = true;
 
     calc_min_last_complete_ondisk();
 
@@ -5868,74 +5797,124 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
 {
   OpContext *ctx = repop->ctx;
   const hobject_t& soid = ctx->obs->oi.soid;
-
+  if (ctx->op &&
+    ((static_cast<MOSDOp *>(
+       ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
+    // replicate original op for parallel execution on replica
+    assert(0 == "broken implementation, do not use");
+  }
   dout(7) << "issue_repop rep_tid " << repop->rep_tid
           << " o " << soid
           << dendl;
 
   repop->v = ctx->at_version;
 
-  // add myself to gather set
-  repop->waitfor_ack.insert(acting[0]);
-  repop->waitfor_disk.insert(acting[0]);
+  for (vector<int>::iterator i = actingbackfill.begin() + 1;
+       i != actingbackfill.end();
+       ++i) {
+    pg_info_t &pinfo = peer_info[*i];
+    // keep peer_info up to date
+    if (pinfo.last_complete == pinfo.last_update)
+      pinfo.last_complete = ctx->at_version;
+    pinfo.last_update = ctx->at_version;
+  }
+
+  repop->obc->ondisk_write_lock();
+  if (repop->ctx->clone_obc)
+    repop->ctx->clone_obc->ondisk_write_lock();
+
+  bool unlock_snapset_obc = false;
+  if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
+      repop->obc->obs.oi.soid) {
+    repop->ctx->snapset_obc->ondisk_write_lock();
+    unlock_snapset_obc = true;
+  }
 
+  Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
+  Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
+  Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
+    repop->obc,
+    repop->ctx->clone_obc,
+    unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
+  pgbackend->submit_transaction(
+    soid,
+    repop->ctx->at_version,
+    repop->ctx->op_t,
+    pg_trim_to,
+    repop->ctx->log,
+    onapplied_sync,
+    on_all_applied,
+    on_all_commit,
+    repop->rep_tid,
+    repop->ctx->reqid,
+    repop->ctx->op);
+  repop->ctx->op_t = NULL;
+}
+    
+void ReplicatedBackend::issue_op(
+  const hobject_t &soid,
+  const eversion_t &at_version,
+  tid_t tid,
+  osd_reqid_t reqid,
+  eversion_t pg_trim_to,
+  hobject_t new_temp_oid,
+  hobject_t discard_temp_oid,
+  vector<pg_log_entry_t> &log_entries,
+  InProgressOp *op,
+  ObjectStore::Transaction *op_t)
+{
   int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
 
-  assert(actingbackfill.size() > 0);
-  if (ctx->op && actingbackfill.size() > 1) {
+  if (parent->get_actingbackfill().size() > 1) {
     ostringstream ss;
-    ss << "waiting for subops from " << vector<int>(actingbackfill.begin() + 1, actingbackfill.end());
-    ctx->op->mark_sub_op_sent(ss.str());
+    ss << "waiting for subops from " << 
+      vector<int>(
+       parent->get_actingbackfill().begin() + 1,
+       parent->get_actingbackfill().end());
+    if (op->op)
+      op->op->mark_sub_op_sent(ss.str());
   }
-  for (unsigned i=1; i<actingbackfill.size(); i++) {
-    int peer = actingbackfill[i];
-    pg_info_t &pinfo = peer_info[peer];
+  for (unsigned i=1; i<parent->get_actingbackfill().size(); i++) {
+    int peer = parent->get_actingbackfill()[i];
+    const pg_info_t &pinfo = parent->get_peer_info().find(peer)->second;
 
-    repop->waitfor_ack.insert(peer);
-    repop->waitfor_disk.insert(peer);
+    op->waiting_for_applied.insert(peer);
+    op->waiting_for_commit.insert(peer);
 
     // forward the write/update/whatever
-    MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
-                                 false, acks_wanted,
-                                 get_osdmap()->get_epoch(),
-                                 repop->rep_tid, repop->ctx->at_version);
-    if (ctx->op &&
-       ((static_cast<MOSDOp *>(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
-      // replicate original op for parallel execution on replica
-      assert(0 == "broken implementation, do not use");
-    }
+    MOSDSubOp *wr = new MOSDSubOp(
+      reqid, get_info().pgid, soid,
+      false, acks_wanted,
+      get_osdmap()->get_epoch(),
+      tid, at_version);
 
     // ship resulting transaction, log entries, and pg_stats
-    if (!should_send_op(peer, soid)) {
+    if (!parent->should_send_op(peer, soid)) {
       dout(10) << "issue_repop shipping empty opt to osd." << peer
               <<", object " << soid
               << " beyond MAX(last_backfill_started "
-              << last_backfill_started << ", pinfo.last_backfill "
+              << ", pinfo.last_backfill "
               << pinfo.last_backfill << ")" << dendl;
       ObjectStore::Transaction t;
       ::encode(t, wr->get_data());
     } else {
-      ::encode(repop->ctx->op_t, wr->get_data());
+      ::encode(*op_t, wr->get_data());
     }
 
-    ::encode(repop->ctx->log, wr->logbl);
+    ::encode(log_entries, wr->logbl);
 
-    if (is_backfill_targets(peer))
+    if (pinfo.is_incomplete())
       wr->pg_stats = pinfo.stats;  // reflects backfill progress
     else
-      wr->pg_stats = info.stats;
+      wr->pg_stats = get_info().stats;
     
     wr->pg_trim_to = pg_trim_to;
 
-    wr->new_temp_oid = repop->ctx->new_temp_oid;
-    wr->discard_temp_oid = repop->ctx->discard_temp_oid;
+    wr->new_temp_oid = new_temp_oid;
+    wr->discard_temp_oid = discard_temp_oid;
 
     osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
 
-    // keep peer_info up to date
-    if (pinfo.last_complete == pinfo.last_update)
-      pinfo.last_update = ctx->at_version;
-    pinfo.last_update = ctx->at_version;
   }
 }
 
@@ -5970,53 +5949,6 @@ void ReplicatedPG::remove_repop(RepGather *repop)
   osd->logger->set(l_osd_op_wip, repop_map.size());
 }
 
-void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
-                            int fromosd, eversion_t peer_lcod)
-{
-  MOSDOp *m = NULL;
-
-  if (repop->ctx->op)
-    m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
-
-  if (m)
-    dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
-           << " result " << result
-           << " ack_type " << ack_type
-           << " from osd." << fromosd
-           << dendl;
-  else
-    dout(7) << "repop_ack rep_tid " << repop->rep_tid << " (no op) "
-           << " result " << result
-           << " ack_type " << ack_type
-           << " from osd." << fromosd
-           << dendl;
-  
-  if (ack_type & CEPH_OSD_FLAG_ONDISK) {
-    if (repop->ctx->op)
-      repop->ctx->op->mark_event("sub_op_commit_rec");
-    // disk
-    if (repop->waitfor_disk.count(fromosd)) {
-      repop->waitfor_disk.erase(fromosd);
-      //repop->waitfor_nvram.erase(fromosd);
-      repop->waitfor_ack.erase(fromosd);
-      peer_last_complete_ondisk[fromosd] = peer_lcod;
-    }
-/*} else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) {
-    // nvram
-    repop->waitfor_nvram.erase(fromosd);
-    repop->waitfor_ack.erase(fromosd);*/
-  } else {
-    // ack
-    if (repop->ctx->op)
-      repop->ctx->op->mark_event("sub_op_applied_rec");
-    repop->waitfor_ack.erase(fromosd);
-  }
-
-  if (!repop->aborted)
-    eval_repop(repop);
-}
-
-
 ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
 {
   dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
@@ -6025,6 +5957,7 @@ ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
   OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
                                 &obc->obs, obc->ssc, this);
+  ctx->op_t = pgbackend->get_transaction();
   ctx->mtime = ceph_clock_now(g_ceph_context);
   ctx->obc = obc;
   RepGather *repop = new_repop(ctx, obc, rep_tid);
@@ -6034,17 +5967,11 @@ ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
 void ReplicatedPG::simple_repop_submit(RepGather *repop)
 {
   dout(20) << __func__ << " " << repop << dendl;
-  if (!repop->ctx->log.empty())
-    append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
   issue_repop(repop, repop->ctx->mtime);
   eval_repop(repop);
   repop->put();
 }
 
-
-
-
-
 // -------------------------------------------------------
 
 void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
@@ -6174,6 +6101,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
   OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
                                 &obc->obs, obc->ssc, this);
+  ctx->op_t = pgbackend->get_transaction();
   ctx->mtime = ceph_clock_now(cct);
   ctx->at_version = get_next_version();
 
@@ -6181,7 +6109,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
 
   RepGather *repop = new_repop(ctx, obc, rep_tid);
 
-  ObjectStore::Transaction *t = &ctx->op_t;
+  PGBackend::PGTransaction *t = ctx->op_t;
 
   ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
                                    ctx->at_version,
@@ -6193,9 +6121,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
   obc->obs.oi.version = ctx->at_version;
   bufferlist bl;
   ::encode(obc->obs.oi, bl);
-  t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
-
-  append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
+  t->setattr(obc->obs.oi.soid, OI_ATTR, bl);
 
   // obc ref swallowed by repop!
   issue_repop(repop, repop->ctx->mtime);
@@ -6587,7 +6513,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
 
 // sub op modify
 
-void ReplicatedPG::sub_op_modify(OpRequestRef op)
+void ReplicatedBackend::sub_op_modify(OpRequestRef op)
 {
   MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
   assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -6611,23 +6537,19 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
           << dendl;  
 
   // sanity checks
-  assert(m->map_epoch >= info.history.same_interval_since);
-  assert(is_active());
+  assert(m->map_epoch >= get_info().history.same_interval_since);
   
   // we better not be missing this.
-  assert(!pg_log.get_missing().is_missing(soid));
+  assert(!parent->get_log().get_missing().is_missing(soid));
 
-  int ackerosd = acting[0];
+  int ackerosd = m->get_source().num();
   
   op->mark_started();
 
-  RepModify *rm = new RepModify;
-  rm->pg = this;
-  get("RepModify");
+  RepModifyRef rm(new RepModify);
   rm->op = op;
-  rm->ctx = 0;
   rm->ackerosd = ackerosd;
-  rm->last_complete = info.last_complete;
+  rm->last_complete = get_info().last_complete;
   rm->epoch_started = get_osdmap()->get_epoch();
 
   if (!m->noop) {
@@ -6639,14 +6561,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
 
     if (m->new_temp_oid != hobject_t()) {
       dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
-      // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
-      pgbackend->add_temp_obj(m->new_temp_oid);
+      add_temp_obj(m->new_temp_oid);
       get_temp_coll(&rm->localt);
     }
     if (m->discard_temp_oid != hobject_t()) {
       dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
-      // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
-      pgbackend->clear_temp_obj(m->discard_temp_oid);
+      clear_temp_obj(m->discard_temp_oid);
     }
 
     ::decode(rm->opt, p);
@@ -6659,13 +6579,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
          i != log.end();
          ++i) {
        if (!i->soid.is_max() && i->soid.pool == -1)
-         i->soid.pool = info.pgid.pool();
+         i->soid.pool = get_info().pgid.pool();
       }
-      rm->opt.set_pool_override(info.pgid.pool());
+      rm->opt.set_pool_override(get_info().pgid.pool());
     }
     rm->opt.set_replica();
 
-    info.stats = m->pg_stats;
     bool update_snaps = false;
     if (!rm->opt.empty()) {
       // If the opt is non-empty, we infer we are before
@@ -6674,150 +6593,81 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
       // collections now.  Otherwise, we do it later on push.
       update_snaps = true;
     }
-    append_log(log, m->pg_trim_to, rm->localt, update_snaps);
-
-    rm->tls.push_back(&rm->localt);
-    rm->tls.push_back(&rm->opt);
-    
+    parent->update_stats(m->pg_stats);
+    parent->log_operation(
+      log,
+      m->pg_trim_to,
+      update_snaps,
+      &(rm->localt));
+      
     rm->bytes_written = rm->opt.get_encoded_bytes();
 
   } else {
+    assert(0);
+    #if 0
     // just trim the log
     if (m->pg_trim_to != eversion_t()) {
       pg_log.trim(m->pg_trim_to, info);
       dirty_info = true;
       write_if_dirty(rm->localt);
-      rm->tls.push_back(&rm->localt);
     }
+    #endif
   }
   
   op->mark_started();
 
-  Context *oncommit = new C_OSD_RepModifyCommit(rm);
-  Context *onapply = new C_OSD_RepModifyApply(rm);
-  int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op);
-  if (r) {
-    dout(0) << "error applying transaction: r = " << r << dendl;
-    assert(0);
-  }
+  rm->localt.append(rm->opt);
+  rm->localt.register_on_commit(
+    parent->bless_context(
+      new C_OSD_RepModifyCommit(this, rm)));
+  rm->localt.register_on_applied(
+    parent->bless_context(
+      new C_OSD_RepModifyApply(this, rm)));
+  parent->queue_transaction(&(rm->localt), op);
   // op is cleaned up by oncommit/onapply when both are executed
 }
 
-void ReplicatedPG::op_applied_replica(
-  const eversion_t &applied_version)
+void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
 {
-  dout(10) << "op_applied_replica on version " << applied_version << dendl;
-  if (applied_version != eversion_t()) {
-    assert(info.last_update >= applied_version);
-    assert(last_update_applied < applied_version);
-    last_update_applied = applied_version;
-  }
-  if (scrubber.active_rep_scrub) {
-    if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
-      osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
-      scrubber.active_rep_scrub = 0;
-    }
-  }
-}
-
-void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
-{
-  lock();
   rm->op->mark_event("sub_op_applied");
   rm->applied = true;
 
-  if (!pg_has_reset_since(rm->epoch_started)) {
-    dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl;
-    MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
-    assert(m->get_header().type == MSG_OSD_SUBOP);
-    
-    if (!rm->committed) {
-      // send ack to acker only if we haven't sent a commit already
-      MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
-      ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-      osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
-    }
-    
-    op_applied_replica(m->version);
-  } else {
-    dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req()
-            << " from epoch " << rm->epoch_started << " < last_peering_reset "
-            << last_peering_reset << dendl;
-  }
-
-  bool done = rm->applied && rm->committed;
-  unlock();
-  if (done) {
-    delete rm->ctx;
-    delete rm;
-    put("RepModify");
+  dout(10) << "sub_op_modify_applied on " << rm << " op "
+          << *rm->op->get_req() << dendl;
+  MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+  
+  if (!rm->committed) {
+    // send ack to acker only if we haven't sent a commit already
+    MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+    ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+    osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
   }
+  
+  parent->op_applied(m->version);
 }
 
-void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
+void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
 {
-  lock();
   rm->op->mark_commit_sent();
   rm->committed = true;
 
-  if (!pg_has_reset_since(rm->epoch_started)) {
-    // send commit.
-    dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
-            << ", sending commit to osd." << rm->ackerosd
-            << dendl;
-    
-    if (get_osdmap()->is_up(rm->ackerosd)) {
-      last_complete_ondisk = rm->last_complete;
-      MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-      commit->set_last_complete_ondisk(rm->last_complete);
-      commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-      osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
-    }
-  } else {
-    dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req()
-            << " from epoch " << rm->epoch_started << " < last_peering_reset "
-            << last_peering_reset << dendl;
-  }
+  // send commit.
+  dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
+          << ", sending commit to osd." << rm->ackerosd
+          << dendl;
   
-  log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
-  bool done = rm->applied && rm->committed;
-  unlock();
-  if (done) {
-    delete rm->ctx;
-    delete rm;
-    put("RepModify");
-  }
-}
-
-void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
-{
-  MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
-  assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
-
-  op->mark_started();
-
-  // must be replication.
-  tid_t rep_tid = r->get_tid();
-  int fromosd = r->get_source().num();
+  assert(get_osdmap()->is_up(rm->ackerosd));
+  get_parent()->update_last_complete_ondisk(rm->last_complete);
+  MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+  commit->set_last_complete_ondisk(rm->last_complete);
+  commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+  osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
   
-  if (repop_map.count(rep_tid)) {
-    // oh, good.
-    repop_ack(repop_map[rep_tid], 
-             r->get_result(), r->ack_type,
-             fromosd, 
-             r->get_last_complete_ondisk());
-  }
+  log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
 }
 
 
-
-
-
-
-
-
-
-
 // ===========================================================
 
 void ReplicatedBackend::calc_head_subsets(
@@ -8080,6 +7930,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
     t->register_on_complete(new C_OSD_SendMessageOnConn(
                              osd, reply, m->get_connection()));
   }
+  t->register_on_applied(
+    new ObjectStore::C_DeleteTransaction(t));
   get_parent()->queue_transaction(t);
   return;
 }
@@ -8332,9 +8184,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
     RepGather *repop = repop_queue.front();
     repop_queue.pop_front();
     dout(10) << " applying repop tid " << repop->rep_tid << dendl;
-    if (!repop->applied && !repop->applying)
-      apply_repop(repop);
-    repop->aborted = true;
+    repop->rep_aborted = true;
 
     if (requeue) {
       if (repop->ctx->op) {
@@ -9763,15 +9613,15 @@ void ReplicatedPG::hit_set_persist()
     // the deleted object over this period.
     hobject_t old_obj =
       get_hit_set_current_object(info.hit_set.current_last_stamp);
-    ctx->op_t.remove(coll, old_obj);
+    ctx->op_t->remove(old_obj);
     ctx->log.push_back(
-        pg_log_entry_t(pg_log_entry_t::DELETE,
-                      old_obj,
-                      ctx->at_version,
-                      info.hit_set.current_last_update,
-                      0,
-                      osd_reqid_t(),
-                      ctx->mtime));
+      pg_log_entry_t(pg_log_entry_t::DELETE,
+                    old_obj,
+                    ctx->at_version,
+                    info.hit_set.current_last_update,
+                    0,
+                    osd_reqid_t(),
+                    ctx->mtime));
     ++ctx->at_version.version;
 
     struct stat st;
@@ -9809,19 +9659,19 @@ void ReplicatedPG::hit_set_persist()
   bufferlist boi(sizeof(ctx->new_obs.oi));
   ::encode(ctx->new_obs.oi, boi);
 
-  ctx->op_t.write(coll, oid, 0, bl.length(), bl);
-  ctx->op_t.setattr(coll, oid, OI_ATTR, boi);
-  ctx->op_t.setattr(coll, oid, SS_ATTR, bss);
+  ctx->op_t->write(oid, 0, bl.length(), bl);
+  ctx->op_t->setattr(oid, OI_ATTR, boi);
+  ctx->op_t->setattr(oid, SS_ATTR, bss);
   ctx->log.push_back(
-        pg_log_entry_t(
-         pg_log_entry_t::MODIFY,
-         oid,
-         ctx->at_version,
-         ctx->obs->oi.version,
-         0,
-         osd_reqid_t(),
-         ctx->mtime)
-        );
+    pg_log_entry_t(
+      pg_log_entry_t::MODIFY,
+      oid,
+      ctx->at_version,
+      ctx->obs->oi.version,
+      0,
+      osd_reqid_t(),
+      ctx->mtime)
+    );
 
   hit_set_trim(repop, pool.info.hit_set_count);
 
@@ -9837,7 +9687,7 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
     assert(p != info.hit_set.history.end());
     hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
     dout(20) << __func__ << " removing " << oid << dendl;
-    repop->ctx->op_t.remove(coll, oid);
+    repop->ctx->op_t->remove(oid);
     ++repop->ctx->at_version.version;
     repop->ctx->log.push_back(
         pg_log_entry_t(pg_log_entry_t::DELETE,
@@ -10172,6 +10022,7 @@ boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
   RepGather *repop = pg->trim_object(pos);
   assert(repop);
   repop->queue_snap_trimmer = true;
+
   repops.insert(repop->get());
   pg->simple_repop_submit(repop);
   return discard_event();
@@ -10207,7 +10058,7 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&
   for (set<RepGather *>::iterator i = repops.begin();
        i != repops.end();
        repops.erase(i++)) {
-    if (!(*i)->applied || !(*i)->waitfor_ack.empty()) {
+    if (!(*i)->all_applied) {
       return discard_event();
     } else {
       (*i)->put();
@@ -10243,3 +10094,6 @@ void intrusive_ptr_release(ReplicatedPG *pg) { pg->put("intptr"); }
 uint64_t get_with_id(ReplicatedPG *pg) { return pg->get_with_id(); }
 void put_with_id(ReplicatedPG *pg, uint64_t id) { return pg->put_with_id(id); }
 #endif
+
+void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop) { repop->get(); }
+void intrusive_ptr_release(ReplicatedPG::RepGather *repop) { repop->put(); }
index 0626008dea0e2c55c81058a2d5a74de9ef194ade..1b56b02b7518fc739c33c6542bcfd1559c5f8a52 100644 (file)
@@ -112,7 +112,7 @@ public:
      * Final transaction; if non-empty the callback must execute it before any
      * other accesses to the object (in order to complete the copy).
      */
-    ObjectStore::Transaction final_tx;
+    PGBackend::PGTransaction *final_tx;
     string category; ///< The copy source's category
     version_t user_version; ///< The copy source's user version
     bool should_requeue;  ///< op should be requeued on cancel
@@ -324,8 +324,15 @@ public:
     map<string, bufferptr> &attrs) {
     return get_object_context(hoid, true, &attrs);
   }
+  void log_operation(
+    vector<pg_log_entry_t> &logv,
+    const eversion_t &trim_to,
+    bool update_snaps,
+    ObjectStore::Transaction *t) {
+    append_log(logv, trim_to, *t, update_snaps);
+  }
 
-  void op_applied_replica(
+  void op_applied(
     const eversion_t &applied_version);
 
   bool should_send_op(
@@ -338,6 +345,22 @@ public:
       assert(is_backfill_targets(peer));
     return should_send;
   }
+  
+  void update_peer_last_complete_ondisk(
+    int fromosd,
+    eversion_t lcod) {
+    peer_last_complete_ondisk[fromosd] = lcod;
+  }
+
+  void update_last_complete_ondisk(
+    eversion_t lcod) {
+    last_complete_ondisk = lcod;
+  }
+
+  void update_stats(
+    const pg_stat_t &stat) {
+    info.stats = stat;
+  }
 
   /*
    * Capture all object state associated with an in-progress read or write.
@@ -381,7 +404,7 @@ public:
 
     int current_osd_subop_num;
 
-    ObjectStore::Transaction op_t, local_t;
+    PGBackend::PGTransaction *op_t;
     vector<pg_log_entry_t> log;
 
     interval_set<uint64_t> modified_ranges;
@@ -417,6 +440,7 @@ public:
       modify(false), user_modify(false), undirty(false),
       bytes_written(0), bytes_read(0), user_at_version(0),
       current_osd_subop_num(0),
+      op_t(NULL),
       data_off(0), reply(NULL), pg(_pg),
       num_read(0),
       num_write(0),
@@ -435,6 +459,7 @@ public:
       }
     }
     ~OpContext() {
+      assert(!op_t);
       assert(lock_to_release == NONE);
       if (reply)
        reply->put();
@@ -445,7 +470,6 @@ public:
    * State on the PG primary associated with the replicated mutation
    */
   class RepGather {
-    bool is_done;
   public:
     xlist<RepGather*>::item queue_item;
     int nref;
@@ -458,11 +482,10 @@ public:
 
     tid_t rep_tid;
 
-    bool applying, applied, aborted;
+    bool rep_aborted, rep_done;
 
-    set<int>  waitfor_ack;
-    //set<int>  waitfor_nvram;
-    set<int>  waitfor_disk;
+    bool all_applied;
+    bool all_committed;
     bool sent_ack;
     //bool sent_nvram;
     bool sent_disk;
@@ -471,18 +494,16 @@ public:
     
     eversion_t          pg_local_last_complete;
 
-    list<ObjectStore::Transaction*> tls;
     bool queue_snap_trimmer;
     
     RepGather(OpContext *c, ObjectContextRef pi, tid_t rt, 
              eversion_t lc) :
-      is_done(false),
       queue_item(this),
       nref(1),
       ctx(c), obc(pi),
       rep_tid(rt), 
-      applying(false), applied(false), aborted(false),
-      sent_ack(false),
+      rep_aborted(false), rep_done(false),
+      all_applied(false), all_committed(false), sent_ack(false),
       //sent_nvram(false),
       sent_disk(false),
       pg_local_last_complete(lc),
@@ -500,16 +521,9 @@ public:
        //generic_dout(0) << "deleting " << this << dendl;
       }
     }
-    void mark_done() {
-      is_done = true;
-    }
-    bool done() {
-      return is_done;
-    }
   };
 
 
-
 protected:
 
   /**
@@ -544,6 +558,8 @@ protected:
    */
   void close_op_ctx(OpContext *ctx) {
     release_op_ctx_locks(ctx);
+    delete ctx->op_t;
+    ctx->op_t = NULL;
     delete ctx;
   }
 
@@ -578,16 +594,14 @@ protected:
   xlist<RepGather*> repop_queue;
   map<tid_t, RepGather*> repop_map;
 
-  void apply_repop(RepGather *repop);
-  void op_applied(RepGather *repop);
-  void op_commit(RepGather *repop);
+  friend class C_OSD_RepopApplied;
+  friend class C_OSD_RepopCommit;
+  void repop_all_applied(RepGather *repop);
+  void repop_all_committed(RepGather *repop);
   void eval_repop(RepGather*);
   void issue_repop(RepGather *repop, utime_t now);
   RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid);
   void remove_repop(RepGather *repop);
-  void repop_ack(RepGather *repop,
-                 int result, int ack_type,
-                 int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
 
   RepGather *simple_repop_create(ObjectContextRef obc);
   void simple_repop_submit(RepGather *repop);
@@ -613,7 +627,7 @@ protected:
         ++i) {
       if ((*i)->v > v)
         break;
-      if (!(*i)->waitfor_disk.empty())
+      if (!(*i)->all_committed)
        return false;
     }
     return true;
@@ -625,14 +639,12 @@ protected:
         ++i) {
       if ((*i)->v > v)
         break;
-      if (!(*i)->waitfor_ack.empty())
+      if (!(*i)->all_applied)
        return false;
     }
     return true;
   }
 
-  friend class C_OSD_OpCommit;
-  friend class C_OSD_OpApplied;
   friend struct C_OnPushCommit;
 
   // projected object info
@@ -792,7 +804,7 @@ protected:
 
   // low level ops
 
-  void _make_clone(ObjectStore::Transaction& t,
+  void _make_clone(PGBackend::PGTransaction* t,
                   const hobject_t& head, const hobject_t& coid,
                   object_info_t *poi);
   void execute_ctx(OpContext *ctx);
@@ -878,38 +890,6 @@ protected:
   void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
 
 
-  struct RepModify {
-    ReplicatedPG *pg;
-    OpRequestRef op;
-    OpContext *ctx;
-    bool applied, committed;
-    int ackerosd;
-    eversion_t last_complete;
-    epoch_t epoch_started;
-
-    uint64_t bytes_written;
-
-    ObjectStore::Transaction opt, localt;
-    list<ObjectStore::Transaction*> tls;
-    
-    RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
-                 epoch_started(0), bytes_written(0) {}
-  };
-
-  struct C_OSD_RepModifyApply : public Context {
-    RepModify *rm;
-    C_OSD_RepModifyApply(RepModify *r) : rm(r) { }
-    void finish(int r) {
-      rm->pg->sub_op_modify_applied(rm);
-    }
-  };
-  struct C_OSD_RepModifyCommit : public Context {
-    RepModify *rm;
-    C_OSD_RepModifyCommit(RepModify *r) : rm(r) { }
-    void finish(int r) {
-      rm->pg->sub_op_modify_commit(rm);
-    }
-  };
   struct C_OSD_OndiskWriteUnlock : public Context {
     ObjectContextRef obc, obc2, obc3;
     C_OSD_OndiskWriteUnlock(
@@ -964,11 +944,6 @@ protected:
 
   void sub_op_remove(OpRequestRef op);
 
-  void sub_op_modify(OpRequestRef op);
-  void sub_op_modify_applied(RepModify *rm);
-  void sub_op_modify_commit(RepModify *rm);
-
-  void sub_op_modify_reply(OpRequestRef op);
   void _applied_recovered_object(ObjectContextRef obc);
   void _applied_recovered_object_replica();
   void _committed_pushed_object(epoch_t epoch, eversion_t lc);
@@ -993,10 +968,10 @@ protected:
                  object_locator_t oloc, version_t version, unsigned flags,
                  bool mirror_snapset);
   void process_copy_chunk(hobject_t oid, tid_t tid, int r);
-  void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
+  void _write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t);
   void _copy_some(ObjectContextRef obc, CopyOpRef cop);
   void _build_finish_copy_transaction(CopyOpRef cop,
-                                      ObjectStore::Transaction& t);
+                                      PGBackend::PGTransaction *t);
   void finish_copyfrom(OpContext *ctx);
   void finish_promote(int r, OpRequestRef op,
                      CopyResults *results, ObjectContextRef obc);
@@ -1177,13 +1152,10 @@ public:
 inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
 {
   out << "repgather(" << &repop
-      << (repop.applying ? " applying" : "")
-      << (repop.applied ? " applied" : "")
       << " " << repop.v
       << " rep_tid=" << repop.rep_tid 
-      << " wfack=" << repop.waitfor_ack
-    //<< " wfnvram=" << repop.waitfor_nvram
-      << " wfdisk=" << repop.waitfor_disk;
+      << " committed?=" << repop.all_committed
+      << " applied?=" << repop.all_applied;
   if (repop.ctx->lock_to_release != ReplicatedPG::OpContext::NONE)
     out << " lock=" << (int)repop.ctx->lock_to_release;
   if (repop.ctx->op)
@@ -1192,4 +1164,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
   return out;
 }
 
+void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop);
+void intrusive_ptr_release(ReplicatedPG::RepGather *repop);
+
+
 #endif